#include <vector>
#include "db/db_impl/db_impl.h"
-#include "env/composite_env_wrapper.h"
+#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_manager.h"
namespace ROCKSDB_NAMESPACE {
#ifndef ROCKSDB_LITE
-SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
- std::shared_ptr<Logger> logger,
- int64_t rate_bytes_per_sec,
- double max_trash_db_ratio,
- uint64_t bytes_max_delete_chunk)
- : env_(env),
+SstFileManagerImpl::SstFileManagerImpl(
+ const std::shared_ptr<SystemClock>& clock,
+ const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<Logger>& logger, int64_t rate_bytes_per_sec,
+ double max_trash_db_ratio, uint64_t bytes_max_delete_chunk)
+ : clock_(clock),
fs_(fs),
logger_(logger),
total_files_size_(0),
- in_progress_files_size_(0),
compaction_buffer_size_(0),
cur_compactions_reserved_size_(0),
max_allowed_space_(0),
- delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
- max_trash_db_ratio, bytes_max_delete_chunk),
+ delete_scheduler_(clock_.get(), fs_.get(), rate_bytes_per_sec,
+ logger.get(), this, max_trash_db_ratio,
+ bytes_max_delete_chunk),
cv_(&mu_),
closing_(false),
bg_thread_(nullptr),
}
}
-Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
- bool compaction) {
+Status SstFileManagerImpl::OnAddFile(const std::string& file_path) {
uint64_t file_size;
Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
if (s.ok()) {
MutexLock l(&mu_);
- OnAddFileImpl(file_path, file_size, compaction);
+ OnAddFileImpl(file_path, file_size);
}
- TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
+ TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnAddFile",
+ const_cast<std::string*>(&file_path));
return s;
}
Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
- uint64_t file_size, bool compaction) {
+ uint64_t file_size) {
MutexLock l(&mu_);
- OnAddFileImpl(file_path, file_size, compaction);
- TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
+ OnAddFileImpl(file_path, file_size);
+ TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnAddFile",
+ const_cast<std::string*>(&file_path));
return Status::OK();
}
MutexLock l(&mu_);
OnDeleteFileImpl(file_path);
}
- TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
+ TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnDeleteFile",
+ const_cast<std::string*>(&file_path));
return Status::OK();
}
}
}
cur_compactions_reserved_size_ -= size_added_by_compaction;
-
- auto new_files = c->edit()->GetNewFiles();
- for (auto& new_file : new_files) {
- auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
- new_file.second.fd.GetNumber(),
- new_file.second.fd.GetPathId());
- if (in_progress_files_.find(fn) != in_progress_files_.end()) {
- auto tracked_file = tracked_files_.find(fn);
- assert(tracked_file != tracked_files_.end());
- in_progress_files_size_ -= tracked_file->second;
- in_progress_files_.erase(fn);
- }
- }
}
Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
if (file_size != nullptr) {
*file_size = tracked_files_[old_path];
}
- OnAddFileImpl(new_path, tracked_files_[old_path], false);
+ OnAddFileImpl(new_path, tracked_files_[old_path]);
OnDeleteFileImpl(old_path);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
bool SstFileManagerImpl::EnoughRoomForCompaction(
ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
- Status bg_error) {
+ const Status& bg_error) {
MutexLock l(&mu_);
uint64_t size_added_by_compaction = 0;
// First check if we even have the space to do the compaction
// Update cur_compactions_reserved_size_ so concurrent compaction
// don't max out space
- size_t needed_headroom =
- cur_compactions_reserved_size_ + size_added_by_compaction +
- compaction_buffer_size_;
+ size_t needed_headroom = cur_compactions_reserved_size_ +
+ size_added_by_compaction + compaction_buffer_size_;
if (max_allowed_space_ != 0 &&
(needed_headroom + total_files_size_ > max_allowed_space_)) {
return false;
// seen a NoSpace() error. This is tin order to contain a single potentially
// misbehaving DB instance and prevent it from slowing down compactions of
// other DB instances
- if (bg_error == Status::NoSpace() && CheckFreeSpace()) {
+ if (bg_error.IsNoSpace() && CheckFreeSpace()) {
auto fn =
TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
inputs[0][0]->fd.GetPathId());
if (compaction_buffer_size_ == 0) {
needed_headroom += reserved_disk_buffer_;
}
- needed_headroom -= in_progress_files_size_;
if (free_space < needed_headroom + size_added_by_compaction) {
// We hit the condition of not enough disk space
ROCKS_LOG_ERROR(logger_,
while (true) {
MutexLock l(&mu_);
- if (closing_) {
+ if (error_handler_list_.empty() || closing_) {
return;
}
// Someone could have called CancelErrorRecovery() and the list could have
// become empty, so check again here
- if (s.ok() && !error_handler_list_.empty()) {
+ if (s.ok()) {
+ assert(!error_handler_list_.empty());
auto error_handler = error_handler_list_.front();
// Since we will release the mutex, set cur_instance_ to signal to the
// shutdown thread, if it calls // CancelErrorRecovery() the meantime,
// error is also a NoSpace() non-fatal error, leave the instance in
// the list
Status err = cur_instance_->GetBGError();
- if (s.ok() && err == Status::NoSpace() &&
+ if (s.ok() && err.subcode() == IOStatus::SubCode::kNoSpace &&
err.severity() < Status::Severity::kFatalError) {
s = err;
}
if (!error_handler_list_.empty()) {
// If there are more instances to be recovered, reschedule after 5
// seconds
- int64_t wait_until = env_->NowMicros() + 5000000;
+ int64_t wait_until = clock_->NowMicros() + 5000000;
cv_.TimedWait(wait_until);
}
return false;
}
-Status SstFileManagerImpl::ScheduleFileDeletion(
- const std::string& file_path, const std::string& path_to_sync,
- const bool force_bg) {
+Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path,
+ const std::string& path_to_sync,
+ const bool force_bg) {
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
const_cast<std::string*>(&file_path));
- return delete_scheduler_.DeleteFile(file_path, path_to_sync,
- force_bg);
+ return delete_scheduler_.DeleteFile(file_path, path_to_sync, force_bg);
}
void SstFileManagerImpl::WaitForEmptyTrash() {
}
void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
- uint64_t file_size, bool compaction) {
+ uint64_t file_size) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file != tracked_files_.end()) {
// File was added before, we will just update the size
- assert(!compaction);
total_files_size_ -= tracked_file->second;
total_files_size_ += file_size;
cur_compactions_reserved_size_ -= file_size;
} else {
total_files_size_ += file_size;
- if (compaction) {
- // Keep track of the size of files created by in-progress compactions.
- // When calculating whether there's enough headroom for new compactions,
- // this will be subtracted from cur_compactions_reserved_size_.
- // Otherwise, compactions will be double counted.
- in_progress_files_size_ += file_size;
- in_progress_files_.insert(file_path);
- }
}
tracked_files_[file_path] = file_size;
}
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file == tracked_files_.end()) {
// File is not tracked
- assert(in_progress_files_.find(file_path) == in_progress_files_.end());
return;
}
total_files_size_ -= tracked_file->second;
- // Check if it belonged to an in-progress compaction
- if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
- in_progress_files_size_ -= tracked_file->second;
- in_progress_files_.erase(file_path);
- }
tracked_files_.erase(tracked_file);
}
bool delete_existing_trash, Status* status,
double max_trash_db_ratio,
uint64_t bytes_max_delete_chunk) {
- std::shared_ptr<FileSystem> fs;
-
- if (env == Env::Default()) {
- fs = FileSystem::Default();
- } else {
- fs.reset(new LegacyFileSystemWrapper(env));
- }
-
+ const auto& fs = env->GetFileSystem();
return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
delete_existing_trash, status, max_trash_db_ratio,
bytes_max_delete_chunk);
bool delete_existing_trash, Status* status,
double max_trash_db_ratio,
uint64_t bytes_max_delete_chunk) {
+ const auto& clock = env->GetSystemClock();
SstFileManagerImpl* res =
- new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
+ new SstFileManagerImpl(clock, fs, info_log, rate_bytes_per_sec,
max_trash_db_ratio, bytes_max_delete_chunk);
// trash_dir is deprecated and not needed anymore, but if user passed it
s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
if (s.ok()) {
for (const std::string& trash_file : files_in_trash) {
- if (trash_file == "." || trash_file == "..") {
- continue;
- }
-
std::string path_in_trash = trash_dir + "/" + trash_file;
res->OnAddFile(path_in_trash);
Status file_delete =