1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
13 #include <unordered_set>
14 #include "db/event_helpers.h"
15 #include "db/memtable_list.h"
16 #include "file/file_util.h"
17 #include "file/filename.h"
18 #include "file/sst_file_manager_impl.h"
19 #include "util/autovector.h"
21 namespace ROCKSDB_NAMESPACE
{
// Returns the minimum log number to keep, as reported by versions_.
// NOTE(review): two return statements are visible below, but the condition
// that selects between min_log_number_to_keep_2pc() and
// MinLogNumberWithUnflushedData() is not present in this extract --
// presumably it depends on whether 2PC is enabled; confirm against the
// full file.
23 uint64_t DBImpl::MinLogNumberToKeep() {
25 return versions_
->min_log_number_to_keep_2pc();
27 return versions_
->MinLogNumberWithUnflushedData();
// Returns the first element of pending_outputs_ when that container is
// non-empty; otherwise returns the largest uint64_t value.
// FindObsoleteFiles() stores the result in job_context->min_pending_output.
31 uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
33 if (!pending_outputs_
.empty()) {
34 return *pending_outputs_
.begin();
// Nothing pending: use the maximum value as the bound.
36 return std::numeric_limits
<uint64_t>::max();
// Locking wrapper: acquires mutex_ through InstrumentedMutexLock and returns
// whatever DisableFileDeletionsWithLock() returns.
39 Status
DBImpl::DisableFileDeletions() {
40 InstrumentedMutexLock
l(&mutex_
);
41 return DisableFileDeletionsWithLock();
// Increments disable_delete_obsolete_files_ and logs the outcome: an INFO
// message when the counter is 1, and a WARN message carrying the counter
// value for the already-disabled case.
// NOTE(review): the branch keyword between the two log calls and the return
// statement are not visible in this extract. The name and the caller
// DisableFileDeletions() (which locks mutex_ first) suggest mutex_ must be
// held on entry -- confirm.
44 Status
DBImpl::DisableFileDeletionsWithLock() {
46 ++disable_delete_obsolete_files_
;
47 if (disable_delete_obsolete_files_
== 1) {
48 ROCKS_LOG_INFO(immutable_db_options_
.info_log
, "File Deletions Disabled");
50 ROCKS_LOG_WARN(immutable_db_options_
.info_log
,
51 "File Deletions Disabled, but already disabled. Counter: %d",
52 disable_delete_obsolete_files_
);
// Undoes a previous DisableFileDeletions().
//   force - per the inline comment, enables deletions right away by resetting
//           disable_delete_obsolete_files_ to 0; otherwise a positive counter
//           is decremented by one.
// When the resulting counter is 0, FindObsoleteFiles() is run with
// force=true and, if the job context has something to delete,
// PurgeObsoleteFiles() is called; otherwise a WARN message is logged. The
// info log is flushed at the end.
// NOTE(review): the scope braces, the `if (force)` line and the return
// statement are not visible in this extract -- confirm the exact extent of
// the mutex_ critical section against the full file.
57 Status
DBImpl::EnableFileDeletions(bool force
) {
58 // Job id == 0 means that this is not our background process, but rather
60 JobContext
job_context(0);
61 int saved_counter
; // initialize on all paths
63 InstrumentedMutexLock
l(&mutex_
);
65 // if force, we need to enable file deletions right away
66 disable_delete_obsolete_files_
= 0;
67 } else if (disable_delete_obsolete_files_
> 0) {
68 --disable_delete_obsolete_files_
;
// Snapshot the counter into a local; the checks below use this copy.
70 saved_counter
= disable_delete_obsolete_files_
;
71 if (saved_counter
== 0) {
72 FindObsoleteFiles(&job_context
, true);
76 if (saved_counter
== 0) {
77 ROCKS_LOG_INFO(immutable_db_options_
.info_log
, "File Deletions Enabled");
78 if (job_context
.HaveSomethingToDelete()) {
79 PurgeObsoleteFiles(job_context
);
82 ROCKS_LOG_WARN(immutable_db_options_
.info_log
,
83 "File Deletions Enable, but not really enabled. Counter: %d",
87 LogFlush(immutable_db_options_
.info_log
);
// Returns true exactly when disable_delete_obsolete_files_ is 0, i.e. no
// DisableFileDeletions() call is currently outstanding.
91 bool DBImpl::IsFileDeletionsEnabled() const {
92 return 0 == disable_delete_obsolete_files_
;
95 // * Returns the list of live files in 'sst_live' and 'blob_live'.
96 // If it's doing full scan:
97 // * Returns the list of all files in the filesystem in
98 // 'full_scan_candidate_files'.
99 // Otherwise, gets obsolete files from VersionSet.
100 // no_full_scan = true -- never do the full scan using GetChildren()
101 // force = false -- don't force the full scan, except every
102 // mutable_db_options_.delete_obsolete_files_period_micros
103 // force = true -- force the full scan
// Fills *job_context with: the obsolete SST/blob/manifest files reported by
// versions_, the live SST/blob file lists, the manifest and log numbers, the
// directory listings (full scan only), the WAL files that became obsolete,
// and the log writers to free.
// Does nothing when disable_delete_obsolete_files_ > 0.
// NOTE(review): the remaining parameter(s) after `force` and several control
// flow lines are not visible in this extract; the comments inside state that
// mutex_ must not be released until the scan finishes -- confirm the locking
// contract against the full file.
104 void DBImpl::FindObsoleteFiles(JobContext
* job_context
, bool force
,
108 // if deletion is disabled, do nothing
109 if (disable_delete_obsolete_files_
> 0) {
113 bool doing_the_full_scan
= false;
115 // logic for figuring out if we're doing the full scan
117 doing_the_full_scan
= false;
119 mutable_db_options_
.delete_obsolete_files_period_micros
== 0) {
120 doing_the_full_scan
= true;
122 const uint64_t now_micros
= env_
->NowMicros();
123 if ((delete_obsolete_files_last_run_
+
124 mutable_db_options_
.delete_obsolete_files_period_micros
) <
126 doing_the_full_scan
= true;
// Remember when this periodic full scan was started.
127 delete_obsolete_files_last_run_
= now_micros
;
131 // don't delete files that might be currently written to from compaction
133 // Since job_context->min_pending_output is set, until file scan finishes,
134 // mutex_ cannot be released. Otherwise, we might see no min_pending_output
135 // here but later find newer generated unfinalized files while scanning.
136 job_context
->min_pending_output
= MinObsoleteSstNumberToKeep();
138 // Get obsolete files. This function will also update the list of
139 // pending files in VersionSet().
140 versions_
->GetObsoleteFiles(
141 &job_context
->sst_delete_files
, &job_context
->blob_delete_files
,
142 &job_context
->manifest_delete_files
, job_context
->min_pending_output
);
144 // Mark the elements in job_context->sst_delete_files and
145 // job_context->blob_delete_files as "grabbed for purge" so that other threads
146 // calling FindObsoleteFiles with full_scan=true will not add these files to
147 // candidate list for purge.
148 for (const auto& sst_to_del
: job_context
->sst_delete_files
) {
149 MarkAsGrabbedForPurge(sst_to_del
.metadata
->fd
.GetNumber());
152 for (const auto& blob_file
: job_context
->blob_delete_files
) {
153 MarkAsGrabbedForPurge(blob_file
.GetBlobFileNumber());
156 // store the current filenum, lognum, etc
157 job_context
->manifest_file_number
= versions_
->manifest_file_number();
158 job_context
->pending_manifest_file_number
=
159 versions_
->pending_manifest_file_number();
160 job_context
->log_number
= MinLogNumberToKeep();
161 job_context
->prev_log_number
= versions_
->prev_log_number();
163 versions_
->AddLiveFiles(&job_context
->sst_live
, &job_context
->blob_live
);
164 if (doing_the_full_scan
) {
165 InfoLogPrefix
info_log_prefix(!immutable_db_options_
.db_log_dir
.empty(),
// Set of distinct directories to list: db_paths first, then cf_paths.
167 std::set
<std::string
> paths
;
168 for (size_t path_id
= 0; path_id
< immutable_db_options_
.db_paths
.size();
170 paths
.insert(immutable_db_options_
.db_paths
[path_id
].path
);
173 // Note that if cf_paths is not specified in the ColumnFamilyOptions
174 // of a particular column family, we use db_paths as the cf_paths
175 // setting. Hence, there can be multiple duplicates of files from db_paths
176 // in the following code. The duplicates are removed while identifying
177 // unique files in PurgeObsoleteFiles.
178 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
179 for (size_t path_id
= 0; path_id
< cfd
->ioptions()->cf_paths
.size();
181 auto& path
= cfd
->ioptions()->cf_paths
[path_id
].path
;
183 if (paths
.find(path
) == paths
.end()) {
189 for (auto& path
: paths
) {
190 // set of all files in the directory. We'll exclude files that are still
191 // alive in the subsequent processing.
192 std::vector
<std::string
> files
;
193 env_
->GetChildren(path
, &files
).PermitUncheckedError(); // Ignore errors
194 for (const std::string
& file
: files
) {
197 // 1. If we cannot parse the file name, we skip;
198 // 2. If the file with file_number equals number has already been
199 // grabbed for purge by another compaction job, or it has already been
200 // scheduled for purge, we also skip it if we
201 // are doing full scan in order to avoid double deletion of the same
202 // file under race conditions. See
203 // https://github.com/facebook/rocksdb/issues/3573
204 if (!ParseFileName(file
, &number
, info_log_prefix
.prefix
, &type
) ||
205 !ShouldPurge(number
)) {
209 // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
210 job_context
->full_scan_candidate_files
.emplace_back("/" + file
, path
);
214 // Add log files in wal_dir
215 if (immutable_db_options_
.wal_dir
!= dbname_
) {
216 std::vector
<std::string
> log_files
;
217 env_
->GetChildren(immutable_db_options_
.wal_dir
,
219 .PermitUncheckedError(); // Ignore errors
220 for (const std::string
& log_file
: log_files
) {
221 job_context
->full_scan_candidate_files
.emplace_back(
222 log_file
, immutable_db_options_
.wal_dir
);
225 // Add info log files in db_log_dir
226 if (!immutable_db_options_
.db_log_dir
.empty() &&
227 immutable_db_options_
.db_log_dir
!= dbname_
) {
228 std::vector
<std::string
> info_log_files
;
230 env_
->GetChildren(immutable_db_options_
.db_log_dir
, &info_log_files
)
231 .PermitUncheckedError();
232 for (std::string
& log_file
: info_log_files
) {
233 job_context
->full_scan_candidate_files
.emplace_back(
234 log_file
, immutable_db_options_
.db_log_dir
);
239 // logs_ is empty when called during recovery, in which case there can't yet
240 // be any tracked obsolete logs
241 if (!alive_log_files_
.empty() && !logs_
.empty()) {
242 uint64_t min_log_number
= job_context
->log_number
;
243 size_t num_alive_log_files
= alive_log_files_
.size();
244 // find newly obsoleted log files
245 while (alive_log_files_
.begin()->number
< min_log_number
) {
246 auto& earliest
= *alive_log_files_
.begin();
// Recycle the log while the recycle list is below recycle_log_file_num;
// the other visible path queues it in log_delete_files.
247 if (immutable_db_options_
.recycle_log_file_num
>
248 log_recycle_files_
.size()) {
249 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
250 "adding log %" PRIu64
" to recycle list\n",
252 log_recycle_files_
.push_back(earliest
.number
);
254 job_context
->log_delete_files
.push_back(earliest
.number
);
// Record the pre-deletion totals once, before the first size is accumulated.
256 if (job_context
->size_log_to_delete
== 0) {
257 job_context
->prev_total_log_size
= total_log_size_
;
258 job_context
->num_alive_log_files
= num_alive_log_files
;
260 job_context
->size_log_to_delete
+= earliest
.size
;
261 total_log_size_
-= earliest
.size
;
// alive_log_files_ is popped under log_write_mutex_ only when
// two_write_queues_ is set.
262 if (two_write_queues_
) {
263 log_write_mutex_
.Lock();
265 alive_log_files_
.pop_front();
266 if (two_write_queues_
) {
267 log_write_mutex_
.Unlock();
269 // Current log should always stay alive since it can't have
270 // number < MinLogNumber().
271 assert(alive_log_files_
.size());
273 while (!logs_
.empty() && logs_
.front().number
< min_log_number
) {
274 auto& log
= logs_
.front();
275 if (log
.getting_synced
) {
277 // logs_ could have changed while we were waiting.
280 logs_to_free_
.push_back(log
.ReleaseWriter());
282 InstrumentedMutexLock
wl(&log_write_mutex_
);
286 // Current log cannot be obsolete.
287 assert(!logs_
.empty());
290 // We're just cleaning up for DB::Write().
291 assert(job_context
->logs_to_free
.empty());
// Hand the released writers to the job context; logs_to_free_ is cleared
// further down.
292 job_context
->logs_to_free
= logs_to_free_
;
293 job_context
->log_recycle_files
.assign(log_recycle_files_
.begin(),
294 log_recycle_files_
.end());
// Count this job as a pending purge; PurgeObsoleteFiles() decrements the
// counter when it finishes.
295 if (job_context
->HaveSomethingToDelete()) {
296 ++pending_purge_obsolete_files_
;
298 logs_to_free_
.clear();
// Ordering predicate over candidate files: file_name is compared first and,
// when neither name comparison decides, the result is
// first.file_path > second.file_path. Passed to std::sort in
// PurgeObsoleteFiles().
// NOTE(review): the return statements of the first two branches are not
// visible in this extract -- presumably `true` and `false` (descending
// order); confirm against the full file.
302 bool CompareCandidateFile(const JobContext::CandidateFileInfo
& first
,
303 const JobContext::CandidateFileInfo
& second
) {
304 if (first
.file_name
> second
.file_name
) {
306 } else if (first
.file_name
< second
.file_name
) {
// Names are equal here: fall back to comparing the paths.
309 return (first
.file_path
> second
.file_path
);
314 // Delete obsolete files and log status and information of file deletion
// Parameters:
//   job_id       - printed as the "[JOB %d]" prefix of every log message.
//   fname        - name of the file to delete, as passed to the delete call.
//   path_to_sync - forwarded to DeleteDBFile() for table/blob/WAL files.
//   type, number - file type and file number; `type` selects the deletion
//                  routine and both are reported in the log messages.
// Table, blob and WAL files are deleted through DeleteDBFile(); the other
// visible path calls env_->DeleteFile(fname). The outcome is logged, with a
// separate message when FileExists() reports the file as not found, and for
// table files it is also passed to
// EventHelpers::LogAndNotifyTableFileDeletion().
315 void DBImpl::DeleteObsoleteFileImpl(int job_id
, const std::string
& fname
,
316 const std::string
& path_to_sync
,
317 FileType type
, uint64_t number
) {
318 TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl::BeforeDeletion",
319 const_cast<std::string
*>(&fname
));
321 Status file_deletion_status
;
322 if (type
== kTableFile
|| type
== kBlobFile
|| type
== kWalFile
) {
323 file_deletion_status
=
324 DeleteDBFile(&immutable_db_options_
, fname
, path_to_sync
,
325 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path_
);
327 file_deletion_status
= env_
->DeleteFile(fname
);
329 TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
330 &file_deletion_status
);
331 if (file_deletion_status
.ok()) {
332 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
,
333 "[JOB %d] Delete %s type=%d #%" PRIu64
" -- %s\n", job_id
,
334 fname
.c_str(), type
, number
,
335 file_deletion_status
.ToString().c_str());
// Deletion failed: distinguish "file was already gone" from a real failure.
336 } else if (env_
->FileExists(fname
).IsNotFound()) {
338 immutable_db_options_
.info_log
,
339 "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
341 job_id
, fname
.c_str(), type
, number
,
342 file_deletion_status
.ToString().c_str());
344 ROCKS_LOG_ERROR(immutable_db_options_
.info_log
,
345 "[JOB %d] Failed to delete %s type=%d #%" PRIu64
" -- %s\n",
346 job_id
, fname
.c_str(), type
, number
,
347 file_deletion_status
.ToString().c_str());
349 if (type
== kTableFile
) {
350 EventHelpers::LogAndNotifyTableFileDeletion(
351 &event_logger_
, job_id
, number
, fname
, file_deletion_status
, GetName(),
352 immutable_db_options_
.listeners
);
356 // Diffs the files listed in filenames against the live files; those that
357 // do not belong to live files are possibly removed. Also, removes all the
358 // files in sst_delete_files and log_delete_files.
359 // It is not necessary to hold the mutex when invoking this method.
// Parameters:
//   state         - job context filled in by FindObsoleteFiles(); must have
//                   something to delete and a non-zero manifest_file_number
//                   (both asserted).
//   schedule_only - NOTE(review): its use is not visible in this extract;
//                   presumably it selects SchedulePendingPurge() over an
//                   immediate DeleteObsoleteFileImpl() -- confirm.
// Builds one de-duplicated candidate list from the full-scan results and the
// explicit delete lists, decides per file type whether each candidate must be
// kept, deletes (or schedules) the rest, trims old info log files, and
// finally decrements pending_purge_obsolete_files_.
360 void DBImpl::PurgeObsoleteFiles(JobContext
& state
, bool schedule_only
) {
361 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
362 // we'd better have sth to delete
363 assert(state
.HaveSomethingToDelete());
365 // FindObsoleteFiles() should've populated this so nonzero
366 assert(state
.manifest_file_number
!= 0);
368 // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
369 std::unordered_set
<uint64_t> sst_live_set(state
.sst_live
.begin(),
370 state
.sst_live
.end());
371 std::unordered_set
<uint64_t> blob_live_set(state
.blob_live
.begin(),
372 state
.blob_live
.end());
373 std::unordered_set
<uint64_t> log_recycle_files_set(
374 state
.log_recycle_files
.begin(), state
.log_recycle_files
.end());
// Start from a copy of the full-scan candidates, then append every file
// explicitly listed for deletion.
376 auto candidate_files
= state
.full_scan_candidate_files
;
377 candidate_files
.reserve(
378 candidate_files
.size() + state
.sst_delete_files
.size() +
379 state
.blob_delete_files
.size() + state
.log_delete_files
.size() +
380 state
.manifest_delete_files
.size());
381 // We may ignore the dbname when generating the file names.
382 for (auto& file
: state
.sst_delete_files
) {
383 candidate_files
.emplace_back(
384 MakeTableFileName(file
.metadata
->fd
.GetNumber()), file
.path
);
385 if (file
.metadata
->table_reader_handle
) {
386 table_cache_
->Release(file
.metadata
->table_reader_handle
);
388 file
.DeleteMetadata();
391 for (const auto& blob_file
: state
.blob_delete_files
) {
392 candidate_files
.emplace_back(BlobFileName(blob_file
.GetBlobFileNumber()),
393 blob_file
.GetPath());
396 for (auto file_num
: state
.log_delete_files
) {
398 candidate_files
.emplace_back(LogFileName(file_num
),
399 immutable_db_options_
.wal_dir
);
402 for (const auto& filename
: state
.manifest_delete_files
) {
403 candidate_files
.emplace_back(filename
, dbname_
);
406 // dedup state.candidate_files so we don't try to delete the same
// Sorting with CompareCandidateFile makes equal entries adjacent, which
// std::unique below requires.
408 std::sort(candidate_files
.begin(), candidate_files
.end(),
409 CompareCandidateFile
);
410 candidate_files
.erase(
411 std::unique(candidate_files
.begin(), candidate_files
.end()),
412 candidate_files
.end());
414 if (state
.prev_total_log_size
> 0) {
415 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
416 "[JOB %d] Try to delete WAL files size %" PRIu64
417 ", prev total WAL file size %" PRIu64
418 ", number of live WAL files %" ROCKSDB_PRIszt
".\n",
419 state
.job_id
, state
.size_log_to_delete
,
420 state
.prev_total_log_size
, state
.num_alive_log_files
);
423 std::vector
<std::string
> old_info_log_files
;
424 InfoLogPrefix
info_log_prefix(!immutable_db_options_
.db_log_dir
.empty(),
427 // File numbers of most recent two OPTIONS file in candidate_files (found in
428 // previous FindObsoleteFiles(full_scan=true))
429 // At this point, there must not be any duplicate file numbers in
431 uint64_t optsfile_num1
= std::numeric_limits
<uint64_t>::min();
432 uint64_t optsfile_num2
= std::numeric_limits
<uint64_t>::min();
433 for (const auto& candidate_file
: candidate_files
) {
434 const std::string
& fname
= candidate_file
.file_name
;
437 if (!ParseFileName(fname
, &number
, info_log_prefix
.prefix
, &type
) ||
438 type
!= kOptionsFile
) {
// Track the two largest OPTIONS file numbers: num1 is the largest, num2 the
// runner-up.
441 if (number
> optsfile_num1
) {
442 optsfile_num2
= optsfile_num1
;
443 optsfile_num1
= number
;
444 } else if (number
> optsfile_num2
) {
445 optsfile_num2
= number
;
449 // Close WALs before trying to delete them.
450 for (const auto w
: state
.logs_to_free
) {
451 // TODO: maybe check the return value of Close.
453 s
.PermitUncheckedError();
456 bool own_files
= OwnTablesAndLogs();
// Numbers of table/blob files selected for deletion; used at the end to
// prune files_grabbed_for_purge_.
457 std::unordered_set
<uint64_t> files_to_del
;
458 for (const auto& candidate_file
: candidate_files
) {
459 const std::string
& to_delete
= candidate_file
.file_name
;
462 // Ignore file if we cannot recognize it.
463 if (!ParseFileName(to_delete
, &number
, info_log_prefix
.prefix
, &type
)) {
470 keep
= ((number
>= state
.log_number
) ||
471 (number
== state
.prev_log_number
) ||
472 (log_recycle_files_set
.find(number
) !=
473 log_recycle_files_set
.end()));
475 case kDescriptorFile
:
476 // Keep my manifest file, and any newer incarnations'
477 // (can happen during manifest roll)
478 keep
= (number
>= state
.manifest_file_number
);
481 // If the second condition is not there, this makes
482 // DontDeletePendingOutputs fail
483 keep
= (sst_live_set
.find(number
) != sst_live_set
.end()) ||
484 number
>= state
.min_pending_output
;
486 files_to_del
.insert(number
);
490 keep
= number
>= state
.min_pending_output
||
491 (blob_live_set
.find(number
) != blob_live_set
.end());
493 files_to_del
.insert(number
);
497 // Any temp files that are currently being written to must
498 // be recorded in pending_outputs_, which is inserted into "live".
499 // Also, SetCurrentFile creates a temp file when writing out new
500 // manifest, which is equal to state.pending_manifest_file_number. We
501 // should not delete that file
503 // TODO(yhchiang): carefully modify the third condition to safely
504 // remove the temp options files.
505 keep
= (sst_live_set
.find(number
) != sst_live_set
.end()) ||
506 (blob_live_set
.find(number
) != blob_live_set
.end()) ||
507 (number
== state
.pending_manifest_file_number
) ||
508 (to_delete
.find(kOptionsFileNamePrefix
) != std::string::npos
);
513 old_info_log_files
.push_back(to_delete
);
// Keep only OPTIONS files at least as new as the second most recent one.
517 keep
= (number
>= optsfile_num2
);
518 TEST_SYNC_POINT_CALLBACK(
519 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
520 reinterpret_cast<void*>(&number
));
521 TEST_SYNC_POINT_CALLBACK(
522 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
523 reinterpret_cast<void*>(&keep
));
538 std::string dir_to_sync
;
539 if (type
== kTableFile
) {
541 TableCache::Evict(table_cache_
.get(), number
);
542 fname
= MakeTableFileName(candidate_file
.file_path
, number
);
543 dir_to_sync
= candidate_file
.file_path
;
544 } else if (type
== kBlobFile
) {
545 fname
= BlobFileName(candidate_file
.file_path
, number
);
546 dir_to_sync
= candidate_file
.file_path
;
549 (type
== kWalFile
) ? immutable_db_options_
.wal_dir
: dbname_
;
550 fname
= dir_to_sync
+
551 ((!dir_to_sync
.empty() && dir_to_sync
.back() == '/') ||
552 (!to_delete
.empty() && to_delete
.front() == '/')
// WAL files are handed to the WAL manager for archiving when either
// wal_ttl_seconds or wal_size_limit_mb is positive.
559 if (type
== kWalFile
&& (immutable_db_options_
.wal_ttl_seconds
> 0 ||
560 immutable_db_options_
.wal_size_limit_mb
> 0)) {
561 wal_manager_
.ArchiveWALFile(fname
, number
);
564 #endif // !ROCKSDB_LITE
566 // If I do not own these files, e.g. secondary instance with max_open_files
567 // = -1, then no need to delete or schedule delete these files since they
568 // will be removed by their owner, e.g. the primary instance.
573 InstrumentedMutexLock
guard_lock(&mutex_
);
574 SchedulePendingPurge(fname
, dir_to_sync
, type
, number
, state
.job_id
);
576 DeleteObsoleteFileImpl(state
.job_id
, fname
, dir_to_sync
, type
, number
);
581 // After purging obsolete files, remove them from files_grabbed_for_purge_.
582 InstrumentedMutexLock
guard_lock(&mutex_
);
// Collect first and erase afterwards, so files_grabbed_for_purge_ is not
// modified while it is being iterated.
583 autovector
<uint64_t> to_be_removed
;
584 for (auto fn
: files_grabbed_for_purge_
) {
585 if (files_to_del
.count(fn
) != 0) {
586 to_be_removed
.emplace_back(fn
);
589 for (auto fn
: to_be_removed
) {
590 files_grabbed_for_purge_
.erase(fn
);
594 // Delete old info log files.
595 size_t old_info_log_file_count
= old_info_log_files
.size();
596 if (old_info_log_file_count
!= 0 &&
597 old_info_log_file_count
>= immutable_db_options_
.keep_log_file_num
) {
598 std::sort(old_info_log_files
.begin(), old_info_log_files
.end());
600 old_info_log_file_count
- immutable_db_options_
.keep_log_file_num
;
// NOTE(review): the inclusive bound visits end + 1 entries, and .at(i) would
// throw if keep_log_file_num were 0 (end == size) -- presumably
// keep_log_file_num > 0 is validated elsewhere; confirm.
601 for (unsigned int i
= 0; i
<= end
; i
++) {
602 std::string
& to_delete
= old_info_log_files
.at(i
);
603 std::string full_path_to_delete
=
604 (immutable_db_options_
.db_log_dir
.empty()
606 : immutable_db_options_
.db_log_dir
) +
608 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
609 "[JOB %d] Delete info log file %s\n", state
.job_id
,
610 full_path_to_delete
.c_str());
611 Status s
= env_
->DeleteFile(full_path_to_delete
);
613 if (env_
->FileExists(full_path_to_delete
).IsNotFound()) {
615 immutable_db_options_
.info_log
,
616 "[JOB %d] Tried to delete non-existing info log file %s FAILED "
618 state
.job_id
, to_delete
.c_str(), s
.ToString().c_str());
620 ROCKS_LOG_ERROR(immutable_db_options_
.info_log
,
621 "[JOB %d] Delete info log file %s FAILED -- %s\n",
622 state
.job_id
, to_delete
.c_str(),
623 s
.ToString().c_str());
629 wal_manager_
.PurgeObsoleteWALFiles();
630 #endif // ROCKSDB_LITE
631 LogFlush(immutable_db_options_
.info_log
);
632 InstrumentedMutexLock
l(&mutex_
);
// Balances the increment done in FindObsoleteFiles().
633 --pending_purge_obsolete_files_
;
634 assert(pending_purge_obsolete_files_
>= 0);
635 if (pending_purge_obsolete_files_
== 0) {
638 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
// Runs one forced obsolete-file pass: creates a JobContext with a fresh id
// taken from next_job_id_, calls FindObsoleteFiles() with force=true and,
// when the context has something to delete, PurgeObsoleteFiles().
// NOTE(review): FindObsoleteFiles() documents that mutex_ must stay held
// during its scan, while PurgeObsoleteFiles() takes mutex_ itself; the
// lock/unlock statements around these calls are not visible in this
// extract -- confirm against the full file.
641 void DBImpl::DeleteObsoleteFiles() {
643 JobContext
job_context(next_job_id_
.fetch_add(1));
644 FindObsoleteFiles(&job_context
, true);
647 if (job_context
.HaveSomethingToDelete()) {
648 PurgeObsoleteFiles(job_context
);
// Walks every column family in vset and tracks, in min_log, the smallest
// non-zero log number reported by
// imm()->PrecomputeMinLogContainingPrepSection(...) and by
// mem()->GetMinLogContainingPrepSection(). min_log starts at 0, which stands
// for "none found so far".
// NOTE(review): the statement taken for dropped column families / the one
// equal to cfd_to_flush, the assignments to min_log and the final return are
// not visible in this extract -- presumably `continue`, `min_log = log` and
// `return min_log`; confirm.
654 uint64_t FindMinPrepLogReferencedByMemTable(
655 VersionSet
* vset
, const ColumnFamilyData
* cfd_to_flush
,
656 const autovector
<MemTable
*>& memtables_to_flush
) {
657 uint64_t min_log
= 0;
659 // we must look through the memtables for two phase transactions
660 // that have been committed but not yet flushed
661 for (auto loop_cfd
: *vset
->GetColumnFamilySet()) {
662 if (loop_cfd
->IsDropped() || loop_cfd
== cfd_to_flush
) {
666 auto log
= loop_cfd
->imm()->PrecomputeMinLogContainingPrepSection(
// A log value of 0 is ignored; otherwise it replaces min_log when min_log is
// still 0 or larger than log.
669 if (log
> 0 && (min_log
== 0 || log
< min_log
)) {
673 log
= loop_cfd
->mem()->GetMinLogContainingPrepSection();
675 if (log
> 0 && (min_log
== 0 || log
< min_log
)) {
// Computes the minimum log number to keep for the non-2PC case:
//   1. Take the largest log number among the edits in edit_list that carry
//      one; if none do, fall back to cfd_to_flush.GetLogNumber().
//   2. Ask vset for the minimum over the other column families via
//      PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush).
//   3. Return the smaller of the two; the value from step 1 is ignored when
//      it is 0.
// vset must not be null (asserted).
683 uint64_t PrecomputeMinLogNumberToKeepNon2PC(
684 VersionSet
* vset
, const ColumnFamilyData
& cfd_to_flush
,
685 const autovector
<VersionEdit
*>& edit_list
) {
686 assert(vset
!= nullptr);
688 // Precompute the min log number containing unflushed data for the column
689 // family being flushed (`cfd_to_flush`).
690 uint64_t cf_min_log_number_to_keep
= 0;
691 for (auto& e
: edit_list
) {
692 if (e
->HasLogNumber()) {
693 cf_min_log_number_to_keep
=
694 std::max(cf_min_log_number_to_keep
, e
->GetLogNumber());
697 if (cf_min_log_number_to_keep
== 0) {
698 // No version edit contains information on log number. The log number
699 // for this column family should stay the same as it is.
700 cf_min_log_number_to_keep
= cfd_to_flush
.GetLogNumber();
703 // Get min log number containing unflushed data for other column families.
704 uint64_t min_log_number_to_keep
=
705 vset
->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush
);
706 if (cf_min_log_number_to_keep
!= 0) {
707 min_log_number_to_keep
=
708 std::min(cf_min_log_number_to_keep
, min_log_number_to_keep
);
710 return min_log_number_to_keep
;
// 2PC variant of the min-log computation. Starts from
// PrecomputeMinLogNumberToKeepNon2PC() and lowers the result to
//   - prep_tracker->FindMinLogContainingOutstandingPrep(), and then
//   - FindMinPrepLogReferencedByMemTable(vset, &cfd_to_flush,
//     memtables_to_flush)
// whenever the respective value is non-zero and smaller than the current
// result. vset and prep_tracker must not be null (asserted).
713 uint64_t PrecomputeMinLogNumberToKeep2PC(
714 VersionSet
* vset
, const ColumnFamilyData
& cfd_to_flush
,
715 const autovector
<VersionEdit
*>& edit_list
,
716 const autovector
<MemTable
*>& memtables_to_flush
,
717 LogsWithPrepTracker
* prep_tracker
) {
718 assert(vset
!= nullptr);
719 assert(prep_tracker
!= nullptr);
720 // Calculate updated min_log_number_to_keep
721 // Since the function should only be called in 2pc mode, log number in
722 // the version edit should be sufficient.
724 uint64_t min_log_number_to_keep
=
725 PrecomputeMinLogNumberToKeepNon2PC(vset
, cfd_to_flush
, edit_list
);
727 // if we are in 2pc mode we must consider logs containing prepared
728 // sections of outstanding transactions.
730 // We must check min logs with outstanding prep before we check
731 // logs referenced by memtables because a log referenced by the
732 // first data structure could transition to the second under us.
734 // TODO: iterating over all column families under db mutex.
735 // should find more optimal solution
736 auto min_log_in_prep_heap
=
737 prep_tracker
->FindMinLogContainingOutstandingPrep();
739 if (min_log_in_prep_heap
!= 0 &&
740 min_log_in_prep_heap
< min_log_number_to_keep
) {
741 min_log_number_to_keep
= min_log_in_prep_heap
;
744 uint64_t min_log_refed_by_mem
= FindMinPrepLogReferencedByMemTable(
745 vset
, &cfd_to_flush
, memtables_to_flush
);
747 if (min_log_refed_by_mem
!= 0 &&
748 min_log_refed_by_mem
< min_log_number_to_keep
) {
749 min_log_number_to_keep
= min_log_refed_by_mem
;
751 return min_log_number_to_keep
;
// Makes sure db_id_ is populated. When db_id_ is empty: checks whether the
// IDENTITY file exists (creating it with SetIdentityFile() if it is not
// found), reads the id with GetDbIdentityFromIdentityFile() and, if
// write_dbid_to_manifest is set and everything succeeded so far, records the
// id in the manifest through LogAndApply() on the default column family.
// NOTE(review): the final SetIdentityFile(env_, dbname_, db_id_) call rewrites
// the IDENTITY file from db_id_; the branch it belongs to and the return
// statement are not visible in this extract -- presumably the "db_id_ already
// set" case; confirm.
754 Status
DBImpl::SetDBId() {
756 // Happens when immutable_db_options_.write_dbid_to_manifest is set to true
757 // the very first time.
758 if (db_id_
.empty()) {
759 // Check for the IDENTITY file and create it if not there.
760 s
= fs_
->FileExists(IdentityFileName(dbname_
), IOOptions(), nullptr);
761 // Typically Identity file is created in NewDB() and for some reason if
762 // it is no longer available then at this point DB ID is not in Identity
764 if (s
.IsNotFound()) {
765 s
= SetIdentityFile(env_
, dbname_
);
// Any other failure of FileExists() is expected to be an I/O error.
769 } else if (!s
.ok()) {
770 assert(s
.IsIOError());
773 s
= GetDbIdentityFromIdentityFile(&db_id_
);
774 if (immutable_db_options_
.write_dbid_to_manifest
&& s
.ok()) {
776 edit
.SetDBId(db_id_
);
778 MutableCFOptions
mutable_cf_options(options
);
779 versions_
->db_id_
= db_id_
;
780 s
= versions_
->LogAndApply(versions_
->GetColumnFamilySet()->GetDefault(),
781 mutable_cf_options
, &edit
, &mutex_
, nullptr,
782 /* new_descriptor_log */ false);
785 s
= SetIdentityFile(env_
, dbname_
, db_id_
);
// Removes table files whose file number is not below
// versions_->current_next_file_number().
// Steps visible here:
//   1. Build the list of directories: dbname_, every db_paths entry and every
//      column family's cf_paths entry, each normalized with a trailing
//      separator, then sorted and de-duplicated.
//   2. List each directory, track the largest parsed file number, and collect
//      table files with number >= next_file_number into files_to_delete.
//   3. If a larger file number was seen, advance versions_->next_file_number_
//      past it; the next-file number is written through a VersionEdit with
//      LogAndApply() on the default column family.
//   4. Delete the collected files with env_->DeleteFile().
// NOTE(review): error-handling branches, the exact scope of step 3 and the
// return statement are not visible in this extract -- confirm.
790 Status
DBImpl::DeleteUnreferencedSstFiles() {
792 std::vector
<std::string
> paths
;
793 paths
.push_back(NormalizePath(dbname_
+ std::string(1, kFilePathSeparator
)));
794 for (const auto& db_path
: immutable_db_options_
.db_paths
) {
796 NormalizePath(db_path
.path
+ std::string(1, kFilePathSeparator
)));
798 for (const auto* cfd
: *versions_
->GetColumnFamilySet()) {
799 for (const auto& cf_path
: cfd
->ioptions()->cf_paths
) {
801 NormalizePath(cf_path
.path
+ std::string(1, kFilePathSeparator
)));
// Sort so std::unique can drop directories that were added more than once.
805 std::sort(paths
.begin(), paths
.end());
806 paths
.erase(std::unique(paths
.begin(), paths
.end()), paths
.end());
808 uint64_t next_file_number
= versions_
->current_next_file_number();
809 uint64_t largest_file_number
= next_file_number
;
810 std::set
<std::string
> files_to_delete
;
812 for (const auto& path
: paths
) {
813 std::vector
<std::string
> files
;
814 s
= env_
->GetChildren(path
, &files
);
818 for (const auto& fname
: files
) {
821 if (!ParseFileName(fname
, &number
, &type
)) {
824 // path ends with '/' or '\\'
825 const std::string normalized_fpath
= path
+ fname
;
826 largest_file_number
= std::max(largest_file_number
, number
);
827 if (type
== kTableFile
&& number
>= next_file_number
&&
828 files_to_delete
.find(normalized_fpath
) == files_to_delete
.end()) {
829 files_to_delete
.insert(normalized_fpath
);
837 if (largest_file_number
> next_file_number
) {
838 versions_
->next_file_number_
.store(largest_file_number
+ 1);
842 edit
.SetNextFile(versions_
->next_file_number_
.load());
843 assert(versions_
->GetColumnFamilySet());
844 ColumnFamilyData
* default_cfd
= versions_
->GetColumnFamilySet()->GetDefault();
846 s
= versions_
->LogAndApply(
847 default_cfd
, *default_cfd
->GetLatestMutableCFOptions(), &edit
, &mutex_
,
848 directories_
.GetDbDir(), /*new_descriptor_log*/ false);
854 for (const auto& fname
: files_to_delete
) {
855 s
= env_
->DeleteFile(fname
);
864 } // namespace ROCKSDB_NAMESPACE