#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
-DeleteScheduler::DeleteScheduler(Env* env, FileSystem* fs,
+DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager,
double max_trash_db_ratio,
uint64_t bytes_max_delete_chunk)
- : env_(env),
+ : clock_(clock),
fs_(fs),
total_trash_size_(0),
rate_bytes_per_sec_(rate_bytes_per_sec),
if (bg_thread_) {
bg_thread_->join();
}
+ for (const auto& it : bg_errors_) {
+ it.second.PermitUncheckedError();
+ }
}
Status DeleteScheduler::DeleteFile(const std::string& file_path,
const std::string& dir_to_sync,
const bool force_bg) {
- Status s;
- if (rate_bytes_per_sec_.load() <= 0 || (!force_bg &&
- total_trash_size_.load() >
- sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {
+ if (rate_bytes_per_sec_.load() <= 0 ||
+ (!force_bg &&
+ total_trash_size_.load() >
+ sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {
// Rate limiting is disabled or trash size makes up more than
// max_trash_db_ratio_ (default 25%) of the total DB size
TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
- s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
+ Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
if (s.ok()) {
s = sst_file_manager_->OnDeleteFile(file_path);
ROCKS_LOG_INFO(info_log_,
// Move file to trash
std::string trash_file;
- s = MarkAsTrash(file_path, &trash_file);
+ Status s = MarkAsTrash(file_path, &trash_file);
ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
s.ToString().c_str());
// Update the total trash size
uint64_t trash_file_size = 0;
- fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
- total_trash_size_.fetch_add(trash_file_size);
+ IOStatus io_s =
+ fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
+ if (io_s.ok()) {
+ total_trash_size_.fetch_add(trash_file_size);
+ }
+ //**TODO: What should we do if we failed to
+ // get the file size?
// Add file to delete queue
{
Status s;
// Check if there are any files marked as trash in this path
std::vector<std::string> files_in_path;
- s = env->GetChildren(path, &files_in_path);
+ const auto& fs = env->GetFileSystem();
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
+ s = fs->GetChildren(path, io_opts, &files_in_path,
+ /*IODebugContext*=*/nullptr);
if (!s.ok()) {
return s;
}
return Status::InvalidArgument("file_path is corrupted");
}
- Status s;
if (DeleteScheduler::IsTrashFile(file_path)) {
// This is already a trash file
*trash_file = file_path;
- return s;
+ return Status::OK();
}
*trash_file = file_path + kTrashExtension;
// TODO(tec) : Implement Env::RenameFileIfNotExist and remove
// file_move_mu mutex.
int cnt = 0;
+ Status s;
InstrumentedMutexLock l(&file_move_mu_);
while (true) {
s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
cnt++;
}
if (s.ok()) {
- sst_file_manager_->OnMoveFile(file_path, *trash_file);
+ s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
}
return s;
}
}
// Delete all files in queue_
- uint64_t start_time = env_->NowMicros();
+ uint64_t start_time = clock_->NowMicros();
uint64_t total_deleted_bytes = 0;
int64_t current_delete_rate = rate_bytes_per_sec_.load();
while (!queue_.empty() && !closing_) {
if (current_delete_rate != rate_bytes_per_sec_.load()) {
// User changed the delete rate
current_delete_rate = rate_bytes_per_sec_.load();
- start_time = env_->NowMicros();
+ start_time = clock_->NowMicros();
total_deleted_bytes = 0;
ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64,
current_delete_rate);
if (my_status.ok()) {
if (num_hard_links == 1) {
std::unique_ptr<FSWritableFile> wf;
- my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),
- &wf, nullptr);
+ my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(), &wf,
+ nullptr);
if (my_status.ok()) {
my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
IOOptions(), nullptr);
s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
}
if (s.ok()) {
- s = dir_obj->Fsync(IOOptions(), nullptr);
+ s = dir_obj->FsyncWithDirOptions(
+ IOOptions(), nullptr,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
TEST_SYNC_POINT_CALLBACK(
"DeleteScheduler::DeleteTrashFile::AfterSyncDir",
reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));