]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_impl_files.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_impl_files.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9#include "db/db_impl.h"
10
11#ifndef __STDC_FORMAT_MACROS
12#define __STDC_FORMAT_MACROS
13#endif
14#include <inttypes.h>
11fdf7f2
TL
15#include <set>
16#include <unordered_set>
7c673cae 17#include "db/event_helpers.h"
11fdf7f2 18#include "db/memtable_list.h"
7c673cae
FG
19#include "util/file_util.h"
20#include "util/sst_file_manager_impl.h"
21
7c673cae 22namespace rocksdb {
494da23a 23
7c673cae 24uint64_t DBImpl::MinLogNumberToKeep() {
7c673cae 25 if (allow_2pc()) {
11fdf7f2
TL
26 return versions_->min_log_number_to_keep_2pc();
27 } else {
28 return versions_->MinLogNumberWithUnflushedData();
7c673cae 29 }
7c673cae
FG
30}
31
494da23a
TL
32uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
33 mutex_.AssertHeld();
34 if (!pending_outputs_.empty()) {
35 return *pending_outputs_.begin();
36 }
37 return std::numeric_limits<uint64_t>::max();
38}
39
7c673cae
FG
40// * Returns the list of live files in 'sst_live'
41// If it's doing full scan:
42// * Returns the list of all files in the filesystem in
43// 'full_scan_candidate_files'.
44// Otherwise, gets obsolete files from VersionSet.
45// no_full_scan = true -- never do the full scan using GetChildren()
46// force = false -- don't force the full scan, except every
47// mutable_db_options_.delete_obsolete_files_period_micros
48// force = true -- force the full scan
49void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
50 bool no_full_scan) {
51 mutex_.AssertHeld();
52
53 // if deletion is disabled, do nothing
54 if (disable_delete_obsolete_files_ > 0) {
55 return;
56 }
57
58 bool doing_the_full_scan = false;
59
11fdf7f2 60 // logic for figuring out if we're doing the full scan
7c673cae
FG
61 if (no_full_scan) {
62 doing_the_full_scan = false;
63 } else if (force ||
64 mutable_db_options_.delete_obsolete_files_period_micros == 0) {
65 doing_the_full_scan = true;
66 } else {
67 const uint64_t now_micros = env_->NowMicros();
68 if ((delete_obsolete_files_last_run_ +
69 mutable_db_options_.delete_obsolete_files_period_micros) <
70 now_micros) {
71 doing_the_full_scan = true;
72 delete_obsolete_files_last_run_ = now_micros;
73 }
74 }
75
76 // don't delete files that might be currently written to from compaction
77 // threads
78 // Since job_context->min_pending_output is set, until file scan finishes,
79 // mutex_ cannot be released. Otherwise, we might see no min_pending_output
11fdf7f2 80 // here but later find newer generated unfinalized files while scanning.
7c673cae
FG
81 if (!pending_outputs_.empty()) {
82 job_context->min_pending_output = *pending_outputs_.begin();
83 } else {
84 // delete all of them
85 job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
86 }
87
88 // Get obsolete files. This function will also update the list of
89 // pending files in VersionSet().
90 versions_->GetObsoleteFiles(&job_context->sst_delete_files,
91 &job_context->manifest_delete_files,
92 job_context->min_pending_output);
93
11fdf7f2
TL
94 // Mark the elements in job_context->sst_delete_files as grabbedForPurge
95 // so that other threads calling FindObsoleteFiles with full_scan=true
96 // will not add these files to candidate list for purge.
97 for (const auto& sst_to_del : job_context->sst_delete_files) {
98 MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
99 }
100
7c673cae
FG
101 // store the current filenum, lognum, etc
102 job_context->manifest_file_number = versions_->manifest_file_number();
103 job_context->pending_manifest_file_number =
104 versions_->pending_manifest_file_number();
105 job_context->log_number = MinLogNumberToKeep();
7c673cae
FG
106 job_context->prev_log_number = versions_->prev_log_number();
107
108 versions_->AddLiveFiles(&job_context->sst_live);
109 if (doing_the_full_scan) {
11fdf7f2 110 InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
494da23a 111 dbname_);
11fdf7f2 112 std::set<std::string> paths;
7c673cae
FG
113 for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
114 path_id++) {
11fdf7f2
TL
115 paths.insert(immutable_db_options_.db_paths[path_id].path);
116 }
117
118 // Note that if cf_paths is not specified in the ColumnFamilyOptions
119 // of a particular column family, we use db_paths as the cf_paths
120 // setting. Hence, there can be multiple duplicates of files from db_paths
121 // in the following code. The duplicate are removed while identifying
122 // unique files in PurgeObsoleteFiles.
123 for (auto cfd : *versions_->GetColumnFamilySet()) {
124 for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
125 path_id++) {
126 auto& path = cfd->ioptions()->cf_paths[path_id].path;
127
128 if (paths.find(path) == paths.end()) {
129 paths.insert(path);
130 }
131 }
132 }
133
134 for (auto& path : paths) {
7c673cae
FG
135 // set of all files in the directory. We'll exclude files that are still
136 // alive in the subsequent processings.
137 std::vector<std::string> files;
11fdf7f2
TL
138 env_->GetChildren(path, &files); // Ignore errors
139 for (const std::string& file : files) {
140 uint64_t number;
141 FileType type;
142 // 1. If we cannot parse the file name, we skip;
143 // 2. If the file with file_number equals number has already been
144 // grabbed for purge by another compaction job, or it has already been
145 // schedule for purge, we also skip it if we
146 // are doing full scan in order to avoid double deletion of the same
147 // file under race conditions. See
148 // https://github.com/facebook/rocksdb/issues/3573
149 if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
150 !ShouldPurge(number)) {
151 continue;
152 }
153
7c673cae 154 // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
494da23a 155 job_context->full_scan_candidate_files.emplace_back("/" + file, path);
7c673cae
FG
156 }
157 }
158
159 // Add log files in wal_dir
160 if (immutable_db_options_.wal_dir != dbname_) {
161 std::vector<std::string> log_files;
162 env_->GetChildren(immutable_db_options_.wal_dir,
163 &log_files); // Ignore errors
11fdf7f2 164 for (const std::string& log_file : log_files) {
494da23a
TL
165 job_context->full_scan_candidate_files.emplace_back(
166 log_file, immutable_db_options_.wal_dir);
7c673cae
FG
167 }
168 }
169 // Add info log files in db_log_dir
170 if (!immutable_db_options_.db_log_dir.empty() &&
171 immutable_db_options_.db_log_dir != dbname_) {
172 std::vector<std::string> info_log_files;
173 // Ignore errors
174 env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
11fdf7f2 175 for (std::string& log_file : info_log_files) {
494da23a
TL
176 job_context->full_scan_candidate_files.emplace_back(
177 log_file, immutable_db_options_.db_log_dir);
7c673cae
FG
178 }
179 }
180 }
181
182 // logs_ is empty when called during recovery, in which case there can't yet
183 // be any tracked obsolete logs
184 if (!alive_log_files_.empty() && !logs_.empty()) {
185 uint64_t min_log_number = job_context->log_number;
186 size_t num_alive_log_files = alive_log_files_.size();
187 // find newly obsoleted log files
188 while (alive_log_files_.begin()->number < min_log_number) {
189 auto& earliest = *alive_log_files_.begin();
190 if (immutable_db_options_.recycle_log_file_num >
11fdf7f2 191 log_recycle_files_.size()) {
7c673cae
FG
192 ROCKS_LOG_INFO(immutable_db_options_.info_log,
193 "adding log %" PRIu64 " to recycle list\n",
194 earliest.number);
11fdf7f2 195 log_recycle_files_.push_back(earliest.number);
7c673cae
FG
196 } else {
197 job_context->log_delete_files.push_back(earliest.number);
198 }
199 if (job_context->size_log_to_delete == 0) {
200 job_context->prev_total_log_size = total_log_size_;
201 job_context->num_alive_log_files = num_alive_log_files;
202 }
203 job_context->size_log_to_delete += earliest.size;
204 total_log_size_ -= earliest.size;
11fdf7f2
TL
205 if (two_write_queues_) {
206 log_write_mutex_.Lock();
207 }
7c673cae 208 alive_log_files_.pop_front();
11fdf7f2
TL
209 if (two_write_queues_) {
210 log_write_mutex_.Unlock();
211 }
7c673cae
FG
212 // Current log should always stay alive since it can't have
213 // number < MinLogNumber().
214 assert(alive_log_files_.size());
215 }
216 while (!logs_.empty() && logs_.front().number < min_log_number) {
217 auto& log = logs_.front();
218 if (log.getting_synced) {
219 log_sync_cv_.Wait();
220 // logs_ could have changed while we were waiting.
221 continue;
222 }
223 logs_to_free_.push_back(log.ReleaseWriter());
11fdf7f2
TL
224 {
225 InstrumentedMutexLock wl(&log_write_mutex_);
226 logs_.pop_front();
227 }
7c673cae
FG
228 }
229 // Current log cannot be obsolete.
230 assert(!logs_.empty());
231 }
232
233 // We're just cleaning up for DB::Write().
234 assert(job_context->logs_to_free.empty());
235 job_context->logs_to_free = logs_to_free_;
11fdf7f2
TL
236 job_context->log_recycle_files.assign(log_recycle_files_.begin(),
237 log_recycle_files_.end());
238 if (job_context->HaveSomethingToDelete()) {
239 ++pending_purge_obsolete_files_;
240 }
7c673cae
FG
241 logs_to_free_.clear();
242}
243
244namespace {
245bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
246 const JobContext::CandidateFileInfo& second) {
247 if (first.file_name > second.file_name) {
248 return true;
249 } else if (first.file_name < second.file_name) {
250 return false;
251 } else {
11fdf7f2 252 return (first.file_path > second.file_path);
7c673cae
FG
253 }
254}
255}; // namespace
256
257// Delete obsolete files and log status and information of file deletion
11fdf7f2
TL
258void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
259 const std::string& path_to_sync,
260 FileType type, uint64_t number) {
261 Status file_deletion_status;
494da23a 262 if (type == kTableFile || type == kLogFile) {
7c673cae 263 file_deletion_status =
494da23a 264 DeleteDBFile(&immutable_db_options_, fname, path_to_sync);
7c673cae
FG
265 } else {
266 file_deletion_status = env_->DeleteFile(fname);
267 }
11fdf7f2 268 TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
494da23a 269 &file_deletion_status);
7c673cae
FG
270 if (file_deletion_status.ok()) {
271 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
272 "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
273 fname.c_str(), type, number,
274 file_deletion_status.ToString().c_str());
275 } else if (env_->FileExists(fname).IsNotFound()) {
276 ROCKS_LOG_INFO(
277 immutable_db_options_.info_log,
278 "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
279 " -- %s\n",
280 job_id, fname.c_str(), type, number,
281 file_deletion_status.ToString().c_str());
282 } else {
283 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
284 "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
285 job_id, fname.c_str(), type, number,
286 file_deletion_status.ToString().c_str());
287 }
288 if (type == kTableFile) {
289 EventHelpers::LogAndNotifyTableFileDeletion(
290 &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
291 immutable_db_options_.listeners);
292 }
293}
294
295// Diffs the files listed in filenames and those that do not
11fdf7f2 296// belong to live files are possibly removed. Also, removes all the
7c673cae
FG
297// files in sst_delete_files and log_delete_files.
298// It is not necessary to hold the mutex when invoking this method.
11fdf7f2
TL
299void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
300 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
7c673cae
FG
301 // we'd better have sth to delete
302 assert(state.HaveSomethingToDelete());
303
11fdf7f2
TL
304 // FindObsoleteFiles() should've populated this so nonzero
305 assert(state.manifest_file_number != 0);
7c673cae
FG
306
307 // Now, convert live list to an unordered map, WITHOUT mutex held;
308 // set is slow.
309 std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
310 for (const FileDescriptor& fd : state.sst_live) {
311 sst_live_map[fd.GetNumber()] = &fd;
312 }
313 std::unordered_set<uint64_t> log_recycle_files_set(
314 state.log_recycle_files.begin(), state.log_recycle_files.end());
315
316 auto candidate_files = state.full_scan_candidate_files;
317 candidate_files.reserve(
318 candidate_files.size() + state.sst_delete_files.size() +
319 state.log_delete_files.size() + state.manifest_delete_files.size());
320 // We may ignore the dbname when generating the file names.
321 const char* kDumbDbName = "";
11fdf7f2 322 for (auto& file : state.sst_delete_files) {
7c673cae 323 candidate_files.emplace_back(
494da23a
TL
324 MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()),
325 file.path);
11fdf7f2
TL
326 if (file.metadata->table_reader_handle) {
327 table_cache_->Release(file.metadata->table_reader_handle);
328 }
329 file.DeleteMetadata();
7c673cae
FG
330 }
331
332 for (auto file_num : state.log_delete_files) {
333 if (file_num > 0) {
11fdf7f2 334 candidate_files.emplace_back(LogFileName(kDumbDbName, file_num),
494da23a 335 immutable_db_options_.wal_dir);
7c673cae
FG
336 }
337 }
338 for (const auto& filename : state.manifest_delete_files) {
11fdf7f2 339 candidate_files.emplace_back(filename, dbname_);
7c673cae
FG
340 }
341
342 // dedup state.candidate_files so we don't try to delete the same
343 // file twice
344 std::sort(candidate_files.begin(), candidate_files.end(),
345 CompareCandidateFile);
346 candidate_files.erase(
347 std::unique(candidate_files.begin(), candidate_files.end()),
348 candidate_files.end());
349
350 if (state.prev_total_log_size > 0) {
351 ROCKS_LOG_INFO(immutable_db_options_.info_log,
352 "[JOB %d] Try to delete WAL files size %" PRIu64
353 ", prev total WAL file size %" PRIu64
354 ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
355 state.job_id, state.size_log_to_delete,
356 state.prev_total_log_size, state.num_alive_log_files);
357 }
358
359 std::vector<std::string> old_info_log_files;
360 InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
361 dbname_);
11fdf7f2
TL
362
363 // File numbers of most recent two OPTIONS file in candidate_files (found in
364 // previos FindObsoleteFiles(full_scan=true))
365 // At this point, there must not be any duplicate file numbers in
366 // candidate_files.
367 uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
368 uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
7c673cae 369 for (const auto& candidate_file : candidate_files) {
11fdf7f2
TL
370 const std::string& fname = candidate_file.file_name;
371 uint64_t number;
372 FileType type;
373 if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
374 type != kOptionsFile) {
375 continue;
376 }
377 if (number > optsfile_num1) {
378 optsfile_num2 = optsfile_num1;
379 optsfile_num1 = number;
380 } else if (number > optsfile_num2) {
381 optsfile_num2 = number;
382 }
383 }
384
385 std::unordered_set<uint64_t> files_to_del;
386 for (const auto& candidate_file : candidate_files) {
387 const std::string& to_delete = candidate_file.file_name;
7c673cae
FG
388 uint64_t number;
389 FileType type;
390 // Ignore file if we cannot recognize it.
391 if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
392 continue;
393 }
394
395 bool keep = true;
396 switch (type) {
397 case kLogFile:
398 keep = ((number >= state.log_number) ||
399 (number == state.prev_log_number) ||
400 (log_recycle_files_set.find(number) !=
401 log_recycle_files_set.end()));
402 break;
403 case kDescriptorFile:
404 // Keep my manifest file, and any newer incarnations'
405 // (can happen during manifest roll)
406 keep = (number >= state.manifest_file_number);
407 break;
408 case kTableFile:
409 // If the second condition is not there, this makes
410 // DontDeletePendingOutputs fail
411 keep = (sst_live_map.find(number) != sst_live_map.end()) ||
412 number >= state.min_pending_output;
11fdf7f2
TL
413 if (!keep) {
414 files_to_del.insert(number);
415 }
7c673cae
FG
416 break;
417 case kTempFile:
418 // Any temp files that are currently being written to must
419 // be recorded in pending_outputs_, which is inserted into "live".
420 // Also, SetCurrentFile creates a temp file when writing out new
421 // manifest, which is equal to state.pending_manifest_file_number. We
422 // should not delete that file
423 //
424 // TODO(yhchiang): carefully modify the third condition to safely
425 // remove the temp options files.
426 keep = (sst_live_map.find(number) != sst_live_map.end()) ||
427 (number == state.pending_manifest_file_number) ||
428 (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
429 break;
430 case kInfoLogFile:
431 keep = true;
432 if (number != 0) {
433 old_info_log_files.push_back(to_delete);
434 }
435 break;
11fdf7f2
TL
436 case kOptionsFile:
437 keep = (number >= optsfile_num2);
438 TEST_SYNC_POINT_CALLBACK(
439 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1",
440 reinterpret_cast<void*>(&number));
441 TEST_SYNC_POINT_CALLBACK(
442 "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2",
443 reinterpret_cast<void*>(&keep));
444 break;
7c673cae
FG
445 case kCurrentFile:
446 case kDBLockFile:
447 case kIdentityFile:
448 case kMetaDatabase:
7c673cae
FG
449 case kBlobFile:
450 keep = true;
451 break;
452 }
453
454 if (keep) {
455 continue;
456 }
457
458 std::string fname;
11fdf7f2 459 std::string dir_to_sync;
7c673cae
FG
460 if (type == kTableFile) {
461 // evict from cache
462 TableCache::Evict(table_cache_.get(), number);
11fdf7f2
TL
463 fname = MakeTableFileName(candidate_file.file_path, number);
464 dir_to_sync = candidate_file.file_path;
7c673cae 465 } else {
11fdf7f2
TL
466 dir_to_sync =
467 (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
494da23a
TL
468 fname = dir_to_sync +
469 ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
470 (!to_delete.empty() && to_delete.front() == '/')
471 ? ""
472 : "/") +
473 to_delete;
7c673cae
FG
474 }
475
476#ifndef ROCKSDB_LITE
477 if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
478 immutable_db_options_.wal_size_limit_mb > 0)) {
479 wal_manager_.ArchiveWALFile(fname, number);
480 continue;
481 }
482#endif // !ROCKSDB_LITE
483
484 Status file_deletion_status;
485 if (schedule_only) {
486 InstrumentedMutexLock guard_lock(&mutex_);
11fdf7f2 487 SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
7c673cae 488 } else {
11fdf7f2 489 DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
7c673cae
FG
490 }
491 }
492
11fdf7f2
TL
493 {
494 // After purging obsolete files, remove them from files_grabbed_for_purge_.
495 // Use a temporary vector to perform bulk deletion via swap.
496 InstrumentedMutexLock guard_lock(&mutex_);
497 std::vector<uint64_t> tmp;
498 for (auto fn : files_grabbed_for_purge_) {
499 if (files_to_del.count(fn) == 0) {
500 tmp.emplace_back(fn);
501 }
502 }
503 files_grabbed_for_purge_.swap(tmp);
504 }
505
7c673cae
FG
506 // Delete old info log files.
507 size_t old_info_log_file_count = old_info_log_files.size();
508 if (old_info_log_file_count != 0 &&
509 old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
510 std::sort(old_info_log_files.begin(), old_info_log_files.end());
511 size_t end =
512 old_info_log_file_count - immutable_db_options_.keep_log_file_num;
513 for (unsigned int i = 0; i <= end; i++) {
514 std::string& to_delete = old_info_log_files.at(i);
515 std::string full_path_to_delete =
516 (immutable_db_options_.db_log_dir.empty()
517 ? dbname_
518 : immutable_db_options_.db_log_dir) +
519 "/" + to_delete;
520 ROCKS_LOG_INFO(immutable_db_options_.info_log,
521 "[JOB %d] Delete info log file %s\n", state.job_id,
522 full_path_to_delete.c_str());
523 Status s = env_->DeleteFile(full_path_to_delete);
524 if (!s.ok()) {
525 if (env_->FileExists(full_path_to_delete).IsNotFound()) {
526 ROCKS_LOG_INFO(
527 immutable_db_options_.info_log,
528 "[JOB %d] Tried to delete non-existing info log file %s FAILED "
529 "-- %s\n",
530 state.job_id, to_delete.c_str(), s.ToString().c_str());
531 } else {
532 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
533 "[JOB %d] Delete info log file %s FAILED -- %s\n",
534 state.job_id, to_delete.c_str(),
535 s.ToString().c_str());
536 }
537 }
538 }
539 }
540#ifndef ROCKSDB_LITE
541 wal_manager_.PurgeObsoleteWALFiles();
542#endif // ROCKSDB_LITE
543 LogFlush(immutable_db_options_.info_log);
11fdf7f2
TL
544 InstrumentedMutexLock l(&mutex_);
545 --pending_purge_obsolete_files_;
546 assert(pending_purge_obsolete_files_ >= 0);
547 if (pending_purge_obsolete_files_ == 0) {
548 bg_cv_.SignalAll();
549 }
550 TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
7c673cae
FG
551}
552
553void DBImpl::DeleteObsoleteFiles() {
554 mutex_.AssertHeld();
555 JobContext job_context(next_job_id_.fetch_add(1));
556 FindObsoleteFiles(&job_context, true);
557
558 mutex_.Unlock();
559 if (job_context.HaveSomethingToDelete()) {
560 PurgeObsoleteFiles(job_context);
561 }
562 job_context.Clean();
563 mutex_.Lock();
564}
11fdf7f2
TL
565
566uint64_t FindMinPrepLogReferencedByMemTable(
567 VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
568 const autovector<MemTable*>& memtables_to_flush) {
569 uint64_t min_log = 0;
570
571 // we must look through the memtables for two phase transactions
572 // that have been committed but not yet flushed
573 for (auto loop_cfd : *vset->GetColumnFamilySet()) {
574 if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
575 continue;
576 }
577
578 auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
579 memtables_to_flush);
580
581 if (log > 0 && (min_log == 0 || log < min_log)) {
582 min_log = log;
583 }
584
585 log = loop_cfd->mem()->GetMinLogContainingPrepSection();
586
587 if (log > 0 && (min_log == 0 || log < min_log)) {
588 min_log = log;
589 }
590 }
591
592 return min_log;
593}
594
595uint64_t PrecomputeMinLogNumberToKeep(
596 VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
597 autovector<VersionEdit*> edit_list,
598 const autovector<MemTable*>& memtables_to_flush,
599 LogsWithPrepTracker* prep_tracker) {
600 assert(vset != nullptr);
601 assert(prep_tracker != nullptr);
602 // Calculate updated min_log_number_to_keep
603 // Since the function should only be called in 2pc mode, log number in
604 // the version edit should be sufficient.
605
606 // Precompute the min log number containing unflushed data for the column
607 // family being flushed (`cfd_to_flush`).
608 uint64_t cf_min_log_number_to_keep = 0;
609 for (auto& e : edit_list) {
610 if (e->has_log_number()) {
611 cf_min_log_number_to_keep =
612 std::max(cf_min_log_number_to_keep, e->log_number());
613 }
614 }
615 if (cf_min_log_number_to_keep == 0) {
616 // No version edit contains information on log number. The log number
617 // for this column family should stay the same as it is.
618 cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
619 }
620
621 // Get min log number containing unflushed data for other column families.
622 uint64_t min_log_number_to_keep =
623 vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
624 if (cf_min_log_number_to_keep != 0) {
625 min_log_number_to_keep =
626 std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
627 }
628
629 // if are 2pc we must consider logs containing prepared
630 // sections of outstanding transactions.
631 //
632 // We must check min logs with outstanding prep before we check
633 // logs references by memtables because a log referenced by the
634 // first data structure could transition to the second under us.
635 //
636 // TODO: iterating over all column families under db mutex.
637 // should find more optimal solution
638 auto min_log_in_prep_heap =
639 prep_tracker->FindMinLogContainingOutstandingPrep();
640
641 if (min_log_in_prep_heap != 0 &&
642 min_log_in_prep_heap < min_log_number_to_keep) {
643 min_log_number_to_keep = min_log_in_prep_heap;
644 }
645
646 uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
647 vset, &cfd_to_flush, memtables_to_flush);
648
649 if (min_log_refed_by_mem != 0 &&
650 min_log_refed_by_mem < min_log_number_to_keep) {
651 min_log_number_to_keep = min_log_refed_by_mem;
652 }
653 return min_log_number_to_keep;
654}
655
7c673cae 656} // namespace rocksdb