]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_impl/db_impl_files.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_impl / db_impl_files.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <cinttypes>
12 #include <set>
13 #include <unordered_set>
14 #include "db/event_helpers.h"
15 #include "db/memtable_list.h"
16 #include "file/file_util.h"
17 #include "file/filename.h"
18 #include "file/sst_file_manager_impl.h"
19 #include "util/autovector.h"
20
21 namespace ROCKSDB_NAMESPACE {
22
23 uint64_t DBImpl::MinLogNumberToKeep() {
24 if (allow_2pc()) {
25 return versions_->min_log_number_to_keep_2pc();
26 } else {
27 return versions_->MinLogNumberWithUnflushedData();
28 }
29 }
30
31 uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
32 mutex_.AssertHeld();
33 if (!pending_outputs_.empty()) {
34 return *pending_outputs_.begin();
35 }
36 return std::numeric_limits<uint64_t>::max();
37 }
38
39 Status DBImpl::DisableFileDeletions() {
40 InstrumentedMutexLock l(&mutex_);
41 return DisableFileDeletionsWithLock();
42 }
43
44 Status DBImpl::DisableFileDeletionsWithLock() {
45 mutex_.AssertHeld();
46 ++disable_delete_obsolete_files_;
47 if (disable_delete_obsolete_files_ == 1) {
48 ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
49 } else {
50 ROCKS_LOG_WARN(immutable_db_options_.info_log,
51 "File Deletions Disabled, but already disabled. Counter: %d",
52 disable_delete_obsolete_files_);
53 }
54 return Status::OK();
55 }
56
57 Status DBImpl::EnableFileDeletions(bool force) {
58 // Job id == 0 means that this is not our background process, but rather
59 // user thread
60 JobContext job_context(0);
61 int saved_counter; // initialize on all paths
62 {
63 InstrumentedMutexLock l(&mutex_);
64 if (force) {
65 // if force, we need to enable file deletions right away
66 disable_delete_obsolete_files_ = 0;
67 } else if (disable_delete_obsolete_files_ > 0) {
68 --disable_delete_obsolete_files_;
69 }
70 saved_counter = disable_delete_obsolete_files_;
71 if (saved_counter == 0) {
72 FindObsoleteFiles(&job_context, true);
73 bg_cv_.SignalAll();
74 }
75 }
76 if (saved_counter == 0) {
77 ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
78 if (job_context.HaveSomethingToDelete()) {
79 PurgeObsoleteFiles(job_context);
80 }
81 } else {
82 ROCKS_LOG_WARN(immutable_db_options_.info_log,
83 "File Deletions Enable, but not really enabled. Counter: %d",
84 saved_counter);
85 }
86 job_context.Clean();
87 LogFlush(immutable_db_options_.info_log);
88 return Status::OK();
89 }
90
91 bool DBImpl::IsFileDeletionsEnabled() const {
92 return 0 == disable_delete_obsolete_files_;
93 }
94
95 // * Returns the list of live files in 'sst_live' and 'blob_live'.
96 // If it's doing full scan:
97 // * Returns the list of all files in the filesystem in
98 // 'full_scan_candidate_files'.
99 // Otherwise, gets obsolete files from VersionSet.
100 // no_full_scan = true -- never do the full scan using GetChildren()
101 // force = false -- don't force the full scan, except every
102 // mutable_db_options_.delete_obsolete_files_period_micros
103 // force = true -- force the full scan
// Additional contract notes:
// * mutex_ must be held (asserted below). While file deletions are disabled
//   (disable_delete_obsolete_files_ > 0) this returns without touching
//   job_context.
// * A full scan also runs on every call when
//   delete_obsolete_files_period_micros is 0 (unless no_full_scan is set).
// * SST/blob files moved into job_context are marked "grabbed for purge" so
//   concurrent full scans do not pick them up a second time.
// * WALs numbered below job_context->log_number are removed from
//   alive_log_files_ and logs_. While log_recycle_files_ holds fewer than
//   recycle_log_file_num entries they are queued for recycling instead of
//   deletion.
// * If job_context ends up with something to delete,
//   pending_purge_obsolete_files_ is incremented. PurgeObsoleteFiles()
//   decrements it, so callers presumably must follow up with
//   PurgeObsoleteFiles() in that case.
104 void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
105 bool no_full_scan) {
106 mutex_.AssertHeld();
107
108 // if deletion is disabled, do nothing
109 if (disable_delete_obsolete_files_ > 0) {
110 return;
111 }
112
113 bool doing_the_full_scan = false;
114
115 // logic for figuring out if we're doing the full scan
116 if (no_full_scan) {
117 doing_the_full_scan = false;
118 } else if (force ||
119 mutable_db_options_.delete_obsolete_files_period_micros == 0) {
120 doing_the_full_scan = true;
121 } else {
// Periodic mode: scan only once the configured period has elapsed since the
// last periodic scan, and record this scan's time.
122 const uint64_t now_micros = env_->NowMicros();
123 if ((delete_obsolete_files_last_run_ +
124 mutable_db_options_.delete_obsolete_files_period_micros) <
125 now_micros) {
126 doing_the_full_scan = true;
127 delete_obsolete_files_last_run_ = now_micros;
128 }
129 }
130
131 // don't delete files that might be currently written to from compaction
132 // threads
133 // Since job_context->min_pending_output is set, until file scan finishes,
134 // mutex_ cannot be released. Otherwise, we might see no min_pending_output
135 // here but later find newer generated unfinalized files while scanning.
136 job_context->min_pending_output = MinObsoleteSstNumberToKeep();
137
138 // Get obsolete files. This function will also update the list of
139 // pending files in VersionSet().
140 versions_->GetObsoleteFiles(
141 &job_context->sst_delete_files, &job_context->blob_delete_files,
142 &job_context->manifest_delete_files, job_context->min_pending_output);
143
144 // Mark the elements in job_context->sst_delete_files and
145 // job_context->blob_delete_files as "grabbed for purge" so that other threads
146 // calling FindObsoleteFiles with full_scan=true will not add these files to
147 // candidate list for purge.
148 for (const auto& sst_to_del : job_context->sst_delete_files) {
149 MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
150 }
151
152 for (const auto& blob_file : job_context->blob_delete_files) {
153 MarkAsGrabbedForPurge(blob_file.GetBlobFileNumber());
154 }
155
156 // store the current filenum, lognum, etc
157 job_context->manifest_file_number = versions_->manifest_file_number();
158 job_context->pending_manifest_file_number =
159 versions_->pending_manifest_file_number();
160 job_context->log_number = MinLogNumberToKeep();
161 job_context->prev_log_number = versions_->prev_log_number();
162
163 versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live);
164 if (doing_the_full_scan) {
165 InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
166 dbname_);
// Collect the distinct data directories: every db_path plus each column
// family's cf_paths.
167 std::set<std::string> paths;
168 for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
169 path_id++) {
170 paths.insert(immutable_db_options_.db_paths[path_id].path);
171 }
172
173 // Note that if cf_paths is not specified in the ColumnFamilyOptions
174 // of a particular column family, we use db_paths as the cf_paths
175 // setting. Hence, there can be multiple duplicates of files from db_paths
176 // in the following code. Duplicates are removed while identifying
177 // unique files in PurgeObsoleteFiles.
// NOTE(review): `paths` is a std::set, so identical path strings are already
// collapsed here. Duplicates can only come from differently spelled paths to
// the same directory; the find() below is also redundant with insert().
178 for (auto cfd : *versions_->GetColumnFamilySet()) {
179 for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
180 path_id++) {
181 auto& path = cfd->ioptions()->cf_paths[path_id].path;
182
183 if (paths.find(path) == paths.end()) {
184 paths.insert(path);
185 }
186 }
187 }
188
189 for (auto& path : paths) {
190 // set of all files in the directory. We'll exclude files that are still
191 // alive in the subsequent processings.
192 std::vector<std::string> files;
193 env_->GetChildren(path, &files).PermitUncheckedError(); // Ignore errors
194 for (const std::string& file : files) {
195 uint64_t number;
196 FileType type;
197 // 1. If we cannot parse the file name, we skip;
198 // 2. If the file with file_number equals number has already been
199 // grabbed for purge by another compaction job, or it has already been
200 // scheduled for purge, we also skip it if we
201 // are doing full scan in order to avoid double deletion of the same
202 // file under race conditions. See
203 // https://github.com/facebook/rocksdb/issues/3573
204 if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
205 !ShouldPurge(number)) {
206 continue;
207 }
208
209 // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
210 job_context->full_scan_candidate_files.emplace_back("/" + file, path);
211 }
212 }
213
214 // Add log files in wal_dir
// Only when wal_dir differs from dbname_. Note these entries are added
// unfiltered: no ParseFileName/ShouldPurge check and no "/" prefix.
215 if (immutable_db_options_.wal_dir != dbname_) {
216 std::vector<std::string> log_files;
217 env_->GetChildren(immutable_db_options_.wal_dir,
218 &log_files)
219 .PermitUncheckedError(); // Ignore errors
220 for (const std::string& log_file : log_files) {
221 job_context->full_scan_candidate_files.emplace_back(
222 log_file, immutable_db_options_.wal_dir);
223 }
224 }
225 // Add info log files in db_log_dir
226 if (!immutable_db_options_.db_log_dir.empty() &&
227 immutable_db_options_.db_log_dir != dbname_) {
228 std::vector<std::string> info_log_files;
229 // Ignore errors
230 env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files)
231 .PermitUncheckedError();
232 for (std::string& log_file : info_log_files) {
233 job_context->full_scan_candidate_files.emplace_back(
234 log_file, immutable_db_options_.db_log_dir);
235 }
236 }
237 }
238
239 // logs_ is empty when called during recovery, in which case there can't yet
240 // be any tracked obsolete logs
241 if (!alive_log_files_.empty() && !logs_.empty()) {
242 uint64_t min_log_number = job_context->log_number;
243 size_t num_alive_log_files = alive_log_files_.size();
244 // find newly obsoleted log files
// The loop dereferences begin() without an emptiness check. It relies on the
// current WAL always staying alive (see the assert at the end of the body).
245 while (alive_log_files_.begin()->number < min_log_number) {
246 auto& earliest = *alive_log_files_.begin();
// Keep up to recycle_log_file_num obsolete WALs for reuse; delete the rest.
247 if (immutable_db_options_.recycle_log_file_num >
248 log_recycle_files_.size()) {
249 ROCKS_LOG_INFO(immutable_db_options_.info_log,
250 "adding log %" PRIu64 " to recycle list\n",
251 earliest.number);
252 log_recycle_files_.push_back(earliest.number);
253 } else {
254 job_context->log_delete_files.push_back(earliest.number);
255 }
// Snapshot the WAL totals before anything is subtracted (while
// size_log_to_delete is still 0); PurgeObsoleteFiles logs these values.
256 if (job_context->size_log_to_delete == 0) {
257 job_context->prev_total_log_size = total_log_size_;
258 job_context->num_alive_log_files = num_alive_log_files;
259 }
260 job_context->size_log_to_delete += earliest.size;
261 total_log_size_ -= earliest.size;
// With two_write_queues_, the pop is additionally done under log_write_mutex_
// (presumably because the second write queue reads alive_log_files_ under
// that lock — confirm).
262 if (two_write_queues_) {
263 log_write_mutex_.Lock();
264 }
265 alive_log_files_.pop_front();
266 if (two_write_queues_) {
267 log_write_mutex_.Unlock();
268 }
269 // Current log should always stay alive since it can't have
270 // number < MinLogNumber().
271 assert(alive_log_files_.size());
272 }
273 while (!logs_.empty() && logs_.front().number < min_log_number) {
274 auto& log = logs_.front();
// A WAL that is being synced cannot be released yet: wait, then re-check the
// loop condition from scratch.
275 if (log.getting_synced) {
276 log_sync_cv_.Wait();
277 // logs_ could have changed while we were waiting.
278 continue;
279 }
// The writer is moved to logs_to_free_. It is closed later, outside this
// function (PurgeObsoleteFiles closes state.logs_to_free).
280 logs_to_free_.push_back(log.ReleaseWriter());
281 {
282 InstrumentedMutexLock wl(&log_write_mutex_);
283 logs_.pop_front();
284 }
285 }
286 // Current log cannot be obsolete.
287 assert(!logs_.empty());
288 }
289
290 // We're just cleaning up for DB::Write().
// Hand the released writers and the recycle list to the job. logs_to_free_
// is cleared at the end so each writer is handed out only once.
291 assert(job_context->logs_to_free.empty());
292 job_context->logs_to_free = logs_to_free_;
293 job_context->log_recycle_files.assign(log_recycle_files_.begin(),
294 log_recycle_files_.end());
295 if (job_context->HaveSomethingToDelete()) {
296 ++pending_purge_obsolete_files_;
297 }
298 logs_to_free_.clear();
299 }
300
301 namespace {
302 bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
303 const JobContext::CandidateFileInfo& second) {
304 if (first.file_name > second.file_name) {
305 return true;
306 } else if (first.file_name < second.file_name) {
307 return false;
308 } else {
309 return (first.file_path > second.file_path);
310 }
311 }
312 }; // namespace
313
314 // Delete obsolete files and log status and information of file deletion
315 void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
316 const std::string& path_to_sync,
317 FileType type, uint64_t number) {
318 TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl::BeforeDeletion",
319 const_cast<std::string*>(&fname));
320
321 Status file_deletion_status;
322 if (type == kTableFile || type == kBlobFile || type == kWalFile) {
323 file_deletion_status =
324 DeleteDBFile(&immutable_db_options_, fname, path_to_sync,
325 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path_);
326 } else {
327 file_deletion_status = env_->DeleteFile(fname);
328 }
329 TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
330 &file_deletion_status);
331 if (file_deletion_status.ok()) {
332 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
333 "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
334 fname.c_str(), type, number,
335 file_deletion_status.ToString().c_str());
336 } else if (env_->FileExists(fname).IsNotFound()) {
337 ROCKS_LOG_INFO(
338 immutable_db_options_.info_log,
339 "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
340 " -- %s\n",
341 job_id, fname.c_str(), type, number,
342 file_deletion_status.ToString().c_str());
343 } else {
344 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
345 "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
346 job_id, fname.c_str(), type, number,
347 file_deletion_status.ToString().c_str());
348 }
349 if (type == kTableFile) {
350 EventHelpers::LogAndNotifyTableFileDeletion(
351 &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
352 immutable_db_options_.listeners);
353 }
354 }
355
356 // Deletes (or schedules/archives) the candidate files that are not live: the
357 // full-scan candidates plus the explicit SST, blob, WAL and MANIFEST delete
358 // lists gathered by FindObsoleteFiles(), each subject to the keep rules below.
359 // It is not necessary to hold the mutex when invoking this method.
// Additional notes:
// * schedule_only = true queues each deletion via SchedulePendingPurge()
//   (taking mutex_ per file) instead of deleting it immediately.
// * In non-LITE builds, WALs are archived rather than deleted when
//   wal_ttl_seconds or wal_size_limit_mb is set.
// * When OwnTablesAndLogs() is false, candidate files are neither deleted nor
//   scheduled (WAL archiving and info-log trimming still run).
// * Always decrements pending_purge_obsolete_files_ (incremented in
//   FindObsoleteFiles()) and signals bg_cv_ when it reaches zero.
360 void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
361 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
362 // we'd better have something to delete
363 assert(state.HaveSomethingToDelete());
364
365 // FindObsoleteFiles() should've populated this so nonzero
366 assert(state.manifest_file_number != 0);
367
368 // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
369 std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
370 state.sst_live.end());
371 std::unordered_set<uint64_t> blob_live_set(state.blob_live.begin(),
372 state.blob_live.end());
373 std::unordered_set<uint64_t> log_recycle_files_set(
374 state.log_recycle_files.begin(), state.log_recycle_files.end());
375
376 auto candidate_files = state.full_scan_candidate_files;
377 candidate_files.reserve(
378 candidate_files.size() + state.sst_delete_files.size() +
379 state.blob_delete_files.size() + state.log_delete_files.size() +
380 state.manifest_delete_files.size());
381 // We may ignore the dbname when generating the file names.
382 for (auto& file : state.sst_delete_files) {
383 candidate_files.emplace_back(
384 MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
// Release any cached table reader and free the metadata now; only the name
// and path survive in candidate_files.
385 if (file.metadata->table_reader_handle) {
386 table_cache_->Release(file.metadata->table_reader_handle);
387 }
388 file.DeleteMetadata();
389 }
390
391 for (const auto& blob_file : state.blob_delete_files) {
392 candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()),
393 blob_file.GetPath());
394 }
395
396 for (auto file_num : state.log_delete_files) {
397 if (file_num > 0) {
398 candidate_files.emplace_back(LogFileName(file_num),
399 immutable_db_options_.wal_dir);
400 }
401 }
402 for (const auto& filename : state.manifest_delete_files) {
403 candidate_files.emplace_back(filename, dbname_);
404 }
405
406 // dedup state.candidate_files so we don't try to delete the same
407 // file twice
408 std::sort(candidate_files.begin(), candidate_files.end(),
409 CompareCandidateFile);
410 candidate_files.erase(
411 std::unique(candidate_files.begin(), candidate_files.end()),
412 candidate_files.end());
413
414 if (state.prev_total_log_size > 0) {
415 ROCKS_LOG_INFO(immutable_db_options_.info_log,
416 "[JOB %d] Try to delete WAL files size %" PRIu64
417 ", prev total WAL file size %" PRIu64
418 ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
419 state.job_id, state.size_log_to_delete,
420 state.prev_total_log_size, state.num_alive_log_files);
421 }
422
423 std::vector<std::string> old_info_log_files;
424 InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
425 dbname_);
426
427 // File numbers of most recent two OPTIONS files in candidate_files (found in
428 // previous FindObsoleteFiles(full_scan=true))
429 // At this point, there must not be any duplicate file numbers in
430 // candidate_files.
431 uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
432 uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
433 for (const auto& candidate_file : candidate_files) {
434 const std::string& fname = candidate_file.file_name;
435 uint64_t number;
436 FileType type;
437 if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
438 type != kOptionsFile) {
439 continue;
440 }
441 if (number > optsfile_num1) {
442 optsfile_num2 = optsfile_num1;
443 optsfile_num1 = number;
444 } else if (number > optsfile_num2) {
445 optsfile_num2 = number;
446 }
447 }
448
449 // Close WALs before trying to delete them.
450 for (const auto w : state.logs_to_free) {
451 // TODO: maybe check the return value of Close.
452 auto s = w->Close();
453 s.PermitUncheckedError();
454 }
455
456 bool own_files = OwnTablesAndLogs();
// Numbers of table/blob files actually selected for deletion; used afterwards
// to drop them from files_grabbed_for_purge_.
457 std::unordered_set<uint64_t> files_to_del;
458 for (const auto& candidate_file : candidate_files) {
459 const std::string& to_delete = candidate_file.file_name;
460 uint64_t number;
461 FileType type;
462 // Ignore file if we cannot recognize it.
463 if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
464 continue;
465 }
466
// Per-type keep rules; anything not kept falls through to deletion below.
467 bool keep = true;
468 switch (type) {
469 case kWalFile:
470 keep = ((number >= state.log_number) ||
471 (number == state.prev_log_number) ||
472 (log_recycle_files_set.find(number) !=
473 log_recycle_files_set.end()));
474 break;
475 case kDescriptorFile:
476 // Keep my manifest file, and any newer incarnations'
477 // (can happen during manifest roll)
478 keep = (number >= state.manifest_file_number);
479 break;
480 case kTableFile:
481 // If the second condition is not there, this makes
482 // DontDeletePendingOutputs fail
483 keep = (sst_live_set.find(number) != sst_live_set.end()) ||
484 number >= state.min_pending_output;
485 if (!keep) {
486 files_to_del.insert(number);
487 }
488 break;
489 case kBlobFile:
490 keep = number >= state.min_pending_output ||
491 (blob_live_set.find(number) != blob_live_set.end());
492 if (!keep) {
493 files_to_del.insert(number);
494 }
495 break;
496 case kTempFile:
497 // Any temp files that are currently being written to must
498 // be recorded in pending_outputs_, which is inserted into "live".
499 // Also, SetCurrentFile creates a temp file when writing out new
500 // manifest, which is equal to state.pending_manifest_file_number. We
501 // should not delete that file
502 //
503 // TODO(yhchiang): carefully modify the third condition to safely
504 // remove the temp options files.
505 keep = (sst_live_set.find(number) != sst_live_set.end()) ||
506 (blob_live_set.find(number) != blob_live_set.end()) ||
507 (number == state.pending_manifest_file_number) ||
508 (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
509 break;
// Info logs are never deleted here; numbered ones are collected for the
// keep_log_file_num trimming after this loop.
510 case kInfoLogFile:
511 keep = true;
512 if (number != 0) {
513 old_info_log_files.push_back(to_delete);
514 }
515 break;
// Keep the two newest OPTIONS files found among the candidates.
516 case kOptionsFile:
517 keep = (number >= optsfile_num2);
518 TEST_SYNC_POINT_CALLBACK(
519 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
520 reinterpret_cast<void*>(&number));
521 TEST_SYNC_POINT_CALLBACK(
522 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
523 reinterpret_cast<void*>(&keep));
524 break;
525 case kCurrentFile:
526 case kDBLockFile:
527 case kIdentityFile:
528 case kMetaDatabase:
529 keep = true;
530 break;
531 }
532
533 if (keep) {
534 continue;
535 }
536
// Build the absolute path. Table and blob files use the candidate's own
// directory; everything else lives in wal_dir (WALs) or dbname_. The "/" join
// avoids doubling a separator already present on either side.
537 std::string fname;
538 std::string dir_to_sync;
539 if (type == kTableFile) {
540 // evict from cache
541 TableCache::Evict(table_cache_.get(), number);
542 fname = MakeTableFileName(candidate_file.file_path, number);
543 dir_to_sync = candidate_file.file_path;
544 } else if (type == kBlobFile) {
545 fname = BlobFileName(candidate_file.file_path, number);
546 dir_to_sync = candidate_file.file_path;
547 } else {
548 dir_to_sync =
549 (type == kWalFile) ? immutable_db_options_.wal_dir : dbname_;
550 fname = dir_to_sync +
551 ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
552 (!to_delete.empty() && to_delete.front() == '/')
553 ? ""
554 : "/") +
555 to_delete;
556 }
557
558 #ifndef ROCKSDB_LITE
// WAL retention (TTL or size limit) is configured: archive instead of delete.
559 if (type == kWalFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
560 immutable_db_options_.wal_size_limit_mb > 0)) {
561 wal_manager_.ArchiveWALFile(fname, number);
562 continue;
563 }
564 #endif // !ROCKSDB_LITE
565
566 // If I do not own these files, e.g. secondary instance with max_open_files
567 // = -1, then no need to delete or schedule delete these files since they
568 // will be removed by their owner, e.g. the primary instance.
569 if (!own_files) {
570 continue;
571 }
572 if (schedule_only) {
573 InstrumentedMutexLock guard_lock(&mutex_);
574 SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
575 } else {
576 DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
577 }
578 }
579
580 {
581 // After purging obsolete files, remove them from files_grabbed_for_purge_.
582 InstrumentedMutexLock guard_lock(&mutex_);
583 autovector<uint64_t> to_be_removed;
584 for (auto fn : files_grabbed_for_purge_) {
585 if (files_to_del.count(fn) != 0) {
586 to_be_removed.emplace_back(fn);
587 }
588 }
589 for (auto fn : to_be_removed) {
590 files_grabbed_for_purge_.erase(fn);
591 }
592 }
593
594 // Delete old info log files.
// The loop bound uses `<=`, so (count - keep_log_file_num + 1) entries are
// removed, starting from the lexicographically smallest (presumably oldest)
// name. Info logs parsed with number 0 were not added to old_info_log_files
// (see the kInfoLogFile case). That is presumably the active LOG, which would
// then count toward keep_log_file_num — confirm.
// NOTE(review): `i` is `unsigned int` while `end` is size_t.
595 size_t old_info_log_file_count = old_info_log_files.size();
596 if (old_info_log_file_count != 0 &&
597 old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
598 std::sort(old_info_log_files.begin(), old_info_log_files.end());
599 size_t end =
600 old_info_log_file_count - immutable_db_options_.keep_log_file_num;
601 for (unsigned int i = 0; i <= end; i++) {
602 std::string& to_delete = old_info_log_files.at(i);
603 std::string full_path_to_delete =
604 (immutable_db_options_.db_log_dir.empty()
605 ? dbname_
606 : immutable_db_options_.db_log_dir) +
607 "/" + to_delete;
608 ROCKS_LOG_INFO(immutable_db_options_.info_log,
609 "[JOB %d] Delete info log file %s\n", state.job_id,
610 full_path_to_delete.c_str());
611 Status s = env_->DeleteFile(full_path_to_delete);
612 if (!s.ok()) {
613 if (env_->FileExists(full_path_to_delete).IsNotFound()) {
614 ROCKS_LOG_INFO(
615 immutable_db_options_.info_log,
616 "[JOB %d] Tried to delete non-existing info log file %s FAILED "
617 "-- %s\n",
618 state.job_id, to_delete.c_str(), s.ToString().c_str());
619 } else {
620 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
621 "[JOB %d] Delete info log file %s FAILED -- %s\n",
622 state.job_id, to_delete.c_str(),
623 s.ToString().c_str());
624 }
625 }
626 }
627 }
628 #ifndef ROCKSDB_LITE
629 wal_manager_.PurgeObsoleteWALFiles();
630 #endif // ROCKSDB_LITE
631 LogFlush(immutable_db_options_.info_log);
// Balance the increment done in FindObsoleteFiles() and wake waiters once no
// purge is pending.
632 InstrumentedMutexLock l(&mutex_);
633 --pending_purge_obsolete_files_;
634 assert(pending_purge_obsolete_files_ >= 0);
635 if (pending_purge_obsolete_files_ == 0) {
636 bg_cv_.SignalAll();
637 }
638 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
639 }
640
641 void DBImpl::DeleteObsoleteFiles() {
642 mutex_.AssertHeld();
643 JobContext job_context(next_job_id_.fetch_add(1));
644 FindObsoleteFiles(&job_context, true);
645
646 mutex_.Unlock();
647 if (job_context.HaveSomethingToDelete()) {
648 PurgeObsoleteFiles(job_context);
649 }
650 job_context.Clean();
651 mutex_.Lock();
652 }
653
654 uint64_t FindMinPrepLogReferencedByMemTable(
655 VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
656 const autovector<MemTable*>& memtables_to_flush) {
657 uint64_t min_log = 0;
658
659 // we must look through the memtables for two phase transactions
660 // that have been committed but not yet flushed
661 for (auto loop_cfd : *vset->GetColumnFamilySet()) {
662 if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
663 continue;
664 }
665
666 auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
667 memtables_to_flush);
668
669 if (log > 0 && (min_log == 0 || log < min_log)) {
670 min_log = log;
671 }
672
673 log = loop_cfd->mem()->GetMinLogContainingPrepSection();
674
675 if (log > 0 && (min_log == 0 || log < min_log)) {
676 min_log = log;
677 }
678 }
679
680 return min_log;
681 }
682
683 uint64_t PrecomputeMinLogNumberToKeepNon2PC(
684 VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
685 const autovector<VersionEdit*>& edit_list) {
686 assert(vset != nullptr);
687
688 // Precompute the min log number containing unflushed data for the column
689 // family being flushed (`cfd_to_flush`).
690 uint64_t cf_min_log_number_to_keep = 0;
691 for (auto& e : edit_list) {
692 if (e->HasLogNumber()) {
693 cf_min_log_number_to_keep =
694 std::max(cf_min_log_number_to_keep, e->GetLogNumber());
695 }
696 }
697 if (cf_min_log_number_to_keep == 0) {
698 // No version edit contains information on log number. The log number
699 // for this column family should stay the same as it is.
700 cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
701 }
702
703 // Get min log number containing unflushed data for other column families.
704 uint64_t min_log_number_to_keep =
705 vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
706 if (cf_min_log_number_to_keep != 0) {
707 min_log_number_to_keep =
708 std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
709 }
710 return min_log_number_to_keep;
711 }
712
713 uint64_t PrecomputeMinLogNumberToKeep2PC(
714 VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
715 const autovector<VersionEdit*>& edit_list,
716 const autovector<MemTable*>& memtables_to_flush,
717 LogsWithPrepTracker* prep_tracker) {
718 assert(vset != nullptr);
719 assert(prep_tracker != nullptr);
720 // Calculate updated min_log_number_to_keep
721 // Since the function should only be called in 2pc mode, log number in
722 // the version edit should be sufficient.
723
724 uint64_t min_log_number_to_keep =
725 PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list);
726
727 // if are 2pc we must consider logs containing prepared
728 // sections of outstanding transactions.
729 //
730 // We must check min logs with outstanding prep before we check
731 // logs references by memtables because a log referenced by the
732 // first data structure could transition to the second under us.
733 //
734 // TODO: iterating over all column families under db mutex.
735 // should find more optimal solution
736 auto min_log_in_prep_heap =
737 prep_tracker->FindMinLogContainingOutstandingPrep();
738
739 if (min_log_in_prep_heap != 0 &&
740 min_log_in_prep_heap < min_log_number_to_keep) {
741 min_log_number_to_keep = min_log_in_prep_heap;
742 }
743
744 uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
745 vset, &cfd_to_flush, memtables_to_flush);
746
747 if (min_log_refed_by_mem != 0 &&
748 min_log_refed_by_mem < min_log_number_to_keep) {
749 min_log_number_to_keep = min_log_refed_by_mem;
750 }
751 return min_log_number_to_keep;
752 }
753
754 Status DBImpl::SetDBId() {
755 Status s;
756 // Happens when immutable_db_options_.write_dbid_to_manifest is set to true
757 // the very first time.
758 if (db_id_.empty()) {
759 // Check for the IDENTITY file and create it if not there.
760 s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr);
761 // Typically Identity file is created in NewDB() and for some reason if
762 // it is no longer available then at this point DB ID is not in Identity
763 // file or Manifest.
764 if (s.IsNotFound()) {
765 s = SetIdentityFile(env_, dbname_);
766 if (!s.ok()) {
767 return s;
768 }
769 } else if (!s.ok()) {
770 assert(s.IsIOError());
771 return s;
772 }
773 s = GetDbIdentityFromIdentityFile(&db_id_);
774 if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
775 VersionEdit edit;
776 edit.SetDBId(db_id_);
777 Options options;
778 MutableCFOptions mutable_cf_options(options);
779 versions_->db_id_ = db_id_;
780 s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
781 mutable_cf_options, &edit, &mutex_, nullptr,
782 /* new_descriptor_log */ false);
783 }
784 } else {
785 s = SetIdentityFile(env_, dbname_, db_id_);
786 }
787 return s;
788 }
789
790 Status DBImpl::DeleteUnreferencedSstFiles() {
791 mutex_.AssertHeld();
792 std::vector<std::string> paths;
793 paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
794 for (const auto& db_path : immutable_db_options_.db_paths) {
795 paths.push_back(
796 NormalizePath(db_path.path + std::string(1, kFilePathSeparator)));
797 }
798 for (const auto* cfd : *versions_->GetColumnFamilySet()) {
799 for (const auto& cf_path : cfd->ioptions()->cf_paths) {
800 paths.push_back(
801 NormalizePath(cf_path.path + std::string(1, kFilePathSeparator)));
802 }
803 }
804 // Dedup paths
805 std::sort(paths.begin(), paths.end());
806 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
807
808 uint64_t next_file_number = versions_->current_next_file_number();
809 uint64_t largest_file_number = next_file_number;
810 std::set<std::string> files_to_delete;
811 Status s;
812 for (const auto& path : paths) {
813 std::vector<std::string> files;
814 s = env_->GetChildren(path, &files);
815 if (!s.ok()) {
816 break;
817 }
818 for (const auto& fname : files) {
819 uint64_t number = 0;
820 FileType type;
821 if (!ParseFileName(fname, &number, &type)) {
822 continue;
823 }
824 // path ends with '/' or '\\'
825 const std::string normalized_fpath = path + fname;
826 largest_file_number = std::max(largest_file_number, number);
827 if (type == kTableFile && number >= next_file_number &&
828 files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
829 files_to_delete.insert(normalized_fpath);
830 }
831 }
832 }
833 if (!s.ok()) {
834 return s;
835 }
836
837 if (largest_file_number > next_file_number) {
838 versions_->next_file_number_.store(largest_file_number + 1);
839 }
840
841 VersionEdit edit;
842 edit.SetNextFile(versions_->next_file_number_.load());
843 assert(versions_->GetColumnFamilySet());
844 ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
845 assert(default_cfd);
846 s = versions_->LogAndApply(
847 default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
848 directories_.GetDbDir(), /*new_descriptor_log*/ false);
849 if (!s.ok()) {
850 return s;
851 }
852
853 mutex_.Unlock();
854 for (const auto& fname : files_to_delete) {
855 s = env_->DeleteFile(fname);
856 if (!s.ok()) {
857 break;
858 }
859 }
860 mutex_.Lock();
861 return s;
862 }
863
864 } // namespace ROCKSDB_NAMESPACE