]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/file/sst_file_manager_impl.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / file / sst_file_manager_impl.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "file/sst_file_manager_impl.h"
7
8 #include <cinttypes>
9 #include <vector>
10
11 #include "db/db_impl/db_impl.h"
12 #include "env/composite_env_wrapper.h"
13 #include "port/port.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/sst_file_manager.h"
16 #include "test_util/sync_point.h"
17 #include "util/mutexlock.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 #ifndef ROCKSDB_LITE
22 SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
23 std::shared_ptr<Logger> logger,
24 int64_t rate_bytes_per_sec,
25 double max_trash_db_ratio,
26 uint64_t bytes_max_delete_chunk)
27 : env_(env),
28 fs_(fs),
29 logger_(logger),
30 total_files_size_(0),
31 in_progress_files_size_(0),
32 compaction_buffer_size_(0),
33 cur_compactions_reserved_size_(0),
34 max_allowed_space_(0),
35 delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
36 max_trash_db_ratio, bytes_max_delete_chunk),
37 cv_(&mu_),
38 closing_(false),
39 bg_thread_(nullptr),
40 reserved_disk_buffer_(0),
41 free_space_trigger_(0),
42 cur_instance_(nullptr) {}
43
44 SstFileManagerImpl::~SstFileManagerImpl() {
45 Close();
46 }
47
48 void SstFileManagerImpl::Close() {
49 {
50 MutexLock l(&mu_);
51 if (closing_) {
52 return;
53 }
54 closing_ = true;
55 cv_.SignalAll();
56 }
57 if (bg_thread_) {
58 bg_thread_->join();
59 }
60 }
61
62 Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
63 bool compaction) {
64 uint64_t file_size;
65 Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
66 if (s.ok()) {
67 MutexLock l(&mu_);
68 OnAddFileImpl(file_path, file_size, compaction);
69 }
70 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
71 return s;
72 }
73
74 Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
75 uint64_t file_size, bool compaction) {
76 MutexLock l(&mu_);
77 OnAddFileImpl(file_path, file_size, compaction);
78 TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
79 return Status::OK();
80 }
81
82 Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
83 {
84 MutexLock l(&mu_);
85 OnDeleteFileImpl(file_path);
86 }
87 TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
88 return Status::OK();
89 }
90
91 void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
92 MutexLock l(&mu_);
93 uint64_t size_added_by_compaction = 0;
94 for (size_t i = 0; i < c->num_input_levels(); i++) {
95 for (size_t j = 0; j < c->num_input_files(i); j++) {
96 FileMetaData* filemeta = c->input(i, j);
97 size_added_by_compaction += filemeta->fd.GetFileSize();
98 }
99 }
100 cur_compactions_reserved_size_ -= size_added_by_compaction;
101
102 auto new_files = c->edit()->GetNewFiles();
103 for (auto& new_file : new_files) {
104 auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
105 new_file.second.fd.GetNumber(),
106 new_file.second.fd.GetPathId());
107 if (in_progress_files_.find(fn) != in_progress_files_.end()) {
108 auto tracked_file = tracked_files_.find(fn);
109 assert(tracked_file != tracked_files_.end());
110 in_progress_files_size_ -= tracked_file->second;
111 in_progress_files_.erase(fn);
112 }
113 }
114 }
115
116 Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
117 const std::string& new_path,
118 uint64_t* file_size) {
119 {
120 MutexLock l(&mu_);
121 if (file_size != nullptr) {
122 *file_size = tracked_files_[old_path];
123 }
124 OnAddFileImpl(new_path, tracked_files_[old_path], false);
125 OnDeleteFileImpl(old_path);
126 }
127 TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
128 return Status::OK();
129 }
130
131 void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
132 MutexLock l(&mu_);
133 max_allowed_space_ = max_allowed_space;
134 }
135
136 void SstFileManagerImpl::SetCompactionBufferSize(
137 uint64_t compaction_buffer_size) {
138 MutexLock l(&mu_);
139 compaction_buffer_size_ = compaction_buffer_size;
140 }
141
142 bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
143 MutexLock l(&mu_);
144 if (max_allowed_space_ <= 0) {
145 return false;
146 }
147 return total_files_size_ >= max_allowed_space_;
148 }
149
150 bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
151 MutexLock l(&mu_);
152 if (max_allowed_space_ <= 0) {
153 return false;
154 }
155 return total_files_size_ + cur_compactions_reserved_size_ >=
156 max_allowed_space_;
157 }
158
159 bool SstFileManagerImpl::EnoughRoomForCompaction(
160 ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
161 Status bg_error) {
162 MutexLock l(&mu_);
163 uint64_t size_added_by_compaction = 0;
164 // First check if we even have the space to do the compaction
165 for (size_t i = 0; i < inputs.size(); i++) {
166 for (size_t j = 0; j < inputs[i].size(); j++) {
167 FileMetaData* filemeta = inputs[i][j];
168 size_added_by_compaction += filemeta->fd.GetFileSize();
169 }
170 }
171
172 // Update cur_compactions_reserved_size_ so concurrent compaction
173 // don't max out space
174 size_t needed_headroom =
175 cur_compactions_reserved_size_ + size_added_by_compaction +
176 compaction_buffer_size_;
177 if (max_allowed_space_ != 0 &&
178 (needed_headroom + total_files_size_ > max_allowed_space_)) {
179 return false;
180 }
181
182 // Implement more aggressive checks only if this DB instance has already
183 // seen a NoSpace() error. This is tin order to contain a single potentially
184 // misbehaving DB instance and prevent it from slowing down compactions of
185 // other DB instances
186 if (CheckFreeSpace() && bg_error == Status::NoSpace()) {
187 auto fn =
188 TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
189 inputs[0][0]->fd.GetPathId());
190 uint64_t free_space = 0;
191 fs_->GetFreeSpace(fn, IOOptions(), &free_space, nullptr);
192 // needed_headroom is based on current size reserved by compactions,
193 // minus any files created by running compactions as they would count
194 // against the reserved size. If user didn't specify any compaction
195 // buffer, add reserved_disk_buffer_ that's calculated by default so the
196 // compaction doesn't end up leaving nothing for logs and flush SSTs
197 if (compaction_buffer_size_ == 0) {
198 needed_headroom += reserved_disk_buffer_;
199 }
200 needed_headroom -= in_progress_files_size_;
201 if (free_space < needed_headroom + size_added_by_compaction) {
202 // We hit the condition of not enough disk space
203 ROCKS_LOG_ERROR(logger_,
204 "free space [%" PRIu64
205 " bytes] is less than "
206 "needed headroom [%" ROCKSDB_PRIszt " bytes]\n",
207 free_space, needed_headroom);
208 return false;
209 }
210 }
211
212 cur_compactions_reserved_size_ += size_added_by_compaction;
213 // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
214 // a NoSpace error.
215 free_space_trigger_ = cur_compactions_reserved_size_;
216 return true;
217 }
218
219 uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
220 MutexLock l(&mu_);
221 return cur_compactions_reserved_size_;
222 }
223
224 uint64_t SstFileManagerImpl::GetTotalSize() {
225 MutexLock l(&mu_);
226 return total_files_size_;
227 }
228
229 std::unordered_map<std::string, uint64_t>
230 SstFileManagerImpl::GetTrackedFiles() {
231 MutexLock l(&mu_);
232 return tracked_files_;
233 }
234
235 int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
236 return delete_scheduler_.GetRateBytesPerSecond();
237 }
238
239 void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
240 return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
241 }
242
243 double SstFileManagerImpl::GetMaxTrashDBRatio() {
244 return delete_scheduler_.GetMaxTrashDBRatio();
245 }
246
247 void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
248 return delete_scheduler_.SetMaxTrashDBRatio(r);
249 }
250
251 uint64_t SstFileManagerImpl::GetTotalTrashSize() {
252 return delete_scheduler_.GetTotalTrashSize();
253 }
254
255 void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
256 const std::string& path) {
257 MutexLock l(&mu_);
258
259 reserved_disk_buffer_ += size;
260 if (path_.empty()) {
261 path_ = path;
262 }
263 }
264
265 void SstFileManagerImpl::ClearError() {
266 while (true) {
267 MutexLock l(&mu_);
268
269 if (closing_) {
270 return;
271 }
272
273 uint64_t free_space = 0;
274 Status s = fs_->GetFreeSpace(path_, IOOptions(), &free_space, nullptr);
275 free_space = max_allowed_space_ > 0
276 ? std::min(max_allowed_space_, free_space)
277 : free_space;
278 if (s.ok()) {
279 // In case of multi-DB instances, some of them may have experienced a
280 // soft error and some a hard error. In the SstFileManagerImpl, a hard
281 // error will basically override previously reported soft errors. Once
282 // we clear the hard error, we don't keep track of previous errors for
283 // now
284 if (bg_err_.severity() == Status::Severity::kHardError) {
285 if (free_space < reserved_disk_buffer_) {
286 ROCKS_LOG_ERROR(logger_,
287 "free space [%" PRIu64
288 " bytes] is less than "
289 "required disk buffer [%" PRIu64 " bytes]\n",
290 free_space, reserved_disk_buffer_);
291 ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
292 s = Status::NoSpace();
293 }
294 } else if (bg_err_.severity() == Status::Severity::kSoftError) {
295 if (free_space < free_space_trigger_) {
296 ROCKS_LOG_WARN(logger_,
297 "free space [%" PRIu64
298 " bytes] is less than "
299 "free space for compaction trigger [%" PRIu64
300 " bytes]\n",
301 free_space, free_space_trigger_);
302 ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
303 s = Status::NoSpace();
304 }
305 }
306 }
307
308 // Someone could have called CancelErrorRecovery() and the list could have
309 // become empty, so check again here
310 if (s.ok() && !error_handler_list_.empty()) {
311 auto error_handler = error_handler_list_.front();
312 // Since we will release the mutex, set cur_instance_ to signal to the
313 // shutdown thread, if it calls // CancelErrorRecovery() the meantime,
314 // to indicate that this DB instance is busy. The DB instance is
315 // guaranteed to not be deleted before RecoverFromBGError() returns,
316 // since the ErrorHandler::recovery_in_prog_ flag would be true
317 cur_instance_ = error_handler;
318 mu_.Unlock();
319 s = error_handler->RecoverFromBGError();
320 TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
321 mu_.Lock();
322 // The DB instance might have been deleted while we were
323 // waiting for the mutex, so check cur_instance_ to make sure its
324 // still non-null
325 if (cur_instance_) {
326 // Check for error again, since the instance may have recovered but
327 // immediately got another error. If that's the case, and the new
328 // error is also a NoSpace() non-fatal error, leave the instance in
329 // the list
330 Status err = cur_instance_->GetBGError();
331 if (s.ok() && err == Status::NoSpace() &&
332 err.severity() < Status::Severity::kFatalError) {
333 s = err;
334 }
335 cur_instance_ = nullptr;
336 }
337
338 if (s.ok() || s.IsShutdownInProgress() ||
339 (!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
340 // If shutdown is in progress, abandon this handler instance
341 // and continue with the others
342 error_handler_list_.pop_front();
343 }
344 }
345
346 if (!error_handler_list_.empty()) {
347 // If there are more instances to be recovered, reschedule after 5
348 // seconds
349 int64_t wait_until = env_->NowMicros() + 5000000;
350 cv_.TimedWait(wait_until);
351 }
352
353 // Check again for error_handler_list_ empty, as a DB instance shutdown
354 // could have removed it from the queue while we were in timed wait
355 if (error_handler_list_.empty()) {
356 ROCKS_LOG_INFO(logger_, "Clearing error\n");
357 bg_err_ = Status::OK();
358 return;
359 }
360 }
361 }
362
363 void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
364 Status bg_error) {
365 MutexLock l(&mu_);
366 if (bg_error.severity() == Status::Severity::kSoftError) {
367 if (bg_err_.ok()) {
368 // Setting bg_err_ basically means we're in degraded mode
369 // Assume that all pending compactions will fail similarly. The trigger
370 // for clearing this condition is set to current compaction reserved
371 // size, so we stop checking disk space available in
372 // EnoughRoomForCompaction once this much free space is available
373 bg_err_ = bg_error;
374 }
375 } else if (bg_error.severity() == Status::Severity::kHardError) {
376 bg_err_ = bg_error;
377 } else {
378 assert(false);
379 }
380
381 // If this is the first instance of this error, kick of a thread to poll
382 // and recover from this condition
383 if (error_handler_list_.empty()) {
384 error_handler_list_.push_back(handler);
385 // Release lock before calling join. Its ok to do so because
386 // error_handler_list_ is now non-empty, so no other invocation of this
387 // function will execute this piece of code
388 mu_.Unlock();
389 if (bg_thread_) {
390 bg_thread_->join();
391 }
392 // Start a new thread. The previous one would have exited.
393 bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
394 mu_.Lock();
395 } else {
396 // Check if this DB instance is already in the list
397 for (auto iter = error_handler_list_.begin();
398 iter != error_handler_list_.end(); ++iter) {
399 if ((*iter) == handler) {
400 return;
401 }
402 }
403 error_handler_list_.push_back(handler);
404 }
405 }
406
407 bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
408 MutexLock l(&mu_);
409
410 if (cur_instance_ == handler) {
411 // This instance is currently busy attempting to recover
412 // Nullify it so the recovery thread doesn't attempt to access it again
413 cur_instance_ = nullptr;
414 return false;
415 }
416
417 for (auto iter = error_handler_list_.begin();
418 iter != error_handler_list_.end(); ++iter) {
419 if ((*iter) == handler) {
420 error_handler_list_.erase(iter);
421 return true;
422 }
423 }
424 return false;
425 }
426
427 Status SstFileManagerImpl::ScheduleFileDeletion(
428 const std::string& file_path, const std::string& path_to_sync,
429 const bool force_bg) {
430 TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
431 const_cast<std::string*>(&file_path));
432 return delete_scheduler_.DeleteFile(file_path, path_to_sync,
433 force_bg);
434 }
435
436 void SstFileManagerImpl::WaitForEmptyTrash() {
437 delete_scheduler_.WaitForEmptyTrash();
438 }
439
440 void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
441 uint64_t file_size, bool compaction) {
442 auto tracked_file = tracked_files_.find(file_path);
443 if (tracked_file != tracked_files_.end()) {
444 // File was added before, we will just update the size
445 assert(!compaction);
446 total_files_size_ -= tracked_file->second;
447 total_files_size_ += file_size;
448 cur_compactions_reserved_size_ -= file_size;
449 } else {
450 total_files_size_ += file_size;
451 if (compaction) {
452 // Keep track of the size of files created by in-progress compactions.
453 // When calculating whether there's enough headroom for new compactions,
454 // this will be subtracted from cur_compactions_reserved_size_.
455 // Otherwise, compactions will be double counted.
456 in_progress_files_size_ += file_size;
457 in_progress_files_.insert(file_path);
458 }
459 }
460 tracked_files_[file_path] = file_size;
461 }
462
463 void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
464 auto tracked_file = tracked_files_.find(file_path);
465 if (tracked_file == tracked_files_.end()) {
466 // File is not tracked
467 assert(in_progress_files_.find(file_path) == in_progress_files_.end());
468 return;
469 }
470
471 total_files_size_ -= tracked_file->second;
472 // Check if it belonged to an in-progress compaction
473 if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
474 in_progress_files_size_ -= tracked_file->second;
475 in_progress_files_.erase(file_path);
476 }
477 tracked_files_.erase(tracked_file);
478 }
479
480 SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
481 std::string trash_dir,
482 int64_t rate_bytes_per_sec,
483 bool delete_existing_trash, Status* status,
484 double max_trash_db_ratio,
485 uint64_t bytes_max_delete_chunk) {
486 std::shared_ptr<FileSystem> fs;
487
488 if (env == Env::Default()) {
489 fs = FileSystem::Default();
490 } else {
491 fs.reset(new LegacyFileSystemWrapper(env));
492 }
493
494 return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
495 delete_existing_trash, status, max_trash_db_ratio,
496 bytes_max_delete_chunk);
497 }
498
499 SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<FileSystem> fs,
500 std::shared_ptr<Logger> info_log,
501 const std::string& trash_dir,
502 int64_t rate_bytes_per_sec,
503 bool delete_existing_trash, Status* status,
504 double max_trash_db_ratio,
505 uint64_t bytes_max_delete_chunk) {
506 SstFileManagerImpl* res =
507 new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
508 max_trash_db_ratio, bytes_max_delete_chunk);
509
510 // trash_dir is deprecated and not needed anymore, but if user passed it
511 // we will still remove files in it.
512 Status s;
513 if (delete_existing_trash && trash_dir != "") {
514 std::vector<std::string> files_in_trash;
515 s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
516 if (s.ok()) {
517 for (const std::string& trash_file : files_in_trash) {
518 if (trash_file == "." || trash_file == "..") {
519 continue;
520 }
521
522 std::string path_in_trash = trash_dir + "/" + trash_file;
523 res->OnAddFile(path_in_trash);
524 Status file_delete =
525 res->ScheduleFileDeletion(path_in_trash, trash_dir);
526 if (s.ok() && !file_delete.ok()) {
527 s = file_delete;
528 }
529 }
530 }
531 }
532
533 if (status) {
534 *status = s;
535 }
536
537 return res;
538 }
539
540 #else
541
542 SstFileManager* NewSstFileManager(Env* /*env*/,
543 std::shared_ptr<Logger> /*info_log*/,
544 std::string /*trash_dir*/,
545 int64_t /*rate_bytes_per_sec*/,
546 bool /*delete_existing_trash*/,
547 Status* status, double /*max_trash_db_ratio*/,
548 uint64_t /*bytes_max_delete_chunk*/) {
549 if (status) {
550 *status =
551 Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
552 }
553 return nullptr;
554 }
555
556 #endif // ROCKSDB_LITE
557
558 } // namespace ROCKSDB_NAMESPACE