1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "file/sst_file_manager_impl.h"
11 #include "db/db_impl/db_impl.h"
12 #include "env/composite_env_wrapper.h"
13 #include "port/port.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/sst_file_manager.h"
16 #include "test_util/sync_point.h"
17 #include "util/mutexlock.h"
19 namespace ROCKSDB_NAMESPACE
{
// Constructs the manager with every visible space-accounting counter set to
// zero and no DB instance under recovery (cur_instance_ is nullptr).
// delete_scheduler_ is built from env, fs_.get(), rate_bytes_per_sec,
// logger.get(), this, max_trash_db_ratio and bytes_max_delete_chunk, so it is
// handed raw pointers to the file system, the logger and this manager.
22 SstFileManagerImpl::SstFileManagerImpl(Env
* env
, std::shared_ptr
<FileSystem
> fs
,
23 std::shared_ptr
<Logger
> logger
,
24 int64_t rate_bytes_per_sec
,
25 double max_trash_db_ratio
,
26 uint64_t bytes_max_delete_chunk
)
31 in_progress_files_size_(0),
32 compaction_buffer_size_(0),
33 cur_compactions_reserved_size_(0),
34 max_allowed_space_(0),
35 delete_scheduler_(env
, fs_
.get(), rate_bytes_per_sec
, logger
.get(), this,
36 max_trash_db_ratio
, bytes_max_delete_chunk
),
40 reserved_disk_buffer_(0),
41 free_space_trigger_(0),
42 cur_instance_(nullptr) {}
// Destructor. Calls PermitUncheckedError() on the stored background status
// bg_err_ (presumably so an unchecked status does not trip a check when the
// member is destroyed - confirm against Status).
44 SstFileManagerImpl::~SstFileManagerImpl() {
46 bg_err_
.PermitUncheckedError();
49 void SstFileManagerImpl::Close() {
// Starts tracking `file_path`, looking up its size through
// fs_->GetFileSize() and forwarding path, size and the `compaction` flag to
// OnAddFileImpl(). Fires the "SstFileManagerImpl::OnAddFile" sync point.
// NOTE(review): confirm how a failed GetFileSize() is handled before
// OnAddFileImpl() is reached.
63 Status
SstFileManagerImpl::OnAddFile(const std::string
& file_path
,
66 Status s
= fs_
->GetFileSize(file_path
, IOOptions(), &file_size
, nullptr);
69 OnAddFileImpl(file_path
, file_size
, compaction
);
71 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
// Overload for callers that already know the file size: forwards
// `file_path`, `file_size` and `compaction` straight to OnAddFileImpl()
// without querying the file system, then fires the
// "SstFileManagerImpl::OnAddFile" sync point.
75 Status
SstFileManagerImpl::OnAddFile(const std::string
& file_path
,
76 uint64_t file_size
, bool compaction
) {
78 OnAddFileImpl(file_path
, file_size
, compaction
);
79 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
// Stops tracking `file_path` by delegating to OnDeleteFileImpl(), then fires
// the "SstFileManagerImpl::OnDeleteFile" sync point.
83 Status
SstFileManagerImpl::OnDeleteFile(const std::string
& file_path
) {
86 OnDeleteFileImpl(file_path
);
88 TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
// Releases the accounting held for compaction `c`.
//  1. Sums the sizes of all input files over every input level and subtracts
//     that total from cur_compactions_reserved_size_.
//  2. For each new file recorded in c->edit(), rebuilds its table file name;
//     if that name is in in_progress_files_, its tracked size is subtracted
//     from in_progress_files_size_ and the name is erased from the set.
92 void SstFileManagerImpl::OnCompactionCompletion(Compaction
* c
) {
94 uint64_t size_added_by_compaction
= 0;
95 for (size_t i
= 0; i
< c
->num_input_levels(); i
++) {
96 for (size_t j
= 0; j
< c
->num_input_files(i
); j
++) {
97 FileMetaData
* filemeta
= c
->input(i
, j
);
98 size_added_by_compaction
+= filemeta
->fd
.GetFileSize();
101 cur_compactions_reserved_size_
-= size_added_by_compaction
;
103 auto new_files
= c
->edit()->GetNewFiles();
104 for (auto& new_file
: new_files
) {
105 auto fn
= TableFileName(c
->immutable_cf_options()->cf_paths
,
106 new_file
.second
.fd
.GetNumber(),
107 new_file
.second
.fd
.GetPathId());
108 if (in_progress_files_
.find(fn
) != in_progress_files_
.end()) {
// An in-progress file is expected to be tracked as well; the assert below
// enforces that only in builds where assert() is active.
109 auto tracked_file
= tracked_files_
.find(fn
);
110 assert(tracked_file
!= tracked_files_
.end());
111 in_progress_files_size_
-= tracked_file
->second
;
112 in_progress_files_
.erase(fn
);
// Re-keys a tracked file from `old_path` to `new_path`.
// If `file_size` is non-null it receives the size currently stored for
// `old_path`. The file is then registered under `new_path` with that size
// (as a non-compaction file) and `old_path` is dropped via
// OnDeleteFileImpl(). Fires the "SstFileManagerImpl::OnMoveFile" sync point.
// NOTE(review): the size is read with tracked_files_[old_path]; if
// tracked_files_ is a std::unordered_map (GetTrackedFiles() returns one),
// operator[] default-inserts a 0-sized entry when old_path is not tracked -
// confirm callers only pass tracked paths.
117 Status
SstFileManagerImpl::OnMoveFile(const std::string
& old_path
,
118 const std::string
& new_path
,
119 uint64_t* file_size
) {
122 if (file_size
!= nullptr) {
123 *file_size
= tracked_files_
[old_path
];
125 OnAddFileImpl(new_path
, tracked_files_
[old_path
], false);
126 OnDeleteFileImpl(old_path
);
128 TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
// Stores `max_allowed_space` in max_allowed_space_. The space checks in this
// file compare against that member and special-case a zero value.
132 void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space
) {
134 max_allowed_space_
= max_allowed_space
;
// Stores `compaction_buffer_size` in compaction_buffer_size_, which
// EnoughRoomForCompaction() adds to the headroom it requires.
137 void SstFileManagerImpl::SetCompactionBufferSize(
138 uint64_t compaction_buffer_size
) {
140 compaction_buffer_size_
= compaction_buffer_size
;
// Reports whether total_files_size_ has reached max_allowed_space_.
// A non-positive max_allowed_space_ is special-cased before the comparison.
// NOTE(review): if max_allowed_space_ is unsigned (its setter takes a
// uint64_t), `<= 0` is equivalent to `== 0`.
143 bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
145 if (max_allowed_space_
<= 0) {
148 return total_files_size_
>= max_allowed_space_
;
// Like IsMaxAllowedSpaceReached(), but the left-hand side of the comparison
// also includes cur_compactions_reserved_size_, i.e. space reserved for
// compactions counts as used. A non-positive max_allowed_space_ is
// special-cased first.
151 bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
153 if (max_allowed_space_
<= 0) {
156 return total_files_size_
+ cur_compactions_reserved_size_
>=
// Decides whether a compaction over `inputs` may proceed given the space
// limits, and reserves space for it if so.
//  - size_added_by_compaction is the sum of the sizes of all input files.
//  - needed_headroom = cur_compactions_reserved_size_ +
//    size_added_by_compaction + compaction_buffer_size_. When
//    max_allowed_space_ is non-zero, needed_headroom + total_files_size_ is
//    checked against it.
//  - If bg_error equals Status::NoSpace() and CheckFreeSpace() is true, the
//    actual free space of the file system is queried (using the path of the
//    first input file) and compared with the headroom.
//  - On success cur_compactions_reserved_size_ grows by
//    size_added_by_compaction and free_space_trigger_ is set to the new
//    reserved size.
// NOTE(review): needed_headroom already contains size_added_by_compaction,
// yet the free-space comparison adds size_added_by_compaction again, and the
// error log prints needed_headroom without that second addend - confirm this
// is intended.
// NOTE(review): `needed_headroom -= in_progress_files_size_` is unsigned
// arithmetic; confirm in_progress_files_size_ can never exceed
// needed_headroom, otherwise the value wraps.
160 bool SstFileManagerImpl::EnoughRoomForCompaction(
161 ColumnFamilyData
* cfd
, const std::vector
<CompactionInputFiles
>& inputs
,
164 uint64_t size_added_by_compaction
= 0;
165 // First check if we even have the space to do the compaction
166 for (size_t i
= 0; i
< inputs
.size(); i
++) {
167 for (size_t j
= 0; j
< inputs
[i
].size(); j
++) {
168 FileMetaData
* filemeta
= inputs
[i
][j
];
169 size_added_by_compaction
+= filemeta
->fd
.GetFileSize();
173 // Update cur_compactions_reserved_size_ so concurrent compactions
174 // don't max out space
175 size_t needed_headroom
=
176 cur_compactions_reserved_size_
+ size_added_by_compaction
+
177 compaction_buffer_size_
;
178 if (max_allowed_space_
!= 0 &&
179 (needed_headroom
+ total_files_size_
> max_allowed_space_
)) {
183 // Implement more aggressive checks only if this DB instance has already
184 // seen a NoSpace() error. This is in order to contain a single potentially
185 // misbehaving DB instance and prevent it from slowing down compactions of
186 // other DB instances
187 if (bg_error
== Status::NoSpace() && CheckFreeSpace()) {
189 TableFileName(cfd
->ioptions()->cf_paths
, inputs
[0][0]->fd
.GetNumber(),
190 inputs
[0][0]->fd
.GetPathId());
191 uint64_t free_space
= 0;
192 Status s
= fs_
->GetFreeSpace(fn
, IOOptions(), &free_space
, nullptr);
193 s
.PermitUncheckedError(); // TODO: Check the status
194 // needed_headroom is based on current size reserved by compactions,
195 // minus any files created by running compactions as they would count
196 // against the reserved size. If user didn't specify any compaction
197 // buffer, add reserved_disk_buffer_ that's calculated by default so the
198 // compaction doesn't end up leaving nothing for logs and flush SSTs
199 if (compaction_buffer_size_
== 0) {
200 needed_headroom
+= reserved_disk_buffer_
;
202 needed_headroom
-= in_progress_files_size_
;
203 if (free_space
< needed_headroom
+ size_added_by_compaction
) {
204 // We hit the condition of not enough disk space
205 ROCKS_LOG_ERROR(logger_
,
206 "free space [%" PRIu64
207 " bytes] is less than "
208 "needed headroom [%" ROCKSDB_PRIszt
" bytes]\n",
209 free_space
, needed_headroom
);
214 cur_compactions_reserved_size_
+= size_added_by_compaction
;
215 // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
217 free_space_trigger_
= cur_compactions_reserved_size_
;
// Returns cur_compactions_reserved_size_, the running total that
// EnoughRoomForCompaction() increases and OnCompactionCompletion() decreases.
221 uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
223 return cur_compactions_reserved_size_
;
// Returns total_files_size_, the sum of sizes maintained by OnAddFileImpl()
// and OnDeleteFileImpl() for all tracked files.
226 uint64_t SstFileManagerImpl::GetTotalSize() {
228 return total_files_size_
;
// Returns the tracked-file table (path -> size) by value, so the caller gets
// its own copy rather than a reference to tracked_files_.
231 std::unordered_map
<std::string
, uint64_t>
232 SstFileManagerImpl::GetTrackedFiles() {
234 return tracked_files_
;
// Returns the deletion rate limit as reported by delete_scheduler_.
237 int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
238 return delete_scheduler_
.GetRateBytesPerSecond();
// Forwards `delete_rate` to delete_scheduler_.SetRateBytesPerSecond().
241 void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate
) {
242 return delete_scheduler_
.SetRateBytesPerSecond(delete_rate
);
// Returns the max trash/DB ratio as reported by delete_scheduler_.
245 double SstFileManagerImpl::GetMaxTrashDBRatio() {
246 return delete_scheduler_
.GetMaxTrashDBRatio();
// Forwards the ratio `r` to delete_scheduler_.SetMaxTrashDBRatio().
249 void SstFileManagerImpl::SetMaxTrashDBRatio(double r
) {
250 return delete_scheduler_
.SetMaxTrashDBRatio(r
);
// Returns the total trash size as reported by delete_scheduler_.
253 uint64_t SstFileManagerImpl::GetTotalTrashSize() {
254 return delete_scheduler_
.GetTotalTrashSize();
// Adds `size` to reserved_disk_buffer_. That buffer is the free-space floor
// ClearError() requires before clearing a hard error, and it is added to the
// compaction headroom when no compaction buffer size is configured.
257 void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size
,
258 const std::string
& path
) {
261 reserved_disk_buffer_
+= size
;
// Recovery routine run on the thread created by StartErrorRecovery().
// As far as the visible code shows, each attempt:
//  1. Queries the free space at path_ and, when max_allowed_space_ is
//     positive, caps it with std::min(max_allowed_space_, free_space).
//  2. Refuses to proceed (s = NoSpace) if a hard error is recorded and free
//     space is below reserved_disk_buffer_, or a soft error is recorded and
//     free space is below free_space_trigger_.
//  3. Otherwise asks the handler at the front of error_handler_list_ to
//     RecoverFromBGError(), publishing it in cur_instance_ meanwhile.
//  4. Pops the handler when recovery succeeded, shutdown is in progress, or
//     the resulting error is fatal or worse.
//  5. If handlers remain, waits on cv_ with a deadline 5,000,000 microseconds
//     (5 seconds) from now; once the list is empty, resets bg_err_ to OK.
267 void SstFileManagerImpl::ClearError() {
275 uint64_t free_space
= 0;
276 Status s
= fs_
->GetFreeSpace(path_
, IOOptions(), &free_space
, nullptr);
277 free_space
= max_allowed_space_
> 0
278 ? std::min(max_allowed_space_
, free_space
)
281 // In case of multi-DB instances, some of them may have experienced a
282 // soft error and some a hard error. In the SstFileManagerImpl, a hard
283 // error will basically override previously reported soft errors. Once
284 // we clear the hard error, we don't keep track of previous errors for
286 if (bg_err_
.severity() == Status::Severity::kHardError
) {
287 if (free_space
< reserved_disk_buffer_
) {
288 ROCKS_LOG_ERROR(logger_
,
289 "free space [%" PRIu64
290 " bytes] is less than "
291 "required disk buffer [%" PRIu64
" bytes]\n",
292 free_space
, reserved_disk_buffer_
);
293 ROCKS_LOG_ERROR(logger_
, "Cannot clear hard error\n");
294 s
= Status::NoSpace();
296 } else if (bg_err_
.severity() == Status::Severity::kSoftError
) {
297 if (free_space
< free_space_trigger_
) {
298 ROCKS_LOG_WARN(logger_
,
299 "free space [%" PRIu64
300 " bytes] is less than "
301 "free space for compaction trigger [%" PRIu64
303 free_space
, free_space_trigger_
);
304 ROCKS_LOG_WARN(logger_
, "Cannot clear soft error\n");
305 s
= Status::NoSpace();
310 // Someone could have called CancelErrorRecovery() and the list could have
311 // become empty, so check again here
312 if (s
.ok() && !error_handler_list_
.empty()) {
313 auto error_handler
= error_handler_list_
.front();
314 // Since we will release the mutex, set cur_instance_ to signal to the
315 // shutdown thread, if it calls CancelErrorRecovery() in the meantime,
316 // to indicate that this DB instance is busy. The DB instance is
317 // guaranteed to not be deleted before RecoverFromBGError() returns,
318 // since the ErrorHandler::recovery_in_prog_ flag would be true
319 cur_instance_
= error_handler
;
321 s
= error_handler
->RecoverFromBGError();
322 TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
324 // The DB instance might have been deleted while we were
325 // waiting for the mutex, so check cur_instance_ to make sure its
328 // Check for error again, since the instance may have recovered but
329 // immediately got another error. If that's the case, and the new
330 // error is also a NoSpace() non-fatal error, leave the instance in
332 Status err
= cur_instance_
->GetBGError();
333 if (s
.ok() && err
== Status::NoSpace() &&
334 err
.severity() < Status::Severity::kFatalError
) {
337 cur_instance_
= nullptr;
340 if (s
.ok() || s
.IsShutdownInProgress() ||
341 (!s
.ok() && s
.severity() >= Status::Severity::kFatalError
)) {
342 // If shutdown is in progress, abandon this handler instance
343 // and continue with the others
344 error_handler_list_
.pop_front();
348 if (!error_handler_list_
.empty()) {
349 // If there are more instances to be recovered, reschedule after 5
351 int64_t wait_until
= env_
->NowMicros() + 5000000;
352 cv_
.TimedWait(wait_until
);
355 // Check again for error_handler_list_ empty, as a DB instance shutdown
356 // could have removed it from the queue while we were in timed wait
357 if (error_handler_list_
.empty()) {
358 ROCKS_LOG_INFO(logger_
, "Clearing error\n");
359 bg_err_
= Status::OK();
// Registers `handler` for background error recovery.
// The function branches on whether bg_error has soft or hard severity.
// When error_handler_list_ is empty, the handler is queued and a new
// port::Thread running ClearError() is stored in bg_thread_. Otherwise the
// list is scanned for `handler`, which is then appended to it.
// NOTE(review): confirm that the scan skips the final push_back when the
// handler is already queued, so a handler is never listed twice.
365 void SstFileManagerImpl::StartErrorRecovery(ErrorHandler
* handler
,
368 if (bg_error
.severity() == Status::Severity::kSoftError
) {
370 // Setting bg_err_ basically means we're in degraded mode
371 // Assume that all pending compactions will fail similarly. The trigger
372 // for clearing this condition is set to current compaction reserved
373 // size, so we stop checking disk space available in
374 // EnoughRoomForCompaction once this much free space is available
377 } else if (bg_error
.severity() == Status::Severity::kHardError
) {
383 // If this is the first instance of this error, kick off a thread to poll
384 // and recover from this condition
385 if (error_handler_list_
.empty()) {
386 error_handler_list_
.push_back(handler
);
387 // Release lock before calling join. It's ok to do so because
388 // error_handler_list_ is now non-empty, so no other invocation of this
389 // function will execute this piece of code
394 // Start a new thread. The previous one would have exited.
395 bg_thread_
.reset(new port::Thread(&SstFileManagerImpl::ClearError
, this));
398 // Check if this DB instance is already in the list
399 for (auto iter
= error_handler_list_
.begin();
400 iter
!= error_handler_list_
.end(); ++iter
) {
401 if ((*iter
) == handler
) {
405 error_handler_list_
.push_back(handler
);
// Withdraws `handler` from error recovery.
// If the handler is the one currently being recovered (cur_instance_), that
// pointer is cleared so the recovery thread will not touch it again. The
// handler is also searched for in error_handler_list_ and erased from it
// when found.
409 bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler
* handler
) {
412 if (cur_instance_
== handler
) {
413 // This instance is currently busy attempting to recover
414 // Nullify it so the recovery thread doesn't attempt to access it again
415 cur_instance_
= nullptr;
419 for (auto iter
= error_handler_list_
.begin();
420 iter
!= error_handler_list_
.end(); ++iter
) {
421 if ((*iter
) == handler
) {
422 error_handler_list_
.erase(iter
);
// Hands `file_path` to delete_scheduler_.DeleteFile() together with
// `path_to_sync`, and returns the scheduler's Status.
// The "SstFileManagerImpl::ScheduleFileDeletion" sync-point callback receives
// a non-const pointer to file_path (hence the const_cast), so a test hook can
// inspect the path being deleted.
429 Status
SstFileManagerImpl::ScheduleFileDeletion(
430 const std::string
& file_path
, const std::string
& path_to_sync
,
431 const bool force_bg
) {
432 TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
433 const_cast<std::string
*>(&file_path
));
434 return delete_scheduler_
.DeleteFile(file_path
, path_to_sync
,
// Delegates to delete_scheduler_.WaitForEmptyTrash().
438 void SstFileManagerImpl::WaitForEmptyTrash() {
439 delete_scheduler_
.WaitForEmptyTrash();
// Core bookkeeping for adding (or re-adding) a file.
//  - Already tracked: the old size is replaced by `file_size` in
//    total_files_size_, and cur_compactions_reserved_size_ is reduced by
//    `file_size`.
//  - New file: total_files_size_ grows by `file_size`; the visible code also
//    adds the size to in_progress_files_size_ and records the path in
//    in_progress_files_ for files created by in-progress compactions.
// In both cases tracked_files_[file_path] ends up equal to `file_size`.
// NOTE(review): the subtraction from cur_compactions_reserved_size_ is
// unsigned arithmetic - confirm it cannot go below zero.
442 void SstFileManagerImpl::OnAddFileImpl(const std::string
& file_path
,
443 uint64_t file_size
, bool compaction
) {
444 auto tracked_file
= tracked_files_
.find(file_path
);
445 if (tracked_file
!= tracked_files_
.end()) {
446 // File was added before, we will just update the size
448 total_files_size_
-= tracked_file
->second
;
449 total_files_size_
+= file_size
;
450 cur_compactions_reserved_size_
-= file_size
;
452 total_files_size_
+= file_size
;
454 // Keep track of the size of files created by in-progress compactions.
455 // When calculating whether there's enough headroom for new compactions,
456 // this will be subtracted from cur_compactions_reserved_size_.
457 // Otherwise, compactions will be double counted.
458 in_progress_files_size_
+= file_size
;
459 in_progress_files_
.insert(file_path
);
462 tracked_files_
[file_path
] = file_size
;
// Core bookkeeping for removing a file.
// For an untracked path the code only asserts that the path is not listed in
// in_progress_files_ either. For a tracked path, its size is subtracted from
// total_files_size_ (and from in_progress_files_size_ if the path belonged to
// an in-progress compaction, in which case it is also removed from
// in_progress_files_), and the entry is erased from tracked_files_.
465 void SstFileManagerImpl::OnDeleteFileImpl(const std::string
& file_path
) {
466 auto tracked_file
= tracked_files_
.find(file_path
);
467 if (tracked_file
== tracked_files_
.end()) {
468 // File is not tracked
469 assert(in_progress_files_
.find(file_path
) == in_progress_files_
.end());
473 total_files_size_
-= tracked_file
->second
;
474 // Check if it belonged to an in-progress compaction
475 if (in_progress_files_
.find(file_path
) != in_progress_files_
.end()) {
476 in_progress_files_size_
-= tracked_file
->second
;
477 in_progress_files_
.erase(file_path
);
479 tracked_files_
.erase(tracked_file
);
// Convenience factory that takes only an Env. It picks the FileSystem to
// use - FileSystem::Default() when `env` is Env::Default(), otherwise a new
// LegacyFileSystemWrapper around `env` - and forwards every argument to the
// NewSstFileManager() overload that accepts an explicit FileSystem.
482 SstFileManager
* NewSstFileManager(Env
* env
, std::shared_ptr
<Logger
> info_log
,
483 std::string trash_dir
,
484 int64_t rate_bytes_per_sec
,
485 bool delete_existing_trash
, Status
* status
,
486 double max_trash_db_ratio
,
487 uint64_t bytes_max_delete_chunk
) {
488 std::shared_ptr
<FileSystem
> fs
;
490 if (env
== Env::Default()) {
491 fs
= FileSystem::Default();
493 fs
.reset(new LegacyFileSystemWrapper(env
));
496 return NewSstFileManager(env
, fs
, info_log
, trash_dir
, rate_bytes_per_sec
,
497 delete_existing_trash
, status
, max_trash_db_ratio
,
498 bytes_max_delete_chunk
);
// Factory that allocates an SstFileManagerImpl with `new` from the given
// env, file system, logger, rate limit, trash ratio and delete-chunk size.
// If `delete_existing_trash` is set and `trash_dir` is non-empty, the
// directory is listed with fs->GetChildren(); every entry other than "." and
// ".." is registered through OnAddFile() and then scheduled for deletion.
// The condition `s.ok() && !file_delete.ok()` inspects a deletion status only
// while `s` is still OK.
// NOTE(review): presumably `s` is copied to *status when the caller supplies
// one; otherwise PermitUncheckedError() is called on it - confirm.
501 SstFileManager
* NewSstFileManager(Env
* env
, std::shared_ptr
<FileSystem
> fs
,
502 std::shared_ptr
<Logger
> info_log
,
503 const std::string
& trash_dir
,
504 int64_t rate_bytes_per_sec
,
505 bool delete_existing_trash
, Status
* status
,
506 double max_trash_db_ratio
,
507 uint64_t bytes_max_delete_chunk
) {
508 SstFileManagerImpl
* res
=
509 new SstFileManagerImpl(env
, fs
, info_log
, rate_bytes_per_sec
,
510 max_trash_db_ratio
, bytes_max_delete_chunk
);
512 // trash_dir is deprecated and not needed anymore, but if user passed it
513 // we will still remove files in it.
514 Status s
= Status::OK();
515 if (delete_existing_trash
&& trash_dir
!= "") {
516 std::vector
<std::string
> files_in_trash
;
517 s
= fs
->GetChildren(trash_dir
, IOOptions(), &files_in_trash
, nullptr);
519 for (const std::string
& trash_file
: files_in_trash
) {
520 if (trash_file
== "." || trash_file
== "..") {
524 std::string path_in_trash
= trash_dir
+ "/" + trash_file
;
525 res
->OnAddFile(path_in_trash
);
527 res
->ScheduleFileDeletion(path_in_trash
, trash_dir
);
528 if (s
.ok() && !file_delete
.ok()) {
538 // No one passed us a Status, so they must not care about the error...
539 s
.PermitUncheckedError();
// ROCKSDB_LITE variant of the factory (the section ends at the
// `#endif  // ROCKSDB_LITE` that follows). Every parameter except `status` is
// unnamed and therefore unused; the body builds a Status::NotSupported with
// the message "SstFileManager is not supported in ROCKSDB_LITE".
547 SstFileManager
* NewSstFileManager(Env
* /*env*/,
548 std::shared_ptr
<Logger
> /*info_log*/,
549 std::string
/*trash_dir*/,
550 int64_t /*rate_bytes_per_sec*/,
551 bool /*delete_existing_trash*/,
552 Status
* status
, double /*max_trash_db_ratio*/,
553 uint64_t /*bytes_max_delete_chunk*/) {
556 Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
561 #endif // ROCKSDB_LITE
563 } // namespace ROCKSDB_NAMESPACE