1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "file/sst_file_manager_impl.h"
11 #include "db/db_impl/db_impl.h"
12 #include "env/composite_env_wrapper.h"
13 #include "port/port.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/sst_file_manager.h"
16 #include "test_util/sync_point.h"
17 #include "util/mutexlock.h"
19 namespace ROCKSDB_NAMESPACE
{
// Constructs the manager. The visible initializers zero the size counters
// (in-progress files, compaction buffer, reserved compaction size, max allowed
// space, reserved disk buffer, free-space trigger), clear cur_instance_, and
// build delete_scheduler_ from env, fs_.get(), rate_bytes_per_sec,
// logger.get(), this, max_trash_db_ratio and bytes_max_delete_chunk.
// NOTE(review): part of the member-initializer list is not visible in this
// excerpt; only the initializers shown below are described.
22 SstFileManagerImpl::SstFileManagerImpl(Env
* env
, std::shared_ptr
<FileSystem
> fs
,
23 std::shared_ptr
<Logger
> logger
,
24 int64_t rate_bytes_per_sec
,
25 double max_trash_db_ratio
,
26 uint64_t bytes_max_delete_chunk
)
31 in_progress_files_size_(0),
32 compaction_buffer_size_(0),
33 cur_compactions_reserved_size_(0),
34 max_allowed_space_(0),
35 delete_scheduler_(env
, fs_
.get(), rate_bytes_per_sec
, logger
.get(), this,
36 max_trash_db_ratio
, bytes_max_delete_chunk
),
40 reserved_disk_buffer_(0),
41 free_space_trigger_(0),
42 cur_instance_(nullptr) {}
44 SstFileManagerImpl::~SstFileManagerImpl() {
48 void SstFileManagerImpl::Close() {
// Registers `file_path` with the manager when the caller does not know its
// size: the size is queried with fs_->GetFileSize() and then passed to
// OnAddFileImpl() together with `compaction`. Hits the
// "SstFileManagerImpl::OnAddFile" test sync point afterwards.
// NOTE(review): the rest of the parameter list, the handling of a failed
// GetFileSize() and the return statement are not visible in this excerpt.
62 Status
SstFileManagerImpl::OnAddFile(const std::string
& file_path
,
65 Status s
= fs_
->GetFileSize(file_path
, IOOptions(), &file_size
, nullptr);
68 OnAddFileImpl(file_path
, file_size
, compaction
);
70 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
// Overload for callers that already know the file size: forwards `file_path`,
// `file_size` and `compaction` to OnAddFileImpl() without touching the file
// system, then hits the "SstFileManagerImpl::OnAddFile" test sync point.
// NOTE(review): the return statement is not visible in this excerpt.
74 Status
SstFileManagerImpl::OnAddFile(const std::string
& file_path
,
75 uint64_t file_size
, bool compaction
) {
77 OnAddFileImpl(file_path
, file_size
, compaction
);
78 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
// Stops tracking `file_path` by delegating to OnDeleteFileImpl(), then hits
// the "SstFileManagerImpl::OnDeleteFile" test sync point.
// NOTE(review): the return statement is not visible in this excerpt.
82 Status
SstFileManagerImpl::OnDeleteFile(const std::string
& file_path
) {
85 OnDeleteFileImpl(file_path
);
87 TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
// Bookkeeping for compaction `c`, in two steps:
//  1. Sum the sizes of every input file of `c` and subtract that total from
//     cur_compactions_reserved_size_.
//  2. For every new file recorded in c->edit(), rebuild its table file name
//     and, if that name is in in_progress_files_, subtract its tracked size
//     from in_progress_files_size_ and drop it from in_progress_files_.
// NOTE(review): some lines of this definition are not visible in this excerpt.
91 void SstFileManagerImpl::OnCompactionCompletion(Compaction
* c
) {
93 uint64_t size_added_by_compaction
= 0;
// Accumulate the size of all input files across all input levels.
94 for (size_t i
= 0; i
< c
->num_input_levels(); i
++) {
95 for (size_t j
= 0; j
< c
->num_input_files(i
); j
++) {
96 FileMetaData
* filemeta
= c
->input(i
, j
);
97 size_added_by_compaction
+= filemeta
->fd
.GetFileSize();
100 cur_compactions_reserved_size_
-= size_added_by_compaction
;
102 auto new_files
= c
->edit()->GetNewFiles();
103 for (auto& new_file
: new_files
) {
104 auto fn
= TableFileName(c
->immutable_cf_options()->cf_paths
,
105 new_file
.second
.fd
.GetNumber(),
106 new_file
.second
.fd
.GetPathId());
// Only files that are in in_progress_files_ are adjusted; the assert below
// expects every such file to also be present in tracked_files_.
107 if (in_progress_files_
.find(fn
) != in_progress_files_
.end()) {
108 auto tracked_file
= tracked_files_
.find(fn
);
109 assert(tracked_file
!= tracked_files_
.end());
110 in_progress_files_size_
-= tracked_file
->second
;
111 in_progress_files_
.erase(fn
);
// Re-keys a tracked file from `old_path` to `new_path`: the size stored under
// `old_path` is optionally written to `*file_size` (when the pointer is
// non-null), the file is added under `new_path` with that size and
// compaction == false, and the `old_path` entry is removed. Hits the
// "SstFileManagerImpl::OnMoveFile" test sync point afterwards.
// NOTE(review): tracked_files_ is indexed with operator[]; if it is a
// std::unordered_map (as the return type of GetTrackedFiles() suggests), an
// untracked `old_path` would be default-inserted with size 0 — confirm callers
// only pass tracked paths.
// NOTE(review): the return statement is not visible in this excerpt.
116 Status
SstFileManagerImpl::OnMoveFile(const std::string
& old_path
,
117 const std::string
& new_path
,
118 uint64_t* file_size
) {
121 if (file_size
!= nullptr) {
122 *file_size
= tracked_files_
[old_path
];
124 OnAddFileImpl(new_path
, tracked_files_
[old_path
], false);
125 OnDeleteFileImpl(old_path
);
127 TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
// Stores `max_allowed_space` in max_allowed_space_. Other code in this file
// treats a value of 0 as "no limit configured" (see the `!= 0` / `<= 0`
// checks on max_allowed_space_).
131 void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space
) {
133 max_allowed_space_
= max_allowed_space
;
// Stores `compaction_buffer_size` in compaction_buffer_size_, which
// EnoughRoomForCompaction() adds to the headroom it requires.
136 void SstFileManagerImpl::SetCompactionBufferSize(
137 uint64_t compaction_buffer_size
) {
139 compaction_buffer_size_
= compaction_buffer_size
;
// Reports whether total_files_size_ has reached max_allowed_space_.
// NOTE(review): the body of the `max_allowed_space_ <= 0` branch is not
// visible in this excerpt.
142 bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
144 if (max_allowed_space_
<= 0) {
147 return total_files_size_
>= max_allowed_space_
;
// Like IsMaxAllowedSpaceReached(), but the left-hand side of the comparison
// also includes cur_compactions_reserved_size_.
// NOTE(review): the body of the `max_allowed_space_ <= 0` branch and the
// right-hand side of the `>=` comparison are not visible in this excerpt.
150 bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
152 if (max_allowed_space_
<= 0) {
155 return total_files_size_
+ cur_compactions_reserved_size_
>=
// Decides whether a compaction over `inputs` may proceed given the configured
// space limit and, in degraded mode, the actual free disk space.
//
// Visible steps:
//  1. Sum the sizes of all input files (size_added_by_compaction).
//  2. needed_headroom = cur_compactions_reserved_size_ +
//     size_added_by_compaction + compaction_buffer_size_; if a limit is set
//     (max_allowed_space_ != 0) and needed_headroom + total_files_size_
//     exceeds it, the limit check fails.
//  3. If CheckFreeSpace() is true and `bg_error` equals Status::NoSpace(),
//     query the free space for the path of the first input file, adjust
//     needed_headroom (add reserved_disk_buffer_ when no compaction buffer is
//     configured, subtract in_progress_files_size_) and log an error when
//     free_space < needed_headroom + size_added_by_compaction.
//  4. Otherwise reserve the space: add size_added_by_compaction to
//     cur_compactions_reserved_size_ and snapshot that value into
//     free_space_trigger_.
// NOTE(review): the remaining parameters, the return statements of the
// failing branches and the final return are not visible in this excerpt.
159 bool SstFileManagerImpl::EnoughRoomForCompaction(
160 ColumnFamilyData
* cfd
, const std::vector
<CompactionInputFiles
>& inputs
,
163 uint64_t size_added_by_compaction
= 0;
164 // First check if we even have the space to do the compaction
165 for (size_t i
= 0; i
< inputs
.size(); i
++) {
166 for (size_t j
= 0; j
< inputs
[i
].size(); j
++) {
167 FileMetaData
* filemeta
= inputs
[i
][j
];
168 size_added_by_compaction
+= filemeta
->fd
.GetFileSize();
172 // Update cur_compactions_reserved_size_ so concurrent compactions
173 // don't max out space
174 size_t needed_headroom
=
175 cur_compactions_reserved_size_
+ size_added_by_compaction
+
176 compaction_buffer_size_
;
177 if (max_allowed_space_
!= 0 &&
178 (needed_headroom
+ total_files_size_
> max_allowed_space_
)) {
182 // Implement more aggressive checks only if this DB instance has already
183 // seen a NoSpace() error. This is in order to contain a single potentially
184 // misbehaving DB instance and prevent it from slowing down compactions of
185 // other DB instances
186 if (CheckFreeSpace() && bg_error
== Status::NoSpace()) {
188 TableFileName(cfd
->ioptions()->cf_paths
, inputs
[0][0]->fd
.GetNumber(),
189 inputs
[0][0]->fd
.GetPathId());
190 uint64_t free_space
= 0;
191 fs_
->GetFreeSpace(fn
, IOOptions(), &free_space
, nullptr);
192 // needed_headroom is based on current size reserved by compactions,
193 // minus any files created by running compactions as they would count
194 // against the reserved size. If user didn't specify any compaction
195 // buffer, add reserved_disk_buffer_ that's calculated by default so the
196 // compaction doesn't end up leaving nothing for logs and flush SSTs
197 if (compaction_buffer_size_
== 0) {
198 needed_headroom
+= reserved_disk_buffer_
;
200 needed_headroom
-= in_progress_files_size_
;
201 if (free_space
< needed_headroom
+ size_added_by_compaction
) {
202 // We hit the condition of not enough disk space
203 ROCKS_LOG_ERROR(logger_
,
204 "free space [%" PRIu64
205 " bytes] is less than "
206 "needed headroom [%" ROCKSDB_PRIszt
" bytes]\n",
207 free_space
, needed_headroom
);
212 cur_compactions_reserved_size_
+= size_added_by_compaction
;
213 // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
215 free_space_trigger_
= cur_compactions_reserved_size_
;
// Returns the current value of cur_compactions_reserved_size_, i.e. the total
// input size reserved by EnoughRoomForCompaction() and not yet released by
// OnCompactionCompletion().
219 uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
221 return cur_compactions_reserved_size_
;
// Returns total_files_size_, the running sum of the sizes of all files added
// through OnAddFileImpl() and not yet removed through OnDeleteFileImpl().
224 uint64_t SstFileManagerImpl::GetTotalSize() {
226 return total_files_size_
;
// Returns the path -> size map of tracked files. The return type is a
// non-reference std::unordered_map, so the caller receives its own copy.
229 std::unordered_map
<std::string
, uint64_t>
230 SstFileManagerImpl::GetTrackedFiles() {
232 return tracked_files_
;
// Returns the deletion rate limit by forwarding to
// delete_scheduler_.GetRateBytesPerSecond().
235 int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
236 return delete_scheduler_
.GetRateBytesPerSecond();
// Updates the deletion rate limit by forwarding `delete_rate` to
// delete_scheduler_.SetRateBytesPerSecond().
239 void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate
) {
240 return delete_scheduler_
.SetRateBytesPerSecond(delete_rate
);
// Returns the trash-to-DB size ratio by forwarding to
// delete_scheduler_.GetMaxTrashDBRatio().
243 double SstFileManagerImpl::GetMaxTrashDBRatio() {
244 return delete_scheduler_
.GetMaxTrashDBRatio();
// Updates the trash-to-DB size ratio by forwarding `r` to
// delete_scheduler_.SetMaxTrashDBRatio().
247 void SstFileManagerImpl::SetMaxTrashDBRatio(double r
) {
248 return delete_scheduler_
.SetMaxTrashDBRatio(r
);
// Returns the total trash size by forwarding to
// delete_scheduler_.GetTotalTrashSize().
251 uint64_t SstFileManagerImpl::GetTotalTrashSize() {
252 return delete_scheduler_
.GetTotalTrashSize();
// Adds `size` to reserved_disk_buffer_, the amount of free space that
// ClearError() requires before clearing a hard error and that
// EnoughRoomForCompaction() adds to its headroom when no compaction buffer is
// configured.
// NOTE(review): how `path` is used is not visible in this excerpt.
255 void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size
,
256 const std::string
& path
) {
259 reserved_disk_buffer_
+= size
;
// Recovery routine that StartErrorRecovery() runs on bg_thread_. Visible
// behaviour:
//  - Query free space for path_ and, when max_allowed_space_ > 0, cap it with
//    std::min(max_allowed_space_, free_space).
//  - Hard error: recovery is refused (s = NoSpace) while free_space is below
//    reserved_disk_buffer_. Soft error: refused while free_space is below
//    free_space_trigger_.
//  - If `s` is OK and handlers are queued, take the front handler, publish it
//    in cur_instance_, and call RecoverFromBGError() on it.
//  - Pop the front handler when `s` is OK, shutdown is in progress, or the
//    severity is at least kFatalError.
//  - If handlers remain, wait on cv_ with a deadline of NowMicros() + 5000000
//    (5 seconds); once the list is empty, log and reset bg_err_ to OK.
// NOTE(review): the enclosing loop, the locking, and several statements are
// not visible in this excerpt; the description above covers only what is
// shown.
265 void SstFileManagerImpl::ClearError() {
273 uint64_t free_space
= 0;
274 Status s
= fs_
->GetFreeSpace(path_
, IOOptions(), &free_space
, nullptr);
275 free_space
= max_allowed_space_
> 0
276 ? std::min(max_allowed_space_
, free_space
)
279 // In case of multi-DB instances, some of them may have experienced a
280 // soft error and some a hard error. In the SstFileManagerImpl, a hard
281 // error will basically override previously reported soft errors. Once
282 // we clear the hard error, we don't keep track of previous errors for
284 if (bg_err_
.severity() == Status::Severity::kHardError
) {
285 if (free_space
< reserved_disk_buffer_
) {
286 ROCKS_LOG_ERROR(logger_
,
287 "free space [%" PRIu64
288 " bytes] is less than "
289 "required disk buffer [%" PRIu64
" bytes]\n",
290 free_space
, reserved_disk_buffer_
);
291 ROCKS_LOG_ERROR(logger_
, "Cannot clear hard error\n");
292 s
= Status::NoSpace();
294 } else if (bg_err_
.severity() == Status::Severity::kSoftError
) {
295 if (free_space
< free_space_trigger_
) {
296 ROCKS_LOG_WARN(logger_
,
297 "free space [%" PRIu64
298 " bytes] is less than "
299 "free space for compaction trigger [%" PRIu64
301 free_space
, free_space_trigger_
);
302 ROCKS_LOG_WARN(logger_
, "Cannot clear soft error\n");
303 s
= Status::NoSpace();
308 // Someone could have called CancelErrorRecovery() and the list could have
309 // become empty, so check again here
310 if (s
.ok() && !error_handler_list_
.empty()) {
311 auto error_handler
= error_handler_list_
.front();
312 // Since we will release the mutex, set cur_instance_ to signal to the
313 // shutdown thread, if it calls CancelErrorRecovery() in the meantime,
314 // that this DB instance is busy. The DB instance is
315 // guaranteed to not be deleted before RecoverFromBGError() returns,
316 // since the ErrorHandler::recovery_in_prog_ flag would be true
317 cur_instance_
= error_handler
;
319 s
= error_handler
->RecoverFromBGError();
320 TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
322 // The DB instance might have been deleted while we were
323 // waiting for the mutex, so check cur_instance_ to make sure its
326 // Check for error again, since the instance may have recovered but
327 // immediately got another error. If that's the case, and the new
328 // error is also a NoSpace() non-fatal error, leave the instance in
330 Status err
= cur_instance_
->GetBGError();
331 if (s
.ok() && err
== Status::NoSpace() &&
332 err
.severity() < Status::Severity::kFatalError
) {
335 cur_instance_
= nullptr;
338 if (s
.ok() || s
.IsShutdownInProgress() ||
339 (!s
.ok() && s
.severity() >= Status::Severity::kFatalError
)) {
340 // If shutdown is in progress, abandon this handler instance
341 // and continue with the others
342 error_handler_list_
.pop_front();
346 if (!error_handler_list_
.empty()) {
347 // If there are more instances to be recovered, reschedule after 5
349 int64_t wait_until
= env_
->NowMicros() + 5000000;
350 cv_
.TimedWait(wait_until
);
353 // Check again for error_handler_list_ empty, as a DB instance shutdown
354 // could have removed it from the queue while we were in timed wait
355 if (error_handler_list_
.empty()) {
356 ROCKS_LOG_INFO(logger_
, "Clearing error\n");
357 bg_err_
= Status::OK();
// Registers `handler` for background error recovery. Visible behaviour:
//  - Branches on whether `bg_error` has soft or hard severity (the branch
//    bodies are not visible in this excerpt).
//  - If error_handler_list_ is empty, `handler` is queued and a new
//    port::Thread running ClearError() is stored in bg_thread_.
//  - Otherwise the list is scanned for `handler`, and `handler` is appended
//    (the handling of an already-present handler is not visible here).
// NOTE(review): the remaining parameters, the locking and the join of any
// previous thread are not visible in this excerpt.
363 void SstFileManagerImpl::StartErrorRecovery(ErrorHandler
* handler
,
366 if (bg_error
.severity() == Status::Severity::kSoftError
) {
368 // Setting bg_err_ basically means we're in degraded mode
369 // Assume that all pending compactions will fail similarly. The trigger
370 // for clearing this condition is set to current compaction reserved
371 // size, so we stop checking disk space available in
372 // EnoughRoomForCompaction once this much free space is available
375 } else if (bg_error
.severity() == Status::Severity::kHardError
) {
381 // If this is the first instance of this error, kick off a thread to poll
382 // and recover from this condition
383 if (error_handler_list_
.empty()) {
384 error_handler_list_
.push_back(handler
);
385 // Release lock before calling join. It's ok to do so because
386 // error_handler_list_ is now non-empty, so no other invocation of this
387 // function will execute this piece of code
392 // Start a new thread. The previous one would have exited.
393 bg_thread_
.reset(new port::Thread(&SstFileManagerImpl::ClearError
, this));
396 // Check if this DB instance is already in the list
397 for (auto iter
= error_handler_list_
.begin();
398 iter
!= error_handler_list_
.end(); ++iter
) {
399 if ((*iter
) == handler
) {
403 error_handler_list_
.push_back(handler
);
// Withdraws `handler` from background error recovery. If `handler` is the
// instance currently being recovered (cur_instance_), cur_instance_ is reset
// to nullptr; otherwise error_handler_list_ is scanned and the matching entry
// is erased.
// NOTE(review): the return values and the locking are not visible in this
// excerpt.
407 bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler
* handler
) {
410 if (cur_instance_
== handler
) {
411 // This instance is currently busy attempting to recover
412 // Nullify it so the recovery thread doesn't attempt to access it again
413 cur_instance_
= nullptr;
417 for (auto iter
= error_handler_list_
.begin();
418 iter
!= error_handler_list_
.end(); ++iter
) {
419 if ((*iter
) == handler
) {
420 error_handler_list_
.erase(iter
);
// Hands `file_path` to the delete scheduler and returns its Status.
// Before delegating, the "SstFileManagerImpl::ScheduleFileDeletion" test sync
// point callback is invoked with a non-const pointer to `file_path` (hence the
// const_cast).
// NOTE(review): the trailing arguments of DeleteFile() are not visible in
// this excerpt.
427 Status
SstFileManagerImpl::ScheduleFileDeletion(
428 const std::string
& file_path
, const std::string
& path_to_sync
,
429 const bool force_bg
) {
430 TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
431 const_cast<std::string
*>(&file_path
));
432 return delete_scheduler_
.DeleteFile(file_path
, path_to_sync
,
// Forwards to delete_scheduler_.WaitForEmptyTrash().
436 void SstFileManagerImpl::WaitForEmptyTrash() {
437 delete_scheduler_
.WaitForEmptyTrash();
// Records `file_path` with size `file_size` in the tracking state.
//  - Already tracked: the old size is replaced by `file_size` in
//    total_files_size_, and `file_size` is subtracted from
//    cur_compactions_reserved_size_.
//  - Otherwise: `file_size` is added to total_files_size_, and the file is
//    also counted in in_progress_files_size_ / in_progress_files_.
// In both cases tracked_files_[file_path] ends up equal to `file_size`.
// NOTE(review): the `else` line and the condition guarding the in-progress
// bookkeeping (presumably `compaction`) are not visible in this excerpt.
440 void SstFileManagerImpl::OnAddFileImpl(const std::string
& file_path
,
441 uint64_t file_size
, bool compaction
) {
442 auto tracked_file
= tracked_files_
.find(file_path
);
443 if (tracked_file
!= tracked_files_
.end()) {
444 // File was added before, we will just update the size
446 total_files_size_
-= tracked_file
->second
;
447 total_files_size_
+= file_size
;
448 cur_compactions_reserved_size_
-= file_size
;
450 total_files_size_
+= file_size
;
452 // Keep track of the size of files created by in-progress compactions.
453 // When calculating whether there's enough headroom for new compactions,
454 // this will be subtracted from cur_compactions_reserved_size_.
455 // Otherwise, compactions will be double counted.
456 in_progress_files_size_
+= file_size
;
457 in_progress_files_
.insert(file_path
);
460 tracked_files_
[file_path
] = file_size
;
// Removes `file_path` from the tracking state. For an untracked path the only
// visible action is the assert that it is not in in_progress_files_ either.
// For a tracked path, its size is subtracted from total_files_size_; if it is
// also in in_progress_files_, its size is subtracted from
// in_progress_files_size_ and it is removed from that set; finally the entry
// is erased from tracked_files_.
// NOTE(review): the early return for the untracked case is not visible in
// this excerpt.
463 void SstFileManagerImpl::OnDeleteFileImpl(const std::string
& file_path
) {
464 auto tracked_file
= tracked_files_
.find(file_path
);
465 if (tracked_file
== tracked_files_
.end()) {
466 // File is not tracked
467 assert(in_progress_files_
.find(file_path
) == in_progress_files_
.end());
471 total_files_size_
-= tracked_file
->second
;
472 // Check if it belonged to an in-progress compaction
473 if (in_progress_files_
.find(file_path
) != in_progress_files_
.end()) {
474 in_progress_files_size_
-= tracked_file
->second
;
475 in_progress_files_
.erase(file_path
);
477 tracked_files_
.erase(tracked_file
);
// Convenience factory for callers that have an Env but no FileSystem.
// Uses FileSystem::Default() when `env` is Env::Default(); the other visible
// assignment wraps `env` in a LegacyFileSystemWrapper (presumably the `else`
// branch — that line is not visible in this excerpt). All arguments are then
// forwarded to the NewSstFileManager overload that takes a FileSystem.
480 SstFileManager
* NewSstFileManager(Env
* env
, std::shared_ptr
<Logger
> info_log
,
481 std::string trash_dir
,
482 int64_t rate_bytes_per_sec
,
483 bool delete_existing_trash
, Status
* status
,
484 double max_trash_db_ratio
,
485 uint64_t bytes_max_delete_chunk
) {
486 std::shared_ptr
<FileSystem
> fs
;
488 if (env
== Env::Default()) {
489 fs
= FileSystem::Default();
491 fs
.reset(new LegacyFileSystemWrapper(env
));
494 return NewSstFileManager(env
, fs
, info_log
, trash_dir
, rate_bytes_per_sec
,
495 delete_existing_trash
, status
, max_trash_db_ratio
,
496 bytes_max_delete_chunk
);
// Factory that heap-allocates an SstFileManagerImpl from env, fs, info_log,
// rate_bytes_per_sec, max_trash_db_ratio and bytes_max_delete_chunk.
// When `delete_existing_trash` is set and `trash_dir` is non-empty, the
// children of `trash_dir` are listed and each entry is registered with
// OnAddFile() and then passed to ScheduleFileDeletion() using the path
// "<trash_dir>/<entry>".
// NOTE(review): the handling of "." / "..", the reporting through `status`
// and the return statement are not visible in this excerpt. The raw `new`
// suggests the caller takes ownership of the result — confirm against the
// public header.
499 SstFileManager
* NewSstFileManager(Env
* env
, std::shared_ptr
<FileSystem
> fs
,
500 std::shared_ptr
<Logger
> info_log
,
501 const std::string
& trash_dir
,
502 int64_t rate_bytes_per_sec
,
503 bool delete_existing_trash
, Status
* status
,
504 double max_trash_db_ratio
,
505 uint64_t bytes_max_delete_chunk
) {
506 SstFileManagerImpl
* res
=
507 new SstFileManagerImpl(env
, fs
, info_log
, rate_bytes_per_sec
,
508 max_trash_db_ratio
, bytes_max_delete_chunk
);
510 // trash_dir is deprecated and not needed anymore, but if user passed it
511 // we will still remove files in it.
513 if (delete_existing_trash
&& trash_dir
!= "") {
514 std::vector
<std::string
> files_in_trash
;
515 s
= fs
->GetChildren(trash_dir
, IOOptions(), &files_in_trash
, nullptr);
517 for (const std::string
& trash_file
: files_in_trash
) {
518 if (trash_file
== "." || trash_file
== "..") {
522 std::string path_in_trash
= trash_dir
+ "/" + trash_file
;
523 res
->OnAddFile(path_in_trash
);
525 res
->ScheduleFileDeletion(path_in_trash
, trash_dir
);
526 if (s
.ok() && !file_delete
.ok()) {
// Stub variant of the factory: every parameter except `status` is unnamed
// (unused), and a Status::NotSupported with the message below is produced.
// The trailing `#endif  // ROCKSDB_LITE` indicates this variant is selected
// by the ROCKSDB_LITE preprocessor setting.
// NOTE(review): how the NotSupported status is stored into `status` and what
// is returned are not visible in this excerpt.
542 SstFileManager
* NewSstFileManager(Env
* /*env*/,
543 std::shared_ptr
<Logger
> /*info_log*/,
544 std::string
/*trash_dir*/,
545 int64_t /*rate_bytes_per_sec*/,
546 bool /*delete_existing_trash*/,
547 Status
* status
, double /*max_trash_db_ratio*/,
548 uint64_t /*bytes_max_delete_chunk*/) {
551 Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
556 #endif // ROCKSDB_LITE
558 } // namespace ROCKSDB_NAMESPACE