// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <inttypes.h>
#include "db/builder.h"
+#include "db/error_handler.h"
+#include "db/event_helpers.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "util/sync_point.h"
namespace rocksdb {
+
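+// Returns true if there is enough disk space for the proposed compaction.
+// Both CompactFilesImpl() and BackgroundCompaction() below consult this
+// before letting a compaction proceed.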
+bool DBImpl::EnoughRoomForCompaction(
+ ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
+ bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
+ // Check if we have enough room to do the compaction
+ bool enough_room = true;
+#ifndef ROCKSDB_LITE
+ auto sfm = static_cast<SstFileManagerImpl*>(
+ immutable_db_options_.sst_file_manager.get());
+ if (sfm) {
+ // Pass the current bg_error_ to SFM so it can decide what checks to
+ // perform. If this DB instance hasn't seen any error yet, the SFM can be
+ // optimistic and not do disk space checks
+ enough_room =
+ sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
+ if (enough_room) {
+ *sfm_reserved_compact_space = true;
+ }
+ }
+#else
+ (void)cfd;
+ (void)inputs;
+ (void)sfm_reserved_compact_space;
+#endif // ROCKSDB_LITE
+ if (!enough_room) {
+ // Just in case tests want to change the value of enough_room
+ TEST_SYNC_POINT_CALLBACK(
+ "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
+ ROCKS_LOG_BUFFER(log_buffer,
+ "Cancelled compaction because not enough room");
+ RecordTick(stats_, COMPACTION_CANCELLED, 1);
+ }
+ return enough_room;
+}
+
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld();
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
s = log->file()->Sync(immutable_db_options_.use_fsync);
+ if (!s.ok()) {
+ break;
+ }
}
if (s.ok()) {
s = directories_.GetWalDir()->Fsync();
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
- bg_error_ = s;
+ error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s;
}
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
- bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
+ bool* made_progress, JobContext* job_context,
+ SuperVersionContext* superversion_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
+ auto snapshot_checker = snapshot_checker_.get();
+ if (use_custom_gc_ && snapshot_checker == nullptr) {
+ snapshot_checker = DisableGCSnapshotChecker::Instance();
+ }
FlushJob flush_job(
- dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_,
- versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
- earliest_write_conflict_snapshot, job_context, log_buffer,
- directories_.GetDbDir(), directories_.GetDataDir(0U),
+ dbname_, cfd, immutable_db_options_, mutable_cf_options,
+ env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
+ snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
+ job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats);
// and EventListener callback will be called when the db_mutex
// is unlocked by the current thread.
if (s.ok()) {
- s = flush_job.Run(&file_meta);
+ s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
} else {
flush_job.Cancel();
}
if (s.ok()) {
- InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
- mutable_cf_options);
+ InstallSuperVersionAndScheduleWork(cfd, superversion_context,
+ mutable_cf_options);
if (made_progress) {
    *made_progress = true;
}
cfd->current()->storage_info()->LevelSummary(&tmp));
}
- if (!s.ok() && !s.IsShutdownInProgress() &&
- immutable_db_options_.paranoid_checks && bg_error_.ok()) {
- // if a bad error happened (not ShutdownInProgress) and paranoid_checks is
- // true, mark DB read-only
- bg_error_ = s;
+ if (!s.ok() && !s.IsShutdownInProgress()) {
+ Status new_bg_error = s;
+ error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
if (s.ok()) {
#ifndef ROCKSDB_LITE
if (sfm) {
// Notify sst_file_manager that a new file was added
std::string file_path = MakeTableFileName(
- immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber());
+ cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
sfm->OnAddFile(file_path);
- if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
- bg_error_ = Status::IOError("Max allowed space was reached");
+ if (sfm->IsMaxAllowedSpaceReached()) {
+ Status new_bg_error = Status::SpaceLimit("Max allowed space was reached");
TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
- &bg_error_);
+ &new_bg_error);
+ error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
#endif // ROCKSDB_LITE
return s;
}
+Status DBImpl::FlushMemTablesToOutputFiles(
+ const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
+ JobContext* job_context, LogBuffer* log_buffer) {
+ Status s;
+ for (auto& arg : bg_flush_args) {
+ ColumnFamilyData* cfd = arg.cfd_;
+ const MutableCFOptions& mutable_cf_options =
+ *cfd->GetLatestMutableCFOptions();
+ SuperVersionContext* superversion_context = arg.superversion_context_;
+ s = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
+ job_context, superversion_context,
+ log_buffer);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ return s;
+}
+
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
- info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
+ info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
- info.smallest_seqno = file_meta->smallest_seqno;
- info.largest_seqno = file_meta->largest_seqno;
+ info.smallest_seqno = file_meta->fd.smallest_seqno;
+ info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
+ info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info);
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
+#else
+ (void)cfd;
+ (void)file_meta;
+ (void)mutable_cf_options;
+ (void)job_id;
+ (void)prop;
#endif // ROCKSDB_LITE
}
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
- info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
+ info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
- info.smallest_seqno = file_meta->smallest_seqno;
- info.largest_seqno = file_meta->largest_seqno;
+ info.smallest_seqno = file_meta->fd.smallest_seqno;
+ info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
+ info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, info);
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
+#else
+ (void)cfd;
+ (void)file_meta;
+ (void)mutable_cf_options;
+ (void)job_id;
+ (void)prop;
#endif // ROCKSDB_LITE
}
Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
- if (options.target_path_id >= immutable_db_options_.db_paths.size()) {
+ auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+ auto cfd = cfh->cfd();
+
+ if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
return Status::InvalidArgument("Invalid target path ID");
}
- auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
- auto cfd = cfh->cfd();
bool exclusive = options.exclusive_manual_compaction;
- Status s = FlushMemTable(cfd, FlushOptions());
- if (!s.ok()) {
- LogFlush(immutable_db_options_.info_log);
- return s;
+ bool flush_needed = true;
+ if (begin != nullptr && end != nullptr) {
+ // TODO(ajkr): We could also optimize away the flush in certain cases where
+ // one/both sides of the interval are unbounded. But it requires more
+ // changes to RangesOverlapWithMemtables.
+ Range range(*begin, *end);
+ SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+ cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
+ CleanupSuperVersion(super_version);
+ }
+
+ Status s;
+ if (flush_needed) {
+ FlushOptions fo;
+ fo.allow_write_stall = options.allow_write_stall;
+ s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
+                      false /* writes_stopped */);
+ if (!s.ok()) {
+ LogFlush(immutable_db_options_.info_log);
+ return s;
+ }
}
int max_level_with_files = 0;
if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
cfd->NumberLevels() > 1) {
// Always compact all files together.
- s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
- cfd->NumberLevels() - 1, options.target_path_id,
- begin, end, exclusive);
final_output_level = cfd->NumberLevels() - 1;
+    // if the bottommost level is reserved, stop one level above it
+ if (immutable_db_options_.allow_ingest_behind) {
+ final_output_level--;
+ }
+ s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
+ final_output_level, options.target_path_id,
+ options.max_subcompactions, begin, end, exclusive);
} else {
for (int level = 0; level <= max_level_with_files; level++) {
int output_level;
}
}
s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
- begin, end, exclusive);
+ options.max_subcompactions, begin, end, exclusive);
if (!s.ok()) {
break;
}
return s;
}
-Status DBImpl::CompactFiles(
- const CompactionOptions& compact_options,
- ColumnFamilyHandle* column_family,
- const std::vector<std::string>& input_file_names,
- const int output_level, const int output_path_id) {
+Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
+ ColumnFamilyHandle* column_family,
+ const std::vector<std::string>& input_file_names,
+ const int output_level, const int output_path_id,
+ std::vector<std::string>* const output_file_names) {
#ifdef ROCKSDB_LITE
- // not supported in lite version
+ (void)compact_options;
+ (void)column_family;
+ (void)input_file_names;
+ (void)output_level;
+ (void)output_path_id;
+ (void)output_file_names;
+ // not supported in lite version
return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
if (column_family == nullptr) {
// IngestExternalFile() calls to finish.
WaitForIngestFile();
- s = CompactFilesImpl(compact_options, cfd, sv->current,
- input_file_names, output_level,
- output_path_id, &job_context, &log_buffer);
+ s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names,
+ output_file_names, output_level, output_path_id,
+ &job_context, &log_buffer);
}
if (sv->Unref()) {
mutex_.Lock();
} // release the mutex
// delete unnecessary files if any, this is done outside the mutex
- if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+ if (job_context.HaveSomethingToClean() ||
+ job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
Status DBImpl::CompactFilesImpl(
const CompactionOptions& compact_options, ColumnFamilyData* cfd,
Version* version, const std::vector<std::string>& input_file_names,
- const int output_level, int output_path_id, JobContext* job_context,
- LogBuffer* log_buffer) {
+ std::vector<std::string>* const output_file_names, const int output_level,
+ int output_path_id, JobContext* job_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
if (shutting_down_.load(std::memory_order_acquire)) {
version->GetColumnFamilyMetaData(&cf_meta);
if (output_path_id < 0) {
- if (immutable_db_options_.db_paths.size() == 1U) {
+ if (cfd->ioptions()->cf_paths.size() == 1U) {
output_path_id = 0;
} else {
return Status::NotSupported(
"files are already being compacted");
}
}
+ bool sfm_reserved_compact_space = false;
+ // First check if we have enough room to do the compaction
+ bool enough_room = EnoughRoomForCompaction(
+ cfd, input_files, &sfm_reserved_compact_space, log_buffer);
+
+ if (!enough_room) {
+    // Not enough disk space for this compaction; report the condition to the
+    // caller as CompactionTooLarge.
+ return Status::CompactionTooLarge();
+ }
// At this point, CompactFiles will be run.
bg_compaction_scheduled_++;
c.reset(cfd->compaction_picker()->CompactFiles(
compact_options, input_files, output_level, version->storage_info(),
*cfd->GetLatestMutableCFOptions(), output_path_id));
- if (!c) {
- return Status::Aborted("Another Level 0 compaction is running");
- }
+ // we already sanitized the set of input files and checked for conflicts
+ // without releasing the lock, so we're guaranteed a compaction can be formed.
+ assert(c != nullptr);
+
c->SetInputVersion(version);
// deletion compaction currently not allowed in CompactFiles.
assert(!c->deletion_compaction());
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
+ auto snapshot_checker = snapshot_checker_.get();
+ if (use_custom_gc_ && snapshot_checker == nullptr) {
+ snapshot_checker = DisableGCSnapshotChecker::Instance();
+ }
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
- job_context->job_id, c.get(), immutable_db_options_, env_options_,
- versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
- directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
- snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
- &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
+ job_context->job_id, c.get(), immutable_db_options_,
+ env_options_for_compaction_, versions_.get(), &shutting_down_,
+ preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
+ GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
+ &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
+ snapshot_checker, table_cache_, &event_logger_,
+ c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because
// CompactFiles does not trigger OnCompactionCompleted(),
Status status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
- InstallSuperVersionAndScheduleWorkWrapper(
- c->column_family_data(), job_context, *c->mutable_cf_options());
+ InstallSuperVersionAndScheduleWork(
+ c->column_family_data(), &job_context->superversion_contexts[0],
+ *c->mutable_cf_options(), FlushReason::kManualCompaction);
}
c->ReleaseCompactionFiles(s);
+#ifndef ROCKSDB_LITE
+ // Need to make sure SstFileManager does its bookkeeping
+ auto sfm = static_cast<SstFileManagerImpl*>(
+ immutable_db_options_.sst_file_manager.get());
+ if (sfm && sfm_reserved_compact_space) {
+ sfm->OnCompactionCompletion(c.get());
+ }
+#endif // ROCKSDB_LITE
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
"[%s] [JOB %d] Compaction error: %s",
c->column_family_data()->GetName().c_str(),
job_context->job_id, status.ToString().c_str());
- if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
- bg_error_ = status;
+ error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
+ }
+
+ if (output_file_names != nullptr) {
+    for (const auto& newf : c->edit()->GetNewFiles()) {
+ (*output_file_names)
+ .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
+ newf.second.fd.GetNumber(),
+ newf.second.fd.GetPathId()));
}
}
if (bg_compaction_scheduled_ == 0) {
bg_cv_.SignalAll();
}
+ TEST_SYNC_POINT("CompactFilesImpl:End");
return status;
}
Status DBImpl::PauseBackgroundWork() {
InstrumentedMutexLock guard_lock(&mutex_);
bg_compaction_paused_++;
- while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0) {
+ while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
+ bg_flush_scheduled_ > 0) {
bg_cv_.Wait();
}
bg_work_paused_++;
}
void DBImpl::NotifyOnCompactionCompleted(
- ColumnFamilyData* cfd, Compaction *c, const Status &st,
- const CompactionJobStats& compaction_job_stats,
- const int job_id) {
+ ColumnFamilyData* cfd, Compaction* c, const Status& st,
+ const CompactionJobStats& compaction_job_stats, const int job_id) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
if (shutting_down_.load(std::memory_order_acquire)) {
return;
}
+ Version* current = cfd->current();
+ current->Ref();
// release lock while notifying events
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
info.compression = c->output_compression();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
- auto fn = TableFileName(immutable_db_options_.db_paths,
+ auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
fmd->fd.GetNumber(), fmd->fd.GetPathId());
info.input_files.push_back(fn);
if (info.table_properties.count(fn) == 0) {
std::shared_ptr<const TableProperties> tp;
- auto s = cfd->current()->GetTableProperties(&tp, fmd, &fn);
+ auto s = current->GetTableProperties(&tp, fmd, &fn);
if (s.ok()) {
info.table_properties[fn] = tp;
}
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
- info.output_files.push_back(TableFileName(immutable_db_options_.db_paths,
- newf.second.fd.GetNumber(),
- newf.second.fd.GetPathId()));
+ info.output_files.push_back(TableFileName(
+ c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
+ newf.second.fd.GetPathId()));
}
for (auto listener : immutable_db_options_.listeners) {
listener->OnCompactionCompleted(this, info);
}
}
mutex_.Lock();
+ current->Unref();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
+#else
+ (void)cfd;
+ (void)c;
+ (void)st;
+ (void)compaction_job_stats;
+ (void)job_id;
#endif // ROCKSDB_LITE
}
return Status::InvalidArgument("Target level exceeds number of levels");
}
- std::unique_ptr<SuperVersion> superversion_to_free;
- std::unique_ptr<SuperVersion> new_superversion(new SuperVersion());
+ SuperVersionContext sv_context(/* create_superversion */ true);
Status status;
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
- f->smallest_seqno, f->largest_seqno,
+ f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
- superversion_to_free.reset(InstallSuperVersionAndScheduleWork(
- cfd, new_superversion.release(), mutable_cf_options));
+ InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
cfd->GetName().c_str(), status.ToString().data());
}
}
+ sv_context.Clean();
refitting_level_ = false;
return status;
return cfh->cfd()->NumberLevels();
}
-int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
+int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
return 0;
}
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
InstrumentedMutexLock l(&mutex_);
- return cfh->cfd()->GetSuperVersion()->
- mutable_cf_options.level0_stop_writes_trigger;
+ return cfh->cfd()
+ ->GetSuperVersion()
+ ->mutable_cf_options.level0_stop_writes_trigger;
}
Status DBImpl::Flush(const FlushOptions& flush_options,
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
- return FlushMemTable(cfh->cfd(), flush_options);
+ ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
+ cfh->GetName().c_str());
+ Status s =
+ FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
+ ROCKS_LOG_INFO(immutable_db_options_.info_log,
+ "[%s] Manual flush finished, status: %s\n",
+ cfh->GetName().c_str(), s.ToString().c_str());
+ return s;
+}
+
+Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
+ Status s;
+ WriteContext context;
+ WriteThread::Writer w;
+
+ mutex_.AssertHeld();
+ write_thread_.EnterUnbatched(&w, &mutex_);
+
+ FlushRequest flush_req;
+ for (auto cfd : *versions_->GetColumnFamilySet()) {
+ if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
+ cached_recoverable_state_empty_.load()) {
+ // Nothing to flush
+ continue;
+ }
+
+ // SwitchMemtable() will release and reacquire mutex during execution
+ s = SwitchMemtable(cfd, &context);
+ if (!s.ok()) {
+ break;
+ }
+
+ cfd->imm()->FlushRequested();
+
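+    // Record the newest immutable memtable ID for this CF; the wait loop
+    // below is done with the CF once all memtables up to that ID are flushed.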
+ flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
+ }
+
+ // schedule flush
+ if (s.ok() && !flush_req.empty()) {
+ SchedulePendingFlush(flush_req, flush_reason);
+ MaybeScheduleFlushOrCompaction();
+ }
+
+ write_thread_.ExitUnbatched(&w);
+
+ if (s.ok()) {
+ for (auto& flush : flush_req) {
+ auto cfd = flush.first;
+ auto flush_memtable_id = flush.second;
+ while (cfd->imm()->NumNotFlushed() > 0 &&
+ cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) {
+ if (!error_handler_.GetRecoveryError().ok()) {
+ break;
+ }
+ if (cfd->IsDropped()) {
+          // FlushJob cannot flush a dropped CF; if we did not break here,
+          // we would loop forever since cfd->imm()->NumNotFlushed() will
+          // never drop to zero
+          break;
+ }
+ cfd->Ref();
+ bg_cv_.Wait();
+ cfd->Unref();
+ }
+ }
+ }
+
+ flush_req.clear();
+ return s;
}
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id,
+ uint32_t max_subcompactions,
const Slice* begin, const Slice* end,
bool exclusive, bool disallow_trivial_move) {
assert(input_level == ColumnFamilyData::kCompactAllLevels ||
bool scheduled = false;
bool manual_conflict = false;
- ManualCompaction manual;
+ ManualCompactionState manual;
manual.cfd = cfd;
manual.input_level = input_level;
manual.output_level = output_level;
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
manual.begin = nullptr;
} else {
- begin_storage.SetMaxPossibleForUserKey(*begin);
+ begin_storage.SetMinPossibleForUserKey(*begin);
manual.begin = &begin_storage;
}
if (end == nullptr ||
cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
manual.end = nullptr;
} else {
- end_storage.SetMinPossibleForUserKey(*end);
+ end_storage.SetMaxPossibleForUserKey(*end);
manual.end = &end_storage;
}
AddManualCompaction(&manual);
TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
if (exclusive) {
- while (bg_compaction_scheduled_ > 0) {
+ while (bg_bottom_compaction_scheduled_ > 0 ||
+ bg_compaction_scheduled_ > 0) {
+ TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] Manual compaction waiting for all other scheduled background "
while (!manual.done) {
assert(HasPendingManualCompaction());
manual_conflict = false;
+ Compaction* compaction = nullptr;
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
scheduled ||
- ((manual.manual_end = &manual.tmp_storage1)&&(
- (manual.compaction = manual.cfd->CompactRange(
- *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
- manual.output_level, manual.output_path_id, manual.begin,
- manual.end, &manual.manual_end, &manual_conflict)) ==
- nullptr) &&
- manual_conflict)) {
+ (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
+ ((compaction = manual.cfd->CompactRange(
+ *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
+ manual.output_level, manual.output_path_id, max_subcompactions,
+ manual.begin, manual.end, &manual.manual_end,
+ &manual_conflict)) == nullptr &&
+ manual_conflict))) {
// exclusive manual compactions should not see a conflict during
// CompactRange
assert(!exclusive || !manual_conflict);
manual.incomplete = false;
}
} else if (!scheduled) {
- if (manual.compaction == nullptr) {
+ if (compaction == nullptr) {
manual.done = true;
bg_cv_.SignalAll();
continue;
}
ca = new CompactionArg;
ca->db = this;
- ca->m = &manual;
+ ca->prepicked_compaction = new PrepickedCompaction;
+ ca->prepicked_compaction->manual_compaction_state = &manual;
+ ca->prepicked_compaction->compaction = compaction;
manual.incomplete = false;
bg_compaction_scheduled_++;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
- bool writes_stopped) {
+ FlushReason flush_reason, bool writes_stopped) {
Status s;
+ uint64_t flush_memtable_id = 0;
+ if (!flush_options.allow_write_stall) {
+ bool flush_needed = true;
+ s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
+ TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
+ if (!s.ok() || !flush_needed) {
+ return s;
+ }
+ }
+ FlushRequest flush_req;
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
- if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty()) {
- // Nothing to flush
- return Status::OK();
- }
-
WriteThread::Writer w;
if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_);
}
- // SwitchMemtable() will release and reacquire mutex
- // during execution
- s = SwitchMemtable(cfd, &context);
+ if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
+ !cached_recoverable_state_empty_.load()) {
+ s = SwitchMemtable(cfd, &context);
+ flush_memtable_id = cfd->imm()->GetLatestMemTableID();
+ flush_req.emplace_back(cfd, flush_memtable_id);
+ }
+
+ if (s.ok() && !flush_req.empty()) {
+ for (auto& elem : flush_req) {
+ ColumnFamilyData* loop_cfd = elem.first;
+ loop_cfd->imm()->FlushRequested();
+ }
+ SchedulePendingFlush(flush_req, flush_reason);
+ MaybeScheduleFlushOrCompaction();
+ }
if (!writes_stopped) {
write_thread_.ExitUnbatched(&w);
}
-
- cfd->imm()->FlushRequested();
-
- // schedule flush
- SchedulePendingFlush(cfd);
- MaybeScheduleFlushOrCompaction();
}
if (s.ok() && flush_options.wait) {
- // Wait until the compaction completes
- s = WaitForFlushMemTable(cfd);
+ autovector<ColumnFamilyData*> cfds;
+ autovector<const uint64_t*> flush_memtable_ids;
+ for (auto& iter : flush_req) {
+ cfds.push_back(iter.first);
+ flush_memtable_ids.push_back(&(iter.second));
+ }
+ s = WaitForFlushMemTables(cfds, flush_memtable_ids);
}
+ TEST_SYNC_POINT("FlushMemTableFinished");
return s;
}
-Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
- Status s;
+// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
+// cause write stall, for example if one memtable is being flushed already.
+// This method tries to avoid a write stall (similar to CompactRange()
+// behavior): it emulates how the SuperVersion / LSM would change if the flush
+// happened, checks that against various constraints, and delays the flush if
+// it would cause a write stall.
+// Callers should check status and flush_needed to see if the flush already
+// happened.
+Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
+ bool* flush_needed) {
+ {
+ *flush_needed = true;
+ InstrumentedMutexLock l(&mutex_);
+ uint64_t orig_active_memtable_id = cfd->mem()->GetID();
+ WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
+ do {
+ if (write_stall_condition != WriteStallCondition::kNormal) {
+ TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
+ ROCKS_LOG_INFO(immutable_db_options_.info_log,
+ "[%s] WaitUntilFlushWouldNotStallWrites"
+ " waiting on stall conditions to clear",
+ cfd->GetName().c_str());
+ bg_cv_.Wait();
+ }
+ if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
+ return Status::ShutdownInProgress();
+ }
+
+ uint64_t earliest_memtable_id =
+ std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
+ if (earliest_memtable_id > orig_active_memtable_id) {
+ // We waited so long that the memtable we were originally waiting on was
+ // flushed.
+ *flush_needed = false;
+ return Status::OK();
+ }
+
+ const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+ const auto* vstorage = cfd->current()->storage_info();
+
+ // Skip stalling check if we're below auto-flush and auto-compaction
+ // triggers. If it stalled in these conditions, that'd mean the stall
+ // triggers are so low that stalling is needed for any background work. In
+ // that case we shouldn't wait since background work won't be scheduled.
+ if (cfd->imm()->NumNotFlushed() <
+ cfd->ioptions()->min_write_buffer_number_to_merge &&
+ vstorage->l0_delay_trigger_count() <
+ mutable_cf_options.level0_file_num_compaction_trigger) {
+ break;
+ }
+
+ // check whether one extra immutable memtable or an extra L0 file would
+ // cause write stalling mode to be entered. It could still enter stall
+ // mode due to pending compaction bytes, but that's less common
+ write_stall_condition =
+ ColumnFamilyData::GetWriteStallConditionAndCause(
+ cfd->imm()->NumNotFlushed() + 1,
+ vstorage->l0_delay_trigger_count() + 1,
+ vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
+ .first;
+ } while (write_stall_condition != WriteStallCondition::kNormal);
+ }
+ return Status::OK();
+}
+
+// Wait for memtables to be flushed for multiple column families.
+// let N = cfds.size()
+// for i in [0, N),
+// 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
+// have to be flushed for THIS column family;
+// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
+// family have to be flushed.
+// Finish waiting when ALL column families finish flushing memtables.
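+// Example: with cfds = {cf0, cf1} and flush_memtable_ids = {&3, nullptr},
+// the wait ends once cf0 has flushed all memtables with ID <= 3 and cf1 has
+// flushed all of its immutable memtables (a dropped CF also counts as done).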
+Status DBImpl::WaitForFlushMemTables(
+ const autovector<ColumnFamilyData*>& cfds,
+ const autovector<const uint64_t*>& flush_memtable_ids) {
+ int num = static_cast<int>(cfds.size());
  // Wait until the flushes complete
InstrumentedMutexLock l(&mutex_);
- while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok()) {
+ while (!error_handler_.IsDBStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
- if (cfd->IsDropped()) {
- // FlushJob cannot flush a dropped CF, if we did not break here
- // we will loop forever since cfd->imm()->NumNotFlushed() will never
- // drop to zero
+ // Number of column families that have been dropped.
+ int num_dropped = 0;
+ // Number of column families that have finished flush.
+ int num_finished = 0;
+ for (int i = 0; i < num; ++i) {
+ if (cfds[i]->IsDropped()) {
+ ++num_dropped;
+ } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
+ (flush_memtable_ids[i] != nullptr &&
+ cfds[i]->imm()->GetEarliestMemTableID() >
+ *flush_memtable_ids[i])) {
+ ++num_finished;
+ }
+ }
+    if (num_dropped == 1 && num == 1) {
return Status::InvalidArgument("Cannot flush a dropped CF");
}
+    // All column families involved in this flush request have either been
+    // dropped or finished their flush, so we are done waiting.
+ if (num_dropped + num_finished == num) {
+ break;
+ }
bg_cv_.Wait();
}
- if (!bg_error_.ok()) {
- s = bg_error_;
+ Status s;
+ if (error_handler_.IsDBStopped()) {
+ s = error_handler_.GetBGError();
}
return s;
}
if (bg_work_paused_ > 0) {
// we paused the background work
return;
+ } else if (error_handler_.IsBGWorkStopped() &&
+ !error_handler_.IsRecoveryInProgress()) {
+ // There has been a hard error and this call is not part of the recovery
+ // sequence. Bail out here so we don't get into an endless loop of
+ // scheduling BG work which will again call this function
+ return;
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
return;
}
-
- while (unscheduled_flushes_ > 0 &&
- bg_flush_scheduled_ < immutable_db_options_.max_background_flushes) {
- unscheduled_flushes_--;
+ auto bg_job_limits = GetBGJobLimits();
+ bool is_flush_pool_empty =
+ env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
+ while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
+ bg_flush_scheduled_ < bg_job_limits.max_flushes) {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
}
- auto bg_compactions_allowed = BGCompactionsAllowed();
-
- // special case -- if max_background_flushes == 0, then schedule flush on a
- // compaction thread
- if (immutable_db_options_.max_background_flushes == 0) {
+ // special case -- if high-pri (flush) thread pool is empty, then schedule
+ // flushes in low-pri (compaction) thread pool.
+ if (is_flush_pool_empty) {
while (unscheduled_flushes_ > 0 &&
bg_flush_scheduled_ + bg_compaction_scheduled_ <
- bg_compactions_allowed) {
- unscheduled_flushes_--;
+ bg_job_limits.max_flushes) {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
}
if (bg_compaction_paused_ > 0) {
// we paused the background compaction
return;
+ } else if (error_handler_.IsBGWorkStopped()) {
+ // Compaction is not part of the recovery sequence from a hard error. We
+ // might get here because recovery might do a flush and install a new
+ // super version, which will try to schedule pending compactions. Bail
+ // out here and let the higher level recovery handle compactions
+ return;
}
if (HasExclusiveManualCompaction()) {
// only manual compactions are allowed to run. don't schedule automatic
// compactions
+ TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
return;
}
- while (bg_compaction_scheduled_ < bg_compactions_allowed &&
+ while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
unscheduled_compactions_ > 0) {
CompactionArg* ca = new CompactionArg;
ca->db = this;
- ca->m = nullptr;
+ ca->prepicked_compaction = nullptr;
bg_compaction_scheduled_++;
unscheduled_compactions_--;
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
}
}
-int DBImpl::BGCompactionsAllowed() const {
+DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
mutex_.AssertHeld();
- if (write_controller_.NeedSpeedupCompaction()) {
- return mutable_db_options_.max_background_compactions;
+ return GetBGJobLimits(immutable_db_options_.max_background_flushes,
+ mutable_db_options_.max_background_compactions,
+ mutable_db_options_.max_background_jobs,
+ write_controller_.NeedSpeedupCompaction());
+}
+
+DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
+ int max_background_compactions,
+ int max_background_jobs,
+ bool parallelize_compactions) {
+ BGJobLimits res;
+ if (max_background_flushes == -1 && max_background_compactions == -1) {
+ // for our first stab implementing max_background_jobs, simply allocate a
+ // quarter of the threads to flushes.
+ res.max_flushes = std::max(1, max_background_jobs / 4);
+ res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
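+    // e.g. max_background_jobs == 8 yields max_flushes == 2 and
+    // max_compactions == 6.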
} else {
- return mutable_db_options_.base_background_compactions;
+ // compatibility code in case users haven't migrated to max_background_jobs,
+ // which automatically computes flush/compaction limits
+ res.max_flushes = std::max(1, max_background_flushes);
+ res.max_compactions = std::max(1, max_background_compactions);
}
+ if (!parallelize_compactions) {
+ // throttle background compactions until we deem necessary
+ res.max_compactions = 1;
+ }
+ return res;
}
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
- assert(!cfd->pending_compaction());
+ assert(!cfd->queued_for_compaction());
cfd->Ref();
compaction_queue_.push_back(cfd);
- cfd->set_pending_compaction(true);
+ cfd->set_queued_for_compaction(true);
}
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
assert(!compaction_queue_.empty());
auto cfd = *compaction_queue_.begin();
compaction_queue_.pop_front();
- assert(cfd->pending_compaction());
- cfd->set_pending_compaction(false);
+ assert(cfd->queued_for_compaction());
+ cfd->set_queued_for_compaction(false);
return cfd;
}
-void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) {
- assert(!cfd->pending_flush());
- cfd->Ref();
- flush_queue_.push_back(cfd);
- cfd->set_pending_flush(true);
-}
-
-ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
+DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
assert(!flush_queue_.empty());
- auto cfd = *flush_queue_.begin();
+ FlushRequest flush_req = flush_queue_.front();
+ assert(unscheduled_flushes_ >= static_cast<int>(flush_req.size()));
+ unscheduled_flushes_ -= static_cast<int>(flush_req.size());
flush_queue_.pop_front();
- assert(cfd->pending_flush());
- cfd->set_pending_flush(false);
- return cfd;
+ // TODO: need to unset flush reason?
+ return flush_req;
}
-void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) {
- if (!cfd->pending_flush() && cfd->imm()->IsFlushPending()) {
- AddToFlushQueue(cfd);
- ++unscheduled_flushes_;
+void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
+ FlushReason flush_reason) {
+ if (flush_req.empty()) {
+ return;
+ }
+ for (auto& iter : flush_req) {
+ ColumnFamilyData* cfd = iter.first;
+ cfd->Ref();
+ cfd->SetFlushReason(flush_reason);
}
+ unscheduled_flushes_ += static_cast<int>(flush_req.size());
+ flush_queue_.push_back(flush_req);
}
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
- if (!cfd->pending_compaction() && cfd->NeedsCompaction()) {
+ if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
-void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
- uint64_t number, uint32_t path_id,
- int job_id) {
+void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
+ FileType type, uint64_t number, int job_id) {
mutex_.AssertHeld();
- PurgeFileInfo file_info(fname, type, number, path_id, job_id);
+ PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
purge_queue_.push_back(std::move(file_info));
}
delete reinterpret_cast<CompactionArg*>(arg);
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
- reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
+ auto prepicked_compaction =
+ static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
+ reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
+ prepicked_compaction, Env::Priority::LOW);
+ delete prepicked_compaction;
+}
+
+void DBImpl::BGWorkBottomCompaction(void* arg) {
+ CompactionArg ca = *(static_cast<CompactionArg*>(arg));
+ delete static_cast<CompactionArg*>(arg);
+ IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
+ TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
+ auto* prepicked_compaction = ca.prepicked_compaction;
+ assert(prepicked_compaction && prepicked_compaction->compaction &&
+ !prepicked_compaction->manual_compaction_state);
+ ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
+ delete prepicked_compaction;
}
void DBImpl::BGWorkPurge(void* db) {
void DBImpl::UnscheduleCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg);
- if ((ca.m != nullptr) && (ca.m->compaction != nullptr)) {
- delete ca.m->compaction;
+ if (ca.prepicked_compaction != nullptr) {
+ if (ca.prepicked_compaction->compaction != nullptr) {
+ delete ca.prepicked_compaction->compaction;
+ }
+ delete ca.prepicked_compaction;
}
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
}
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
- LogBuffer* log_buffer) {
+ LogBuffer* log_buffer, FlushReason* reason) {
mutex_.AssertHeld();
- Status status = bg_error_;
- if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
- status = Status::ShutdownInProgress();
+ Status status;
+ *reason = FlushReason::kOthers;
+ // If BG work is stopped due to an error, but a recovery is in progress,
+ // that means this flush is part of the recovery. So allow it to go through
+ if (!error_handler_.IsBGWorkStopped()) {
+ if (shutting_down_.load(std::memory_order_acquire)) {
+ status = Status::ShutdownInProgress();
+ }
+ } else if (!error_handler_.IsRecoveryInProgress()) {
+ status = error_handler_.GetBGError();
}
if (!status.ok()) {
return status;
}
- ColumnFamilyData* cfd = nullptr;
+ autovector<BGFlushArg> bg_flush_args;
+ std::vector<SuperVersionContext>& superversion_contexts =
+ job_context->superversion_contexts;
while (!flush_queue_.empty()) {
// This cfd is already referenced
- auto first_cfd = PopFirstFromFlushQueue();
-
- if (first_cfd->IsDropped() || !first_cfd->imm()->IsFlushPending()) {
- // can't flush this CF, try next one
- if (first_cfd->Unref()) {
- delete first_cfd;
+ const FlushRequest& flush_req = PopFirstFromFlushQueue();
+ superversion_contexts.clear();
+ superversion_contexts.reserve(flush_req.size());
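+    // Reserving up front guarantees the emplace_back() calls below never
+    // reallocate, so the pointers stored in bg_flush_args stay valid.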
+
+ for (const auto& iter : flush_req) {
+ ColumnFamilyData* cfd = iter.first;
+ if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
+ // can't flush this CF, try next one
+ if (cfd->Unref()) {
+ delete cfd;
+ }
+ continue;
}
- continue;
+ superversion_contexts.emplace_back(SuperVersionContext(true));
+ bg_flush_args.emplace_back(cfd, iter.second,
+ &(superversion_contexts.back()));
+ }
+ if (!bg_flush_args.empty()) {
+ break;
}
-
- // found a flush!
- cfd = first_cfd;
- break;
}
- if (cfd != nullptr) {
- const MutableCFOptions mutable_cf_options =
- *cfd->GetLatestMutableCFOptions();
- ROCKS_LOG_BUFFER(
- log_buffer,
- "Calling FlushMemTableToOutputFile with column "
- "family [%s], flush slots available %d, compaction slots allowed %d, "
- "compaction slots scheduled %d",
- cfd->GetName().c_str(), immutable_db_options_.max_background_flushes,
- bg_flush_scheduled_, BGCompactionsAllowed() - bg_compaction_scheduled_);
- status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
- job_context, log_buffer);
- if (cfd->Unref()) {
- delete cfd;
+ if (!bg_flush_args.empty()) {
+ auto bg_job_limits = GetBGJobLimits();
+ for (const auto& arg : bg_flush_args) {
+ ColumnFamilyData* cfd = arg.cfd_;
+ ROCKS_LOG_BUFFER(
+ log_buffer,
+ "Calling FlushMemTableToOutputFile with column "
+ "family [%s], flush slots available %d, compaction slots available "
+          "%d, flush slots scheduled %d, compaction slots scheduled %d",
+ cfd->GetName().c_str(), bg_job_limits.max_flushes,
+ bg_job_limits.max_compactions, bg_flush_scheduled_,
+ bg_compaction_scheduled_);
+ }
+ status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
+ job_context, log_buffer);
+ // All the CFDs in the FlushReq must have the same flush reason, so just
+ // grab the first one
+ *reason = bg_flush_args[0].cfd_->GetFlushReason();
+ for (auto& arg : bg_flush_args) {
+ ColumnFamilyData* cfd = arg.cfd_;
+ if (cfd->Unref()) {
+ delete cfd;
+ arg.cfd_ = nullptr;
+ }
}
}
return status;
void DBImpl::BackgroundCallFlush() {
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);
- assert(bg_flush_scheduled_);
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
immutable_db_options_.info_log.get());
{
InstrumentedMutexLock l(&mutex_);
+ assert(bg_flush_scheduled_);
num_running_flushes_++;
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
+ FlushReason reason;
- Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
- if (!s.ok() && !s.IsShutdownInProgress()) {
+ Status s =
+ BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason);
+ if (!s.ok() && !s.IsShutdownInProgress() &&
+ reason != FlushReason::kErrorRecovery) {
// Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to
// chew up resources for failed flushes for the duration of
// the problem.
uint64_t error_cnt =
- default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
+ default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
// created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
// delete unnecessary files if any, this is done outside the mutex
- if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+ if (job_context.HaveSomethingToClean() ||
+ job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
+ TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
// Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
}
}
-void DBImpl::BackgroundCallCompaction(void* arg) {
+void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
+ Env::Priority bg_thread_pri) {
bool made_progress = false;
- ManualCompaction* m = reinterpret_cast<ManualCompaction*>(arg);
JobContext job_context(next_job_id_.fetch_add(1), true);
TEST_SYNC_POINT("BackgroundCallCompaction:0");
MaybeDumpStats();
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
- assert(bg_compaction_scheduled_);
- Status s =
- BackgroundCompaction(&made_progress, &job_context, &log_buffer, m);
+ assert((bg_thread_pri == Env::Priority::BOTTOM &&
+ bg_bottom_compaction_scheduled_) ||
+ (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
+ Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
+ prepicked_compaction);
TEST_SYNC_POINT("BackgroundCallCompaction:1");
if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in
// have created (they might not be all recorded in job_context in case of a
// failure). Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
+ TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
// delete unnecessary files if any, this is done outside the mutex
- if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
+ if (job_context.HaveSomethingToClean() ||
+ job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
+ TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
}
job_context.Clean();
mutex_.Lock();
assert(num_running_compactions_ > 0);
num_running_compactions_--;
- bg_compaction_scheduled_--;
+ if (bg_thread_pri == Env::Priority::LOW) {
+ bg_compaction_scheduled_--;
+ } else {
+ assert(bg_thread_pri == Env::Priority::BOTTOM);
+ bg_bottom_compaction_scheduled_--;
+ }
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
- if (made_progress || bg_compaction_scheduled_ == 0 ||
- HasPendingManualCompaction()) {
+ if (made_progress ||
+ (bg_compaction_scheduled_ == 0 &&
+ bg_bottom_compaction_scheduled_ == 0) ||
+ HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
// signal if
// * made_progress -- need to wakeup DelayWrite
- // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
+ // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * HasPendingManualCompaction -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
// waiting for it
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
- LogBuffer* log_buffer, void* arg) {
- ManualCompaction* manual_compaction =
- reinterpret_cast<ManualCompaction*>(arg);
+ LogBuffer* log_buffer,
+ PrepickedCompaction* prepicked_compaction) {
+ ManualCompactionState* manual_compaction =
+ prepicked_compaction == nullptr
+ ? nullptr
+ : prepicked_compaction->manual_compaction_state;
*made_progress = false;
mutex_.AssertHeld();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
bool is_manual = (manual_compaction != nullptr);
+ unique_ptr<Compaction> c;
+ if (prepicked_compaction != nullptr &&
+ prepicked_compaction->compaction != nullptr) {
+ c.reset(prepicked_compaction->compaction);
+ }
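+  // True when the compaction was picked before this thread ran: either a
+  // manual compaction or one handed over from another thread pool.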
+ bool is_prepicked = is_manual || c;
// (manual_compaction->in_progress == false);
bool trivial_move_disallowed =
is_manual && manual_compaction->disallow_trivial_move;
CompactionJobStats compaction_job_stats;
- Status status = bg_error_;
- if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
- status = Status::ShutdownInProgress();
+ Status status;
+ if (!error_handler_.IsBGWorkStopped()) {
+ if (shutting_down_.load(std::memory_order_acquire)) {
+ status = Status::ShutdownInProgress();
+ }
+ } else {
+ status = error_handler_.GetBGError();
+ // If we get here, it means a hard error happened after this compaction
+ // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
+ // a chance to execute. Since we didn't pop a cfd from the compaction
+ // queue, increment unscheduled_compactions_
+ unscheduled_compactions_++;
}
if (!status.ok()) {
manual_compaction->status = status;
manual_compaction->done = true;
manual_compaction->in_progress = false;
- delete manual_compaction->compaction;
manual_compaction = nullptr;
}
return status;
manual_compaction->in_progress = true;
}
- unique_ptr<Compaction> c;
// InternalKey manual_end_storage;
// InternalKey* manual_end = &manual_end_storage;
+ bool sfm_reserved_compact_space = false;
if (is_manual) {
- ManualCompaction* m = manual_compaction;
+ ManualCompactionState* m = manual_compaction;
assert(m->in_progress);
- c.reset(std::move(m->compaction));
if (!c) {
m->done = true;
m->manual_end = nullptr;
(m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"));
} else {
- ROCKS_LOG_BUFFER(
- log_buffer,
- "[%s] Manual compaction from level-%d to level-%d from %s .. "
- "%s; will stop at %s\n",
- m->cfd->GetName().c_str(), m->input_level, c->output_level(),
- (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
- (m->end ? m->end->DebugString().c_str() : "(end)"),
- ((m->done || m->manual_end == nullptr)
- ? "(end)"
- : m->manual_end->DebugString().c_str()));
- }
- } else if (!compaction_queue_.empty()) {
+ // First check if we have enough room to do the compaction
+ bool enough_room = EnoughRoomForCompaction(
+ m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
+
+ if (!enough_room) {
+ // Then don't do the compaction
+ c->ReleaseCompactionFiles(status);
+ c.reset();
+ // m's vars will get set properly at the end of this function,
+ // as long as status == CompactionTooLarge
+ status = Status::CompactionTooLarge();
+ } else {
+ ROCKS_LOG_BUFFER(
+ log_buffer,
+ "[%s] Manual compaction from level-%d to level-%d from %s .. "
+ "%s; will stop at %s\n",
+ m->cfd->GetName().c_str(), m->input_level, c->output_level(),
+ (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+ (m->end ? m->end->DebugString().c_str() : "(end)"),
+ ((m->done || m->manual_end == nullptr)
+ ? "(end)"
+ : m->manual_end->DebugString().c_str()));
+ }
+ }
+ } else if (!is_prepicked && !compaction_queue_.empty()) {
+ if (HasExclusiveManualCompaction()) {
+ // Can't compact right now, but try again later
+ TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
+
+ // Stay in the compaction queue.
+ unscheduled_compactions_++;
+
+ return Status::OK();
+ }
+
// cfd is referenced here
auto cfd = PopFirstFromCompactionQueue();
// We unreference here because the following code will take a Ref() on
return Status::OK();
}
- if (HaveManualCompaction(cfd)) {
- // Can't compact right now, but try again later
- TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
- return Status::OK();
- }
-
// Pick up latest mutable CF Options and use it throughout the
// compaction job
// Compaction makes a copy of the latest MutableCFOptions. It should be used
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
+
if (c != nullptr) {
- // update statistics
- MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
- c->inputs(0)->size());
- // There are three things that can change compaction score:
- // 1) When flush or compaction finish. This case is covered by
- // InstallSuperVersionAndScheduleWork
- // 2) When MutableCFOptions changes. This case is also covered by
- // InstallSuperVersionAndScheduleWork, because this is when the new
- // options take effect.
- // 3) When we Pick a new compaction, we "remove" those files being
- // compacted from the calculation, which then influences compaction
- // score. Here we check if we need the new compaction even without the
- // files that are currently being compacted. If we need another
- // compaction, we might be able to execute it in parallel, so we add it
- // to the queue and schedule a new thread.
- if (cfd->NeedsCompaction()) {
- // Yes, we need more compactions!
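+      // First check if we have enough room to do the compaction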
+ bool enough_room = EnoughRoomForCompaction(
+ cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
+
+ if (!enough_room) {
+ // Then don't do the compaction
+ c->ReleaseCompactionFiles(status);
+ c->column_family_data()
+ ->current()
+ ->storage_info()
+ ->ComputeCompactionScore(*(c->immutable_cf_options()),
+ *(c->mutable_cf_options()));
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
- MaybeScheduleFlushOrCompaction();
+
+ c.reset();
+ // Don't need to sleep here, because BackgroundCallCompaction
+ // will sleep if !s.ok()
+ status = Status::CompactionTooLarge();
+ } else {
+ // update statistics
+ MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
+ c->inputs(0)->size());
+ // There are three things that can change compaction score:
+ // 1) When flush or compaction finish. This case is covered by
+ // InstallSuperVersionAndScheduleWork
+ // 2) When MutableCFOptions changes. This case is also covered by
+ // InstallSuperVersionAndScheduleWork, because this is when the new
+ // options take effect.
+ // 3) When we Pick a new compaction, we "remove" those files being
+ // compacted from the calculation, which then influences compaction
+ // score. Here we check if we need the new compaction even without the
+ // files that are currently being compacted. If we need another
+ // compaction, we might be able to execute it in parallel, so we add
+ // it to the queue and schedule a new thread.
+ if (cfd->NeedsCompaction()) {
+ // Yes, we need more compactions!
+ AddToCompactionQueue(cfd);
+ ++unscheduled_compactions_;
+ MaybeScheduleFlushOrCompaction();
+ }
}
}
}
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
- InstallSuperVersionAndScheduleWorkWrapper(
- c->column_family_data(), job_context, *c->mutable_cf_options());
+ InstallSuperVersionAndScheduleWork(
+ c->column_family_data(), &job_context->superversion_contexts[0],
+ *c->mutable_cf_options(), FlushReason::kAutoCompaction);
ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
- f->largest, f->smallest_seqno, f->largest_seqno,
- f->marked_for_compaction);
-
- ROCKS_LOG_BUFFER(log_buffer, "[%s] Moving #%" PRIu64
- " to level-%d %" PRIu64 " bytes\n",
- c->column_family_data()->GetName().c_str(),
- f->fd.GetNumber(), c->output_level(),
- f->fd.GetFileSize());
+ f->largest, f->fd.smallest_seqno,
+ f->fd.largest_seqno, f->marked_for_compaction);
+
+ ROCKS_LOG_BUFFER(
+ log_buffer,
+ "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
+ c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
+ c->output_level(), f->fd.GetFileSize());
++moved_files;
moved_bytes += f->fd.GetFileSize();
}
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
// Use latest MutableCFOptions
- InstallSuperVersionAndScheduleWorkWrapper(
- c->column_family_data(), job_context, *c->mutable_cf_options());
+ InstallSuperVersionAndScheduleWork(
+ c->column_family_data(), &job_context->superversion_contexts[0],
+ *c->mutable_cf_options(), FlushReason::kAutoCompaction);
VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
// Clear Instrument
ThreadStatusUtil::ResetThreadStatus();
+ } else if (!is_prepicked && c->output_level() > 0 &&
+ c->output_level() ==
+ c->column_family_data()
+ ->current()
+ ->storage_info()
+ ->MaxOutputLevel(
+ immutable_db_options_.allow_ingest_behind) &&
+ env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
+ // Forward compactions whose output is the last level to the bottom-priority
+ // thread pool, if one exists. Such compactions are unlikely to contribute
+ // to write stalls, so they can be delayed or deprioritized.
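+ // Note: the bottom pool has no threads unless the user opts in, e.g.
+ // (illustrative sketch, not part of this change):
+ //   db_options.env->SetBackgroundThreads(1, Env::Priority::BOTTOM);
+ // With zero BOTTOM threads this branch is never taken.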
+ TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
+ CompactionArg* ca = new CompactionArg;
+ ca->db = this;
+ ca->prepicked_compaction = new PrepickedCompaction;
+ ca->prepicked_compaction->compaction = c.release();
+ ca->prepicked_compaction->manual_compaction_state = nullptr;
+ ++bg_bottom_compaction_scheduled_;
+ env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
+ this, &DBImpl::UnscheduleCallback);
} else {
- int output_level __attribute__((unused)) = c->output_level();
+ int output_level __attribute__((__unused__));
+ output_level = c->output_level();
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
&output_level);
-
SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
+ auto snapshot_checker = snapshot_checker_.get();
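+ // If custom GC is enabled but no snapshot checker is registered, fall back
+ // to a checker that never reports a key as in-snapshot, so compaction keeps
+ // every version (i.e. GC is disabled).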
+ if (use_custom_gc_ && snapshot_checker == nullptr) {
+ snapshot_checker = DisableGCSnapshotChecker::Instance();
+ }
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
- job_context->job_id, c.get(), immutable_db_options_, env_options_,
- versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
- directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
- &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
- table_cache_, &event_logger_,
+ job_context->job_id, c.get(), immutable_db_options_,
+ env_options_for_compaction_, versions_.get(), &shutting_down_,
+ preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
+ GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
+ &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
+ snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats);
status = compaction_job.Install(*c->mutable_cf_options());
if (status.ok()) {
- InstallSuperVersionAndScheduleWorkWrapper(
- c->column_family_data(), job_context, *c->mutable_cf_options());
+ InstallSuperVersionAndScheduleWork(
+ c->column_family_data(), &job_context->superversion_contexts[0],
+ *c->mutable_cf_options(), FlushReason::kAutoCompaction);
}
*made_progress = true;
}
if (c != nullptr) {
c->ReleaseCompactionFiles(status);
*made_progress = true;
- NotifyOnCompactionCompleted(
- c->column_family_data(), c.get(), status,
- compaction_job_stats, job_context->job_id);
+
+#ifndef ROCKSDB_LITE
+ // Need to make sure SstFileManager does its bookkeeping
+ auto sfm = static_cast<SstFileManagerImpl*>(
+ immutable_db_options_.sst_file_manager.get());
+ if (sfm && sfm_reserved_compact_space) {
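+ // This releases the disk space the SstFileManager reserved for this
+ // compaction when it was picked.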
+ sfm->OnCompactionCompletion(c.get());
+ }
+#endif // ROCKSDB_LITE
+
+ NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
+ compaction_job_stats, job_context->job_id);
}
- // this will unref its input_version and column_family_data
- c.reset();
- if (status.ok()) {
+ if (status.ok() || status.IsCompactionTooLarge()) {
// Done
} else if (status.IsShutdownInProgress()) {
// Ignore compaction errors found during shutting down
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
- if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
- bg_error_ = status;
+ error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
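+ // The error handler classifies the error by reason and severity and may
+ // stop background work; the IsBGWorkStopped() check below honors that.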
+ if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
+ // Put this cfd back in the compaction queue so we can retry after some
+ // time
+ auto cfd = c->column_family_data();
+ assert(cfd != nullptr);
+ // Since this compaction failed, we need to recompute the score so it
+ // takes the original input files into account
+ c->column_family_data()
+ ->current()
+ ->storage_info()
+ ->ComputeCompactionScore(*(c->immutable_cf_options()),
+ *(c->mutable_cf_options()));
+ if (!cfd->queued_for_compaction()) {
+ AddToCompactionQueue(cfd);
+ ++unscheduled_compactions_;
+ }
}
}
+ // this will unref its input_version and column_family_data
+ c.reset();
if (is_manual) {
- ManualCompaction* m = manual_compaction;
+ ManualCompactionState* m = manual_compaction;
if (!status.ok()) {
m->status = status;
m->done = true;
m->begin = &m->tmp_storage;
m->incomplete = true;
}
- m->in_progress = false; // not being processed anymore
+ m->in_progress = false; // not being processed anymore
}
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
return status;
return (!manual_compaction_dequeue_.empty());
}
-void DBImpl::AddManualCompaction(DBImpl::ManualCompaction* m) {
+void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
manual_compaction_dequeue_.push_back(m);
}
-void DBImpl::RemoveManualCompaction(DBImpl::ManualCompaction* m) {
+void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
// Remove from queue
- std::deque<ManualCompaction*>::iterator it =
+ std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if (m == (*it)) {
return;
}
-bool DBImpl::ShouldntRunManualCompaction(ManualCompaction* m) {
+bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
if (num_running_ingest_file_ > 0) {
// We need to wait for other IngestExternalFile() calls to finish
// before running a manual compaction.
return true;
}
if (m->exclusive) {
- return (bg_compaction_scheduled_ > 0);
+ return (bg_bottom_compaction_scheduled_ > 0 ||
+ bg_compaction_scheduled_ > 0);
}
- std::deque<ManualCompaction*>::iterator it =
+ std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
bool seen = false;
while (it != manual_compaction_dequeue_.end()) {
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
// Scan the manual compaction queue
- std::deque<ManualCompaction*>::iterator it =
+ std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
}
if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
// Allow automatic compaction if manual compaction is
- // is in progress
+ // in progress
return true;
}
it++;
bool DBImpl::HasExclusiveManualCompaction() {
// Scan the manual compaction queue
- std::deque<ManualCompaction*>::iterator it =
+ std::deque<ManualCompactionState*>::iterator it =
manual_compaction_dequeue_.begin();
while (it != manual_compaction_dequeue_.end()) {
if ((*it)->exclusive) {
return false;
}
-bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
+bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
if ((m->exclusive) || (m1->exclusive)) {
return true;
}
return true;
}
-// JobContext gets created and destructed outside of the lock --
-// we
-// use this convinently to:
+// SuperVersionContext gets created and destructed outside of the lock --
+// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
-// same job_context, we can't reuse the SuperVersion() that got
+// same sv_context, we can't reuse the SuperVersion() that got
// malloc'd, because the first call already used it. In that rare case, we
// take a hit and create a new SuperVersion() inside the mutex. We do a
// similar thing for superversions_to_free.
-void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
- ColumnFamilyData* cfd, JobContext* job_context,
- const MutableCFOptions& mutable_cf_options) {
- mutex_.AssertHeld();
- SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork(
- cfd, job_context->new_superversion, mutable_cf_options);
- job_context->new_superversion = nullptr;
- job_context->superversions_to_free.push_back(old_superversion);
-}
-SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
- ColumnFamilyData* cfd, SuperVersion* new_sv,
- const MutableCFOptions& mutable_cf_options) {
+void DBImpl::InstallSuperVersionAndScheduleWork(
+ ColumnFamilyData* cfd, SuperVersionContext* sv_context,
+ const MutableCFOptions& mutable_cf_options,
+ FlushReason /* flush_reason */) {
+ // TODO(yanqin) investigate if 'flush_reason' can be removed since it's not
+ // used.
mutex_.AssertHeld();
// Update max_total_in_memory_state_
old_sv->mutable_cf_options.max_write_buffer_number;
}
- auto* old = cfd->InstallSuperVersion(
- new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
+ // This branch is unlikely to be taken.
+ if (UNLIKELY(sv_context->new_superversion == nullptr)) {
+ sv_context->NewSuperVersion();
+ }
+ cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
// Whenever we install a new SuperVersion, we might need to issue new flushes
// or compactions.
- SchedulePendingFlush(cfd);
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
// Update max_total_in_memory_state_
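+ // (Each column family contributes write_buffer_size *
+ // max_write_buffer_number bytes; subtract the old contribution and add the
+ // new one.)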
- max_total_in_memory_state_ =
- max_total_in_memory_state_ - old_memtable_size +
- mutable_cf_options.write_buffer_size *
- mutable_cf_options.max_write_buffer_number;
- return old;
+ max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
+ mutable_cf_options.write_buffer_size *
+ mutable_cf_options.max_write_buffer_number;
+}
+
+// ShouldPurge is called by FindObsoleteFiles when doing a full scan, and the
+// db mutex (mutex_) must already be held. This function performs a linear
+// scan of a vector (files_grabbed_for_purge_) in search of a particular file
+// number. We expect FindObsoleteFiles with full scan to occur once every
+// 10 hours by default, and the vector is small, so the cost is affordable
+// even while the mutex is held. In fact, the current implementation of
+// FindObsoleteFiles with full_scan=true can issue I/O requests to obtain the
+// list of files in directories, e.g. env_->GetChildren(), while holding the
+// db mutex. In the future, if we want to reduce the cost of the search, we
+// may try to keep the vector sorted.
+bool DBImpl::ShouldPurge(uint64_t file_number) const {
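+ // Files already grabbed by an in-flight purge job must not be purged again.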
+ for (auto fn : files_grabbed_for_purge_) {
+ if (file_number == fn) {
+ return false;
+ }
+ }
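+ // Files already queued for background purge will be handled by that job.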
+ for (const auto& purge_file_info : purge_queue_) {
+ if (purge_file_info.number == file_number) {
+ return false;
+ }
+ }
+ return true;
+}
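+
+// A minimal sketch of the sorted-vector alternative mentioned above
+// (hypothetical, not the current implementation; it assumes insertions keep
+// files_grabbed_for_purge_ sorted):
+//
+//   bool grabbed = std::binary_search(files_grabbed_for_purge_.begin(),
+//                                     files_grabbed_for_purge_.end(),
+//                                     file_number);
+//   if (grabbed) {
+//     return false;  // already grabbed by an in-flight purge job
+//   }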
+
+// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
+// (mutex_) should already be held.
+void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
+ files_grabbed_for_purge_.emplace_back(file_number);
+}
+
+void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
+ InstrumentedMutexLock l(&mutex_);
+ // snapshot_checker_ should only be set once. If we ever need to set it
+ // multiple times, we will have to make sure the old one is not deleted
+ // while it is still in use by a compaction job.
+ assert(!snapshot_checker_);
+ snapshot_checker_.reset(snapshot_checker);
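+ // The DB takes ownership of the passed-in checker.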
}
} // namespace rocksdb