]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/file/sst_file_manager_impl.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / file / sst_file_manager_impl.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae 5
f67539c2 6#include "file/sst_file_manager_impl.h"
7c673cae 7
f67539c2 8#include <cinttypes>
7c673cae
FG
9#include <vector>
10
f67539c2
TL
11#include "db/db_impl/db_impl.h"
12#include "env/composite_env_wrapper.h"
7c673cae
FG
13#include "port/port.h"
14#include "rocksdb/env.h"
15#include "rocksdb/sst_file_manager.h"
f67539c2 16#include "test_util/sync_point.h"
7c673cae 17#include "util/mutexlock.h"
7c673cae 18
f67539c2 19namespace ROCKSDB_NAMESPACE {
7c673cae
FG
20
21#ifndef ROCKSDB_LITE
f67539c2
TL
22SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
23 std::shared_ptr<Logger> logger,
11fdf7f2
TL
24 int64_t rate_bytes_per_sec,
25 double max_trash_db_ratio,
26 uint64_t bytes_max_delete_chunk)
7c673cae 27 : env_(env),
f67539c2 28 fs_(fs),
7c673cae
FG
29 logger_(logger),
30 total_files_size_(0),
11fdf7f2
TL
31 in_progress_files_size_(0),
32 compaction_buffer_size_(0),
33 cur_compactions_reserved_size_(0),
7c673cae 34 max_allowed_space_(0),
f67539c2 35 delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
11fdf7f2
TL
36 max_trash_db_ratio, bytes_max_delete_chunk),
37 cv_(&mu_),
38 closing_(false),
39 bg_thread_(nullptr),
40 reserved_disk_buffer_(0),
41 free_space_trigger_(0),
f67539c2 42 cur_instance_(nullptr) {}
7c673cae 43
11fdf7f2
TL
44SstFileManagerImpl::~SstFileManagerImpl() {
45 Close();
20effc67 46 bg_err_.PermitUncheckedError();
11fdf7f2 47}
7c673cae 48
11fdf7f2
TL
49void SstFileManagerImpl::Close() {
50 {
51 MutexLock l(&mu_);
52 if (closing_) {
53 return;
54 }
55 closing_ = true;
56 cv_.SignalAll();
57 }
58 if (bg_thread_) {
59 bg_thread_->join();
60 }
61}
62
63Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
64 bool compaction) {
7c673cae 65 uint64_t file_size;
f67539c2 66 Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
7c673cae
FG
67 if (s.ok()) {
68 MutexLock l(&mu_);
11fdf7f2 69 OnAddFileImpl(file_path, file_size, compaction);
7c673cae
FG
70 }
71 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
72 return s;
73}
74
f67539c2
TL
75Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
76 uint64_t file_size, bool compaction) {
77 MutexLock l(&mu_);
78 OnAddFileImpl(file_path, file_size, compaction);
79 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
80 return Status::OK();
81}
82
7c673cae
FG
83Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
84 {
85 MutexLock l(&mu_);
86 OnDeleteFileImpl(file_path);
87 }
88 TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
89 return Status::OK();
90}
91
11fdf7f2
TL
92void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
93 MutexLock l(&mu_);
94 uint64_t size_added_by_compaction = 0;
95 for (size_t i = 0; i < c->num_input_levels(); i++) {
96 for (size_t j = 0; j < c->num_input_files(i); j++) {
97 FileMetaData* filemeta = c->input(i, j);
98 size_added_by_compaction += filemeta->fd.GetFileSize();
99 }
100 }
101 cur_compactions_reserved_size_ -= size_added_by_compaction;
102
103 auto new_files = c->edit()->GetNewFiles();
104 for (auto& new_file : new_files) {
105 auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
106 new_file.second.fd.GetNumber(),
107 new_file.second.fd.GetPathId());
108 if (in_progress_files_.find(fn) != in_progress_files_.end()) {
109 auto tracked_file = tracked_files_.find(fn);
110 assert(tracked_file != tracked_files_.end());
111 in_progress_files_size_ -= tracked_file->second;
112 in_progress_files_.erase(fn);
113 }
114 }
115}
116
7c673cae 117Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
11fdf7f2
TL
118 const std::string& new_path,
119 uint64_t* file_size) {
7c673cae
FG
120 {
121 MutexLock l(&mu_);
11fdf7f2
TL
122 if (file_size != nullptr) {
123 *file_size = tracked_files_[old_path];
124 }
125 OnAddFileImpl(new_path, tracked_files_[old_path], false);
7c673cae
FG
126 OnDeleteFileImpl(old_path);
127 }
128 TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
129 return Status::OK();
130}
131
132void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
133 MutexLock l(&mu_);
134 max_allowed_space_ = max_allowed_space;
135}
136
11fdf7f2
TL
137void SstFileManagerImpl::SetCompactionBufferSize(
138 uint64_t compaction_buffer_size) {
139 MutexLock l(&mu_);
140 compaction_buffer_size_ = compaction_buffer_size;
141}
142
7c673cae
FG
143bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
144 MutexLock l(&mu_);
145 if (max_allowed_space_ <= 0) {
146 return false;
147 }
148 return total_files_size_ >= max_allowed_space_;
149}
150
11fdf7f2
TL
151bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
152 MutexLock l(&mu_);
153 if (max_allowed_space_ <= 0) {
154 return false;
155 }
156 return total_files_size_ + cur_compactions_reserved_size_ >=
157 max_allowed_space_;
158}
159
160bool SstFileManagerImpl::EnoughRoomForCompaction(
161 ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
162 Status bg_error) {
163 MutexLock l(&mu_);
164 uint64_t size_added_by_compaction = 0;
165 // First check if we even have the space to do the compaction
166 for (size_t i = 0; i < inputs.size(); i++) {
167 for (size_t j = 0; j < inputs[i].size(); j++) {
168 FileMetaData* filemeta = inputs[i][j];
169 size_added_by_compaction += filemeta->fd.GetFileSize();
170 }
171 }
172
173 // Update cur_compactions_reserved_size_ so concurrent compaction
174 // don't max out space
175 size_t needed_headroom =
176 cur_compactions_reserved_size_ + size_added_by_compaction +
177 compaction_buffer_size_;
178 if (max_allowed_space_ != 0 &&
179 (needed_headroom + total_files_size_ > max_allowed_space_)) {
180 return false;
181 }
182
183 // Implement more aggressive checks only if this DB instance has already
184 // seen a NoSpace() error. This is tin order to contain a single potentially
185 // misbehaving DB instance and prevent it from slowing down compactions of
186 // other DB instances
20effc67 187 if (bg_error == Status::NoSpace() && CheckFreeSpace()) {
11fdf7f2
TL
188 auto fn =
189 TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
190 inputs[0][0]->fd.GetPathId());
191 uint64_t free_space = 0;
20effc67
TL
192 Status s = fs_->GetFreeSpace(fn, IOOptions(), &free_space, nullptr);
193 s.PermitUncheckedError(); // TODO: Check the status
11fdf7f2
TL
194 // needed_headroom is based on current size reserved by compactions,
195 // minus any files created by running compactions as they would count
196 // against the reserved size. If user didn't specify any compaction
197 // buffer, add reserved_disk_buffer_ that's calculated by default so the
198 // compaction doesn't end up leaving nothing for logs and flush SSTs
199 if (compaction_buffer_size_ == 0) {
200 needed_headroom += reserved_disk_buffer_;
201 }
202 needed_headroom -= in_progress_files_size_;
203 if (free_space < needed_headroom + size_added_by_compaction) {
204 // We hit the condition of not enough disk space
494da23a
TL
205 ROCKS_LOG_ERROR(logger_,
206 "free space [%" PRIu64
207 " bytes] is less than "
208 "needed headroom [%" ROCKSDB_PRIszt " bytes]\n",
209 free_space, needed_headroom);
11fdf7f2
TL
210 return false;
211 }
212 }
213
214 cur_compactions_reserved_size_ += size_added_by_compaction;
215 // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
216 // a NoSpace error.
217 free_space_trigger_ = cur_compactions_reserved_size_;
218 return true;
219}
220
221uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
222 MutexLock l(&mu_);
223 return cur_compactions_reserved_size_;
224}
225
7c673cae
FG
226uint64_t SstFileManagerImpl::GetTotalSize() {
227 MutexLock l(&mu_);
228 return total_files_size_;
229}
230
231std::unordered_map<std::string, uint64_t>
232SstFileManagerImpl::GetTrackedFiles() {
233 MutexLock l(&mu_);
234 return tracked_files_;
235}
236
237int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
238 return delete_scheduler_.GetRateBytesPerSecond();
239}
240
241void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
242 return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
243}
244
11fdf7f2
TL
245double SstFileManagerImpl::GetMaxTrashDBRatio() {
246 return delete_scheduler_.GetMaxTrashDBRatio();
247}
248
249void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
250 return delete_scheduler_.SetMaxTrashDBRatio(r);
251}
252
253uint64_t SstFileManagerImpl::GetTotalTrashSize() {
254 return delete_scheduler_.GetTotalTrashSize();
255}
256
257void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
258 const std::string& path) {
259 MutexLock l(&mu_);
260
261 reserved_disk_buffer_ += size;
262 if (path_.empty()) {
263 path_ = path;
264 }
265}
266
267void SstFileManagerImpl::ClearError() {
268 while (true) {
269 MutexLock l(&mu_);
270
271 if (closing_) {
272 return;
273 }
274
f67539c2
TL
275 uint64_t free_space = 0;
276 Status s = fs_->GetFreeSpace(path_, IOOptions(), &free_space, nullptr);
277 free_space = max_allowed_space_ > 0
278 ? std::min(max_allowed_space_, free_space)
279 : free_space;
11fdf7f2
TL
280 if (s.ok()) {
281 // In case of multi-DB instances, some of them may have experienced a
282 // soft error and some a hard error. In the SstFileManagerImpl, a hard
283 // error will basically override previously reported soft errors. Once
284 // we clear the hard error, we don't keep track of previous errors for
285 // now
286 if (bg_err_.severity() == Status::Severity::kHardError) {
287 if (free_space < reserved_disk_buffer_) {
494da23a
TL
288 ROCKS_LOG_ERROR(logger_,
289 "free space [%" PRIu64
290 " bytes] is less than "
291 "required disk buffer [%" PRIu64 " bytes]\n",
292 free_space, reserved_disk_buffer_);
11fdf7f2
TL
293 ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
294 s = Status::NoSpace();
295 }
296 } else if (bg_err_.severity() == Status::Severity::kSoftError) {
297 if (free_space < free_space_trigger_) {
494da23a
TL
298 ROCKS_LOG_WARN(logger_,
299 "free space [%" PRIu64
300 " bytes] is less than "
301 "free space for compaction trigger [%" PRIu64
302 " bytes]\n",
303 free_space, free_space_trigger_);
11fdf7f2
TL
304 ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
305 s = Status::NoSpace();
306 }
307 }
308 }
309
310 // Someone could have called CancelErrorRecovery() and the list could have
311 // become empty, so check again here
312 if (s.ok() && !error_handler_list_.empty()) {
313 auto error_handler = error_handler_list_.front();
314 // Since we will release the mutex, set cur_instance_ to signal to the
315 // shutdown thread, if it calls // CancelErrorRecovery() the meantime,
316 // to indicate that this DB instance is busy. The DB instance is
317 // guaranteed to not be deleted before RecoverFromBGError() returns,
318 // since the ErrorHandler::recovery_in_prog_ flag would be true
319 cur_instance_ = error_handler;
320 mu_.Unlock();
321 s = error_handler->RecoverFromBGError();
f67539c2 322 TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
11fdf7f2
TL
323 mu_.Lock();
324 // The DB instance might have been deleted while we were
325 // waiting for the mutex, so check cur_instance_ to make sure its
326 // still non-null
327 if (cur_instance_) {
328 // Check for error again, since the instance may have recovered but
329 // immediately got another error. If that's the case, and the new
330 // error is also a NoSpace() non-fatal error, leave the instance in
331 // the list
332 Status err = cur_instance_->GetBGError();
333 if (s.ok() && err == Status::NoSpace() &&
334 err.severity() < Status::Severity::kFatalError) {
335 s = err;
336 }
337 cur_instance_ = nullptr;
338 }
339
340 if (s.ok() || s.IsShutdownInProgress() ||
341 (!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
342 // If shutdown is in progress, abandon this handler instance
343 // and continue with the others
344 error_handler_list_.pop_front();
345 }
346 }
347
348 if (!error_handler_list_.empty()) {
349 // If there are more instances to be recovered, reschedule after 5
350 // seconds
351 int64_t wait_until = env_->NowMicros() + 5000000;
352 cv_.TimedWait(wait_until);
353 }
354
355 // Check again for error_handler_list_ empty, as a DB instance shutdown
356 // could have removed it from the queue while we were in timed wait
357 if (error_handler_list_.empty()) {
358 ROCKS_LOG_INFO(logger_, "Clearing error\n");
359 bg_err_ = Status::OK();
360 return;
361 }
362 }
363}
364
365void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
366 Status bg_error) {
367 MutexLock l(&mu_);
368 if (bg_error.severity() == Status::Severity::kSoftError) {
369 if (bg_err_.ok()) {
370 // Setting bg_err_ basically means we're in degraded mode
371 // Assume that all pending compactions will fail similarly. The trigger
372 // for clearing this condition is set to current compaction reserved
373 // size, so we stop checking disk space available in
374 // EnoughRoomForCompaction once this much free space is available
375 bg_err_ = bg_error;
376 }
377 } else if (bg_error.severity() == Status::Severity::kHardError) {
378 bg_err_ = bg_error;
379 } else {
380 assert(false);
381 }
382
383 // If this is the first instance of this error, kick of a thread to poll
384 // and recover from this condition
385 if (error_handler_list_.empty()) {
386 error_handler_list_.push_back(handler);
387 // Release lock before calling join. Its ok to do so because
388 // error_handler_list_ is now non-empty, so no other invocation of this
389 // function will execute this piece of code
390 mu_.Unlock();
391 if (bg_thread_) {
392 bg_thread_->join();
393 }
394 // Start a new thread. The previous one would have exited.
395 bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
396 mu_.Lock();
397 } else {
398 // Check if this DB instance is already in the list
399 for (auto iter = error_handler_list_.begin();
400 iter != error_handler_list_.end(); ++iter) {
401 if ((*iter) == handler) {
402 return;
403 }
404 }
405 error_handler_list_.push_back(handler);
406 }
407}
408
409bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
410 MutexLock l(&mu_);
411
412 if (cur_instance_ == handler) {
413 // This instance is currently busy attempting to recover
414 // Nullify it so the recovery thread doesn't attempt to access it again
415 cur_instance_ = nullptr;
416 return false;
417 }
418
419 for (auto iter = error_handler_list_.begin();
420 iter != error_handler_list_.end(); ++iter) {
421 if ((*iter) == handler) {
422 error_handler_list_.erase(iter);
423 return true;
424 }
425 }
426 return false;
427}
428
429Status SstFileManagerImpl::ScheduleFileDeletion(
494da23a
TL
430 const std::string& file_path, const std::string& path_to_sync,
431 const bool force_bg) {
f67539c2
TL
432 TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
433 const_cast<std::string*>(&file_path));
494da23a
TL
434 return delete_scheduler_.DeleteFile(file_path, path_to_sync,
435 force_bg);
7c673cae
FG
436}
437
438void SstFileManagerImpl::WaitForEmptyTrash() {
439 delete_scheduler_.WaitForEmptyTrash();
440}
441
442void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
11fdf7f2 443 uint64_t file_size, bool compaction) {
7c673cae
FG
444 auto tracked_file = tracked_files_.find(file_path);
445 if (tracked_file != tracked_files_.end()) {
446 // File was added before, we will just update the size
11fdf7f2 447 assert(!compaction);
7c673cae
FG
448 total_files_size_ -= tracked_file->second;
449 total_files_size_ += file_size;
11fdf7f2 450 cur_compactions_reserved_size_ -= file_size;
7c673cae
FG
451 } else {
452 total_files_size_ += file_size;
11fdf7f2
TL
453 if (compaction) {
454 // Keep track of the size of files created by in-progress compactions.
455 // When calculating whether there's enough headroom for new compactions,
456 // this will be subtracted from cur_compactions_reserved_size_.
457 // Otherwise, compactions will be double counted.
458 in_progress_files_size_ += file_size;
459 in_progress_files_.insert(file_path);
460 }
7c673cae
FG
461 }
462 tracked_files_[file_path] = file_size;
463}
464
465void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
466 auto tracked_file = tracked_files_.find(file_path);
467 if (tracked_file == tracked_files_.end()) {
468 // File is not tracked
11fdf7f2 469 assert(in_progress_files_.find(file_path) == in_progress_files_.end());
7c673cae
FG
470 return;
471 }
472
473 total_files_size_ -= tracked_file->second;
11fdf7f2
TL
474 // Check if it belonged to an in-progress compaction
475 if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
476 in_progress_files_size_ -= tracked_file->second;
477 in_progress_files_.erase(file_path);
478 }
7c673cae
FG
479 tracked_files_.erase(tracked_file);
480}
481
482SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
483 std::string trash_dir,
484 int64_t rate_bytes_per_sec,
11fdf7f2
TL
485 bool delete_existing_trash, Status* status,
486 double max_trash_db_ratio,
487 uint64_t bytes_max_delete_chunk) {
f67539c2
TL
488 std::shared_ptr<FileSystem> fs;
489
490 if (env == Env::Default()) {
491 fs = FileSystem::Default();
492 } else {
493 fs.reset(new LegacyFileSystemWrapper(env));
494 }
495
496 return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
497 delete_existing_trash, status, max_trash_db_ratio,
498 bytes_max_delete_chunk);
499}
500
501SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<FileSystem> fs,
502 std::shared_ptr<Logger> info_log,
503 const std::string& trash_dir,
504 int64_t rate_bytes_per_sec,
505 bool delete_existing_trash, Status* status,
506 double max_trash_db_ratio,
507 uint64_t bytes_max_delete_chunk) {
7c673cae 508 SstFileManagerImpl* res =
f67539c2 509 new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
11fdf7f2 510 max_trash_db_ratio, bytes_max_delete_chunk);
7c673cae 511
11fdf7f2
TL
512 // trash_dir is deprecated and not needed anymore, but if user passed it
513 // we will still remove files in it.
20effc67 514 Status s = Status::OK();
11fdf7f2
TL
515 if (delete_existing_trash && trash_dir != "") {
516 std::vector<std::string> files_in_trash;
f67539c2 517 s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
11fdf7f2
TL
518 if (s.ok()) {
519 for (const std::string& trash_file : files_in_trash) {
520 if (trash_file == "." || trash_file == "..") {
521 continue;
522 }
523
524 std::string path_in_trash = trash_dir + "/" + trash_file;
525 res->OnAddFile(path_in_trash);
526 Status file_delete =
527 res->ScheduleFileDeletion(path_in_trash, trash_dir);
528 if (s.ok() && !file_delete.ok()) {
529 s = file_delete;
7c673cae
FG
530 }
531 }
532 }
533 }
534
535 if (status) {
536 *status = s;
20effc67
TL
537 } else {
538 // No one passed us a Status, so they must not care about the error...
539 s.PermitUncheckedError();
7c673cae
FG
540 }
541
542 return res;
543}
544
545#else
546
11fdf7f2
TL
547SstFileManager* NewSstFileManager(Env* /*env*/,
548 std::shared_ptr<Logger> /*info_log*/,
549 std::string /*trash_dir*/,
550 int64_t /*rate_bytes_per_sec*/,
551 bool /*delete_existing_trash*/,
552 Status* status, double /*max_trash_db_ratio*/,
553 uint64_t /*bytes_max_delete_chunk*/) {
7c673cae
FG
554 if (status) {
555 *status =
556 Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
557 }
558 return nullptr;
559}
560
561#endif // ROCKSDB_LITE
562
f67539c2 563} // namespace ROCKSDB_NAMESPACE