]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | #include "db/db_impl.h" | |
10 | ||
11 | #ifndef __STDC_FORMAT_MACROS | |
12 | #define __STDC_FORMAT_MACROS | |
13 | #endif | |
14 | #include <inttypes.h> | |
11fdf7f2 TL |
15 | #include <set> |
16 | #include <unordered_set> | |
7c673cae | 17 | #include "db/event_helpers.h" |
11fdf7f2 | 18 | #include "db/memtable_list.h" |
7c673cae FG |
19 | #include "util/file_util.h" |
20 | #include "util/sst_file_manager_impl.h" | |
21 | ||
7c673cae | 22 | namespace rocksdb { |
494da23a | 23 | |
7c673cae | 24 | uint64_t DBImpl::MinLogNumberToKeep() { |
7c673cae | 25 | if (allow_2pc()) { |
11fdf7f2 TL |
26 | return versions_->min_log_number_to_keep_2pc(); |
27 | } else { | |
28 | return versions_->MinLogNumberWithUnflushedData(); | |
7c673cae | 29 | } |
7c673cae FG |
30 | } |
31 | ||
494da23a TL |
32 | uint64_t DBImpl::MinObsoleteSstNumberToKeep() { |
33 | mutex_.AssertHeld(); | |
34 | if (!pending_outputs_.empty()) { | |
35 | return *pending_outputs_.begin(); | |
36 | } | |
37 | return std::numeric_limits<uint64_t>::max(); | |
38 | } | |
39 | ||
7c673cae FG |
40 | // * Returns the list of live files in 'sst_live' |
41 | // If it's doing full scan: | |
42 | // * Returns the list of all files in the filesystem in | |
43 | // 'full_scan_candidate_files'. | |
44 | // Otherwise, gets obsolete files from VersionSet. | |
45 | // no_full_scan = true -- never do the full scan using GetChildren() | |
46 | // force = false -- don't force the full scan, except every | |
47 | // mutable_db_options_.delete_obsolete_files_period_micros | |
48 | // force = true -- force the full scan | |
49 | void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, | |
50 | bool no_full_scan) { | |
51 | mutex_.AssertHeld(); | |
52 | ||
53 | // if deletion is disabled, do nothing | |
54 | if (disable_delete_obsolete_files_ > 0) { | |
55 | return; | |
56 | } | |
57 | ||
58 | bool doing_the_full_scan = false; | |
59 | ||
11fdf7f2 | 60 | // logic for figuring out if we're doing the full scan |
7c673cae FG |
61 | if (no_full_scan) { |
62 | doing_the_full_scan = false; | |
63 | } else if (force || | |
64 | mutable_db_options_.delete_obsolete_files_period_micros == 0) { | |
65 | doing_the_full_scan = true; | |
66 | } else { | |
67 | const uint64_t now_micros = env_->NowMicros(); | |
68 | if ((delete_obsolete_files_last_run_ + | |
69 | mutable_db_options_.delete_obsolete_files_period_micros) < | |
70 | now_micros) { | |
71 | doing_the_full_scan = true; | |
72 | delete_obsolete_files_last_run_ = now_micros; | |
73 | } | |
74 | } | |
75 | ||
76 | // don't delete files that might be currently written to from compaction | |
77 | // threads | |
78 | // Since job_context->min_pending_output is set, until file scan finishes, | |
79 | // mutex_ cannot be released. Otherwise, we might see no min_pending_output | |
11fdf7f2 | 80 | // here but later find newer generated unfinalized files while scanning. |
7c673cae FG |
81 | if (!pending_outputs_.empty()) { |
82 | job_context->min_pending_output = *pending_outputs_.begin(); | |
83 | } else { | |
84 | // delete all of them | |
85 | job_context->min_pending_output = std::numeric_limits<uint64_t>::max(); | |
86 | } | |
87 | ||
88 | // Get obsolete files. This function will also update the list of | |
89 | // pending files in VersionSet(). | |
90 | versions_->GetObsoleteFiles(&job_context->sst_delete_files, | |
91 | &job_context->manifest_delete_files, | |
92 | job_context->min_pending_output); | |
93 | ||
11fdf7f2 TL |
94 | // Mark the elements in job_context->sst_delete_files as grabbedForPurge |
95 | // so that other threads calling FindObsoleteFiles with full_scan=true | |
96 | // will not add these files to candidate list for purge. | |
97 | for (const auto& sst_to_del : job_context->sst_delete_files) { | |
98 | MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber()); | |
99 | } | |
100 | ||
7c673cae FG |
101 | // store the current filenum, lognum, etc |
102 | job_context->manifest_file_number = versions_->manifest_file_number(); | |
103 | job_context->pending_manifest_file_number = | |
104 | versions_->pending_manifest_file_number(); | |
105 | job_context->log_number = MinLogNumberToKeep(); | |
7c673cae FG |
106 | job_context->prev_log_number = versions_->prev_log_number(); |
107 | ||
108 | versions_->AddLiveFiles(&job_context->sst_live); | |
109 | if (doing_the_full_scan) { | |
11fdf7f2 | 110 | InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), |
494da23a | 111 | dbname_); |
11fdf7f2 | 112 | std::set<std::string> paths; |
7c673cae FG |
113 | for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size(); |
114 | path_id++) { | |
11fdf7f2 TL |
115 | paths.insert(immutable_db_options_.db_paths[path_id].path); |
116 | } | |
117 | ||
118 | // Note that if cf_paths is not specified in the ColumnFamilyOptions | |
119 | // of a particular column family, we use db_paths as the cf_paths | |
120 | // setting. Hence, there can be multiple duplicates of files from db_paths | |
121 | // in the following code. The duplicate are removed while identifying | |
122 | // unique files in PurgeObsoleteFiles. | |
123 | for (auto cfd : *versions_->GetColumnFamilySet()) { | |
124 | for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size(); | |
125 | path_id++) { | |
126 | auto& path = cfd->ioptions()->cf_paths[path_id].path; | |
127 | ||
128 | if (paths.find(path) == paths.end()) { | |
129 | paths.insert(path); | |
130 | } | |
131 | } | |
132 | } | |
133 | ||
134 | for (auto& path : paths) { | |
7c673cae FG |
135 | // set of all files in the directory. We'll exclude files that are still |
136 | // alive in the subsequent processings. | |
137 | std::vector<std::string> files; | |
11fdf7f2 TL |
138 | env_->GetChildren(path, &files); // Ignore errors |
139 | for (const std::string& file : files) { | |
140 | uint64_t number; | |
141 | FileType type; | |
142 | // 1. If we cannot parse the file name, we skip; | |
143 | // 2. If the file with file_number equals number has already been | |
144 | // grabbed for purge by another compaction job, or it has already been | |
145 | // schedule for purge, we also skip it if we | |
146 | // are doing full scan in order to avoid double deletion of the same | |
147 | // file under race conditions. See | |
148 | // https://github.com/facebook/rocksdb/issues/3573 | |
149 | if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) || | |
150 | !ShouldPurge(number)) { | |
151 | continue; | |
152 | } | |
153 | ||
7c673cae | 154 | // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes |
494da23a | 155 | job_context->full_scan_candidate_files.emplace_back("/" + file, path); |
7c673cae FG |
156 | } |
157 | } | |
158 | ||
159 | // Add log files in wal_dir | |
160 | if (immutable_db_options_.wal_dir != dbname_) { | |
161 | std::vector<std::string> log_files; | |
162 | env_->GetChildren(immutable_db_options_.wal_dir, | |
163 | &log_files); // Ignore errors | |
11fdf7f2 | 164 | for (const std::string& log_file : log_files) { |
494da23a TL |
165 | job_context->full_scan_candidate_files.emplace_back( |
166 | log_file, immutable_db_options_.wal_dir); | |
7c673cae FG |
167 | } |
168 | } | |
169 | // Add info log files in db_log_dir | |
170 | if (!immutable_db_options_.db_log_dir.empty() && | |
171 | immutable_db_options_.db_log_dir != dbname_) { | |
172 | std::vector<std::string> info_log_files; | |
173 | // Ignore errors | |
174 | env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files); | |
11fdf7f2 | 175 | for (std::string& log_file : info_log_files) { |
494da23a TL |
176 | job_context->full_scan_candidate_files.emplace_back( |
177 | log_file, immutable_db_options_.db_log_dir); | |
7c673cae FG |
178 | } |
179 | } | |
180 | } | |
181 | ||
182 | // logs_ is empty when called during recovery, in which case there can't yet | |
183 | // be any tracked obsolete logs | |
184 | if (!alive_log_files_.empty() && !logs_.empty()) { | |
185 | uint64_t min_log_number = job_context->log_number; | |
186 | size_t num_alive_log_files = alive_log_files_.size(); | |
187 | // find newly obsoleted log files | |
188 | while (alive_log_files_.begin()->number < min_log_number) { | |
189 | auto& earliest = *alive_log_files_.begin(); | |
190 | if (immutable_db_options_.recycle_log_file_num > | |
11fdf7f2 | 191 | log_recycle_files_.size()) { |
7c673cae FG |
192 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
193 | "adding log %" PRIu64 " to recycle list\n", | |
194 | earliest.number); | |
11fdf7f2 | 195 | log_recycle_files_.push_back(earliest.number); |
7c673cae FG |
196 | } else { |
197 | job_context->log_delete_files.push_back(earliest.number); | |
198 | } | |
199 | if (job_context->size_log_to_delete == 0) { | |
200 | job_context->prev_total_log_size = total_log_size_; | |
201 | job_context->num_alive_log_files = num_alive_log_files; | |
202 | } | |
203 | job_context->size_log_to_delete += earliest.size; | |
204 | total_log_size_ -= earliest.size; | |
11fdf7f2 TL |
205 | if (two_write_queues_) { |
206 | log_write_mutex_.Lock(); | |
207 | } | |
7c673cae | 208 | alive_log_files_.pop_front(); |
11fdf7f2 TL |
209 | if (two_write_queues_) { |
210 | log_write_mutex_.Unlock(); | |
211 | } | |
7c673cae FG |
212 | // Current log should always stay alive since it can't have |
213 | // number < MinLogNumber(). | |
214 | assert(alive_log_files_.size()); | |
215 | } | |
216 | while (!logs_.empty() && logs_.front().number < min_log_number) { | |
217 | auto& log = logs_.front(); | |
218 | if (log.getting_synced) { | |
219 | log_sync_cv_.Wait(); | |
220 | // logs_ could have changed while we were waiting. | |
221 | continue; | |
222 | } | |
223 | logs_to_free_.push_back(log.ReleaseWriter()); | |
11fdf7f2 TL |
224 | { |
225 | InstrumentedMutexLock wl(&log_write_mutex_); | |
226 | logs_.pop_front(); | |
227 | } | |
7c673cae FG |
228 | } |
229 | // Current log cannot be obsolete. | |
230 | assert(!logs_.empty()); | |
231 | } | |
232 | ||
233 | // We're just cleaning up for DB::Write(). | |
234 | assert(job_context->logs_to_free.empty()); | |
235 | job_context->logs_to_free = logs_to_free_; | |
11fdf7f2 TL |
236 | job_context->log_recycle_files.assign(log_recycle_files_.begin(), |
237 | log_recycle_files_.end()); | |
238 | if (job_context->HaveSomethingToDelete()) { | |
239 | ++pending_purge_obsolete_files_; | |
240 | } | |
7c673cae FG |
241 | logs_to_free_.clear(); |
242 | } | |
243 | ||
244 | namespace { | |
245 | bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, | |
246 | const JobContext::CandidateFileInfo& second) { | |
247 | if (first.file_name > second.file_name) { | |
248 | return true; | |
249 | } else if (first.file_name < second.file_name) { | |
250 | return false; | |
251 | } else { | |
11fdf7f2 | 252 | return (first.file_path > second.file_path); |
7c673cae FG |
253 | } |
254 | } | |
255 | }; // namespace | |
256 | ||
257 | // Delete obsolete files and log status and information of file deletion | |
11fdf7f2 TL |
258 | void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, |
259 | const std::string& path_to_sync, | |
260 | FileType type, uint64_t number) { | |
261 | Status file_deletion_status; | |
494da23a | 262 | if (type == kTableFile || type == kLogFile) { |
7c673cae | 263 | file_deletion_status = |
494da23a | 264 | DeleteDBFile(&immutable_db_options_, fname, path_to_sync); |
7c673cae FG |
265 | } else { |
266 | file_deletion_status = env_->DeleteFile(fname); | |
267 | } | |
11fdf7f2 | 268 | TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion", |
494da23a | 269 | &file_deletion_status); |
7c673cae FG |
270 | if (file_deletion_status.ok()) { |
271 | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, | |
272 | "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id, | |
273 | fname.c_str(), type, number, | |
274 | file_deletion_status.ToString().c_str()); | |
275 | } else if (env_->FileExists(fname).IsNotFound()) { | |
276 | ROCKS_LOG_INFO( | |
277 | immutable_db_options_.info_log, | |
278 | "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64 | |
279 | " -- %s\n", | |
280 | job_id, fname.c_str(), type, number, | |
281 | file_deletion_status.ToString().c_str()); | |
282 | } else { | |
283 | ROCKS_LOG_ERROR(immutable_db_options_.info_log, | |
284 | "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n", | |
285 | job_id, fname.c_str(), type, number, | |
286 | file_deletion_status.ToString().c_str()); | |
287 | } | |
288 | if (type == kTableFile) { | |
289 | EventHelpers::LogAndNotifyTableFileDeletion( | |
290 | &event_logger_, job_id, number, fname, file_deletion_status, GetName(), | |
291 | immutable_db_options_.listeners); | |
292 | } | |
293 | } | |
294 | ||
295 | // Diffs the files listed in filenames and those that do not | |
11fdf7f2 | 296 | // belong to live files are possibly removed. Also, removes all the |
7c673cae FG |
297 | // files in sst_delete_files and log_delete_files. |
298 | // It is not necessary to hold the mutex when invoking this method. | |
11fdf7f2 TL |
299 | void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { |
300 | TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin"); | |
7c673cae FG |
301 | // we'd better have sth to delete |
302 | assert(state.HaveSomethingToDelete()); | |
303 | ||
11fdf7f2 TL |
304 | // FindObsoleteFiles() should've populated this so nonzero |
305 | assert(state.manifest_file_number != 0); | |
7c673cae FG |
306 | |
307 | // Now, convert live list to an unordered map, WITHOUT mutex held; | |
308 | // set is slow. | |
309 | std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map; | |
310 | for (const FileDescriptor& fd : state.sst_live) { | |
311 | sst_live_map[fd.GetNumber()] = &fd; | |
312 | } | |
313 | std::unordered_set<uint64_t> log_recycle_files_set( | |
314 | state.log_recycle_files.begin(), state.log_recycle_files.end()); | |
315 | ||
316 | auto candidate_files = state.full_scan_candidate_files; | |
317 | candidate_files.reserve( | |
318 | candidate_files.size() + state.sst_delete_files.size() + | |
319 | state.log_delete_files.size() + state.manifest_delete_files.size()); | |
320 | // We may ignore the dbname when generating the file names. | |
321 | const char* kDumbDbName = ""; | |
11fdf7f2 | 322 | for (auto& file : state.sst_delete_files) { |
7c673cae | 323 | candidate_files.emplace_back( |
494da23a TL |
324 | MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()), |
325 | file.path); | |
11fdf7f2 TL |
326 | if (file.metadata->table_reader_handle) { |
327 | table_cache_->Release(file.metadata->table_reader_handle); | |
328 | } | |
329 | file.DeleteMetadata(); | |
7c673cae FG |
330 | } |
331 | ||
332 | for (auto file_num : state.log_delete_files) { | |
333 | if (file_num > 0) { | |
11fdf7f2 | 334 | candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), |
494da23a | 335 | immutable_db_options_.wal_dir); |
7c673cae FG |
336 | } |
337 | } | |
338 | for (const auto& filename : state.manifest_delete_files) { | |
11fdf7f2 | 339 | candidate_files.emplace_back(filename, dbname_); |
7c673cae FG |
340 | } |
341 | ||
342 | // dedup state.candidate_files so we don't try to delete the same | |
343 | // file twice | |
344 | std::sort(candidate_files.begin(), candidate_files.end(), | |
345 | CompareCandidateFile); | |
346 | candidate_files.erase( | |
347 | std::unique(candidate_files.begin(), candidate_files.end()), | |
348 | candidate_files.end()); | |
349 | ||
350 | if (state.prev_total_log_size > 0) { | |
351 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
352 | "[JOB %d] Try to delete WAL files size %" PRIu64 | |
353 | ", prev total WAL file size %" PRIu64 | |
354 | ", number of live WAL files %" ROCKSDB_PRIszt ".\n", | |
355 | state.job_id, state.size_log_to_delete, | |
356 | state.prev_total_log_size, state.num_alive_log_files); | |
357 | } | |
358 | ||
359 | std::vector<std::string> old_info_log_files; | |
360 | InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), | |
361 | dbname_); | |
11fdf7f2 TL |
362 | |
363 | // File numbers of most recent two OPTIONS file in candidate_files (found in | |
364 | // previos FindObsoleteFiles(full_scan=true)) | |
365 | // At this point, there must not be any duplicate file numbers in | |
366 | // candidate_files. | |
367 | uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min(); | |
368 | uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min(); | |
7c673cae | 369 | for (const auto& candidate_file : candidate_files) { |
11fdf7f2 TL |
370 | const std::string& fname = candidate_file.file_name; |
371 | uint64_t number; | |
372 | FileType type; | |
373 | if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) || | |
374 | type != kOptionsFile) { | |
375 | continue; | |
376 | } | |
377 | if (number > optsfile_num1) { | |
378 | optsfile_num2 = optsfile_num1; | |
379 | optsfile_num1 = number; | |
380 | } else if (number > optsfile_num2) { | |
381 | optsfile_num2 = number; | |
382 | } | |
383 | } | |
384 | ||
385 | std::unordered_set<uint64_t> files_to_del; | |
386 | for (const auto& candidate_file : candidate_files) { | |
387 | const std::string& to_delete = candidate_file.file_name; | |
7c673cae FG |
388 | uint64_t number; |
389 | FileType type; | |
390 | // Ignore file if we cannot recognize it. | |
391 | if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) { | |
392 | continue; | |
393 | } | |
394 | ||
395 | bool keep = true; | |
396 | switch (type) { | |
397 | case kLogFile: | |
398 | keep = ((number >= state.log_number) || | |
399 | (number == state.prev_log_number) || | |
400 | (log_recycle_files_set.find(number) != | |
401 | log_recycle_files_set.end())); | |
402 | break; | |
403 | case kDescriptorFile: | |
404 | // Keep my manifest file, and any newer incarnations' | |
405 | // (can happen during manifest roll) | |
406 | keep = (number >= state.manifest_file_number); | |
407 | break; | |
408 | case kTableFile: | |
409 | // If the second condition is not there, this makes | |
410 | // DontDeletePendingOutputs fail | |
411 | keep = (sst_live_map.find(number) != sst_live_map.end()) || | |
412 | number >= state.min_pending_output; | |
11fdf7f2 TL |
413 | if (!keep) { |
414 | files_to_del.insert(number); | |
415 | } | |
7c673cae FG |
416 | break; |
417 | case kTempFile: | |
418 | // Any temp files that are currently being written to must | |
419 | // be recorded in pending_outputs_, which is inserted into "live". | |
420 | // Also, SetCurrentFile creates a temp file when writing out new | |
421 | // manifest, which is equal to state.pending_manifest_file_number. We | |
422 | // should not delete that file | |
423 | // | |
424 | // TODO(yhchiang): carefully modify the third condition to safely | |
425 | // remove the temp options files. | |
426 | keep = (sst_live_map.find(number) != sst_live_map.end()) || | |
427 | (number == state.pending_manifest_file_number) || | |
428 | (to_delete.find(kOptionsFileNamePrefix) != std::string::npos); | |
429 | break; | |
430 | case kInfoLogFile: | |
431 | keep = true; | |
432 | if (number != 0) { | |
433 | old_info_log_files.push_back(to_delete); | |
434 | } | |
435 | break; | |
11fdf7f2 TL |
436 | case kOptionsFile: |
437 | keep = (number >= optsfile_num2); | |
438 | TEST_SYNC_POINT_CALLBACK( | |
439 | "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:1", | |
440 | reinterpret_cast<void*>(&number)); | |
441 | TEST_SYNC_POINT_CALLBACK( | |
442 | "DBImpl::PurgeObsoleteFiles:CheckOptionsFiles:2", | |
443 | reinterpret_cast<void*>(&keep)); | |
444 | break; | |
7c673cae FG |
445 | case kCurrentFile: |
446 | case kDBLockFile: | |
447 | case kIdentityFile: | |
448 | case kMetaDatabase: | |
7c673cae FG |
449 | case kBlobFile: |
450 | keep = true; | |
451 | break; | |
452 | } | |
453 | ||
454 | if (keep) { | |
455 | continue; | |
456 | } | |
457 | ||
458 | std::string fname; | |
11fdf7f2 | 459 | std::string dir_to_sync; |
7c673cae FG |
460 | if (type == kTableFile) { |
461 | // evict from cache | |
462 | TableCache::Evict(table_cache_.get(), number); | |
11fdf7f2 TL |
463 | fname = MakeTableFileName(candidate_file.file_path, number); |
464 | dir_to_sync = candidate_file.file_path; | |
7c673cae | 465 | } else { |
11fdf7f2 TL |
466 | dir_to_sync = |
467 | (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_; | |
494da23a TL |
468 | fname = dir_to_sync + |
469 | ((!dir_to_sync.empty() && dir_to_sync.back() == '/') || | |
470 | (!to_delete.empty() && to_delete.front() == '/') | |
471 | ? "" | |
472 | : "/") + | |
473 | to_delete; | |
7c673cae FG |
474 | } |
475 | ||
476 | #ifndef ROCKSDB_LITE | |
477 | if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 || | |
478 | immutable_db_options_.wal_size_limit_mb > 0)) { | |
479 | wal_manager_.ArchiveWALFile(fname, number); | |
480 | continue; | |
481 | } | |
482 | #endif // !ROCKSDB_LITE | |
483 | ||
484 | Status file_deletion_status; | |
485 | if (schedule_only) { | |
486 | InstrumentedMutexLock guard_lock(&mutex_); | |
11fdf7f2 | 487 | SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id); |
7c673cae | 488 | } else { |
11fdf7f2 | 489 | DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number); |
7c673cae FG |
490 | } |
491 | } | |
492 | ||
11fdf7f2 TL |
493 | { |
494 | // After purging obsolete files, remove them from files_grabbed_for_purge_. | |
495 | // Use a temporary vector to perform bulk deletion via swap. | |
496 | InstrumentedMutexLock guard_lock(&mutex_); | |
497 | std::vector<uint64_t> tmp; | |
498 | for (auto fn : files_grabbed_for_purge_) { | |
499 | if (files_to_del.count(fn) == 0) { | |
500 | tmp.emplace_back(fn); | |
501 | } | |
502 | } | |
503 | files_grabbed_for_purge_.swap(tmp); | |
504 | } | |
505 | ||
7c673cae FG |
506 | // Delete old info log files. |
507 | size_t old_info_log_file_count = old_info_log_files.size(); | |
508 | if (old_info_log_file_count != 0 && | |
509 | old_info_log_file_count >= immutable_db_options_.keep_log_file_num) { | |
510 | std::sort(old_info_log_files.begin(), old_info_log_files.end()); | |
511 | size_t end = | |
512 | old_info_log_file_count - immutable_db_options_.keep_log_file_num; | |
513 | for (unsigned int i = 0; i <= end; i++) { | |
514 | std::string& to_delete = old_info_log_files.at(i); | |
515 | std::string full_path_to_delete = | |
516 | (immutable_db_options_.db_log_dir.empty() | |
517 | ? dbname_ | |
518 | : immutable_db_options_.db_log_dir) + | |
519 | "/" + to_delete; | |
520 | ROCKS_LOG_INFO(immutable_db_options_.info_log, | |
521 | "[JOB %d] Delete info log file %s\n", state.job_id, | |
522 | full_path_to_delete.c_str()); | |
523 | Status s = env_->DeleteFile(full_path_to_delete); | |
524 | if (!s.ok()) { | |
525 | if (env_->FileExists(full_path_to_delete).IsNotFound()) { | |
526 | ROCKS_LOG_INFO( | |
527 | immutable_db_options_.info_log, | |
528 | "[JOB %d] Tried to delete non-existing info log file %s FAILED " | |
529 | "-- %s\n", | |
530 | state.job_id, to_delete.c_str(), s.ToString().c_str()); | |
531 | } else { | |
532 | ROCKS_LOG_ERROR(immutable_db_options_.info_log, | |
533 | "[JOB %d] Delete info log file %s FAILED -- %s\n", | |
534 | state.job_id, to_delete.c_str(), | |
535 | s.ToString().c_str()); | |
536 | } | |
537 | } | |
538 | } | |
539 | } | |
540 | #ifndef ROCKSDB_LITE | |
541 | wal_manager_.PurgeObsoleteWALFiles(); | |
542 | #endif // ROCKSDB_LITE | |
543 | LogFlush(immutable_db_options_.info_log); | |
11fdf7f2 TL |
544 | InstrumentedMutexLock l(&mutex_); |
545 | --pending_purge_obsolete_files_; | |
546 | assert(pending_purge_obsolete_files_ >= 0); | |
547 | if (pending_purge_obsolete_files_ == 0) { | |
548 | bg_cv_.SignalAll(); | |
549 | } | |
550 | TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End"); | |
7c673cae FG |
551 | } |
552 | ||
553 | void DBImpl::DeleteObsoleteFiles() { | |
554 | mutex_.AssertHeld(); | |
555 | JobContext job_context(next_job_id_.fetch_add(1)); | |
556 | FindObsoleteFiles(&job_context, true); | |
557 | ||
558 | mutex_.Unlock(); | |
559 | if (job_context.HaveSomethingToDelete()) { | |
560 | PurgeObsoleteFiles(job_context); | |
561 | } | |
562 | job_context.Clean(); | |
563 | mutex_.Lock(); | |
564 | } | |
11fdf7f2 TL |
565 | |
566 | uint64_t FindMinPrepLogReferencedByMemTable( | |
567 | VersionSet* vset, const ColumnFamilyData* cfd_to_flush, | |
568 | const autovector<MemTable*>& memtables_to_flush) { | |
569 | uint64_t min_log = 0; | |
570 | ||
571 | // we must look through the memtables for two phase transactions | |
572 | // that have been committed but not yet flushed | |
573 | for (auto loop_cfd : *vset->GetColumnFamilySet()) { | |
574 | if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) { | |
575 | continue; | |
576 | } | |
577 | ||
578 | auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( | |
579 | memtables_to_flush); | |
580 | ||
581 | if (log > 0 && (min_log == 0 || log < min_log)) { | |
582 | min_log = log; | |
583 | } | |
584 | ||
585 | log = loop_cfd->mem()->GetMinLogContainingPrepSection(); | |
586 | ||
587 | if (log > 0 && (min_log == 0 || log < min_log)) { | |
588 | min_log = log; | |
589 | } | |
590 | } | |
591 | ||
592 | return min_log; | |
593 | } | |
594 | ||
595 | uint64_t PrecomputeMinLogNumberToKeep( | |
596 | VersionSet* vset, const ColumnFamilyData& cfd_to_flush, | |
597 | autovector<VersionEdit*> edit_list, | |
598 | const autovector<MemTable*>& memtables_to_flush, | |
599 | LogsWithPrepTracker* prep_tracker) { | |
600 | assert(vset != nullptr); | |
601 | assert(prep_tracker != nullptr); | |
602 | // Calculate updated min_log_number_to_keep | |
603 | // Since the function should only be called in 2pc mode, log number in | |
604 | // the version edit should be sufficient. | |
605 | ||
606 | // Precompute the min log number containing unflushed data for the column | |
607 | // family being flushed (`cfd_to_flush`). | |
608 | uint64_t cf_min_log_number_to_keep = 0; | |
609 | for (auto& e : edit_list) { | |
610 | if (e->has_log_number()) { | |
611 | cf_min_log_number_to_keep = | |
612 | std::max(cf_min_log_number_to_keep, e->log_number()); | |
613 | } | |
614 | } | |
615 | if (cf_min_log_number_to_keep == 0) { | |
616 | // No version edit contains information on log number. The log number | |
617 | // for this column family should stay the same as it is. | |
618 | cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber(); | |
619 | } | |
620 | ||
621 | // Get min log number containing unflushed data for other column families. | |
622 | uint64_t min_log_number_to_keep = | |
623 | vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush); | |
624 | if (cf_min_log_number_to_keep != 0) { | |
625 | min_log_number_to_keep = | |
626 | std::min(cf_min_log_number_to_keep, min_log_number_to_keep); | |
627 | } | |
628 | ||
629 | // if are 2pc we must consider logs containing prepared | |
630 | // sections of outstanding transactions. | |
631 | // | |
632 | // We must check min logs with outstanding prep before we check | |
633 | // logs references by memtables because a log referenced by the | |
634 | // first data structure could transition to the second under us. | |
635 | // | |
636 | // TODO: iterating over all column families under db mutex. | |
637 | // should find more optimal solution | |
638 | auto min_log_in_prep_heap = | |
639 | prep_tracker->FindMinLogContainingOutstandingPrep(); | |
640 | ||
641 | if (min_log_in_prep_heap != 0 && | |
642 | min_log_in_prep_heap < min_log_number_to_keep) { | |
643 | min_log_number_to_keep = min_log_in_prep_heap; | |
644 | } | |
645 | ||
646 | uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable( | |
647 | vset, &cfd_to_flush, memtables_to_flush); | |
648 | ||
649 | if (min_log_refed_by_mem != 0 && | |
650 | min_log_refed_by_mem < min_log_number_to_keep) { | |
651 | min_log_number_to_keep = min_log_refed_by_mem; | |
652 | } | |
653 | return min_log_number_to_keep; | |
654 | } | |
655 | ||
7c673cae | 656 | } // namespace rocksdb |