]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/wal_manager.h" | |
11 | ||
12 | #ifndef __STDC_FORMAT_MACROS | |
13 | #define __STDC_FORMAT_MACROS | |
14 | #endif | |
15 | ||
16 | #include <inttypes.h> | |
17 | #include <algorithm> | |
18 | #include <vector> | |
19 | #include <memory> | |
20 | ||
21 | #include "db/log_reader.h" | |
22 | #include "db/log_writer.h" | |
23 | #include "db/transaction_log_impl.h" | |
24 | #include "db/write_batch_internal.h" | |
25 | #include "port/port.h" | |
26 | #include "rocksdb/env.h" | |
27 | #include "rocksdb/options.h" | |
28 | #include "rocksdb/write_batch.h" | |
11fdf7f2 | 29 | #include "util/cast_util.h" |
7c673cae FG |
30 | #include "util/coding.h" |
31 | #include "util/file_reader_writer.h" | |
494da23a | 32 | #include "util/file_util.h" |
7c673cae FG |
33 | #include "util/filename.h" |
34 | #include "util/logging.h" | |
35 | #include "util/mutexlock.h" | |
36 | #include "util/string_util.h" | |
37 | #include "util/sync_point.h" | |
38 | ||
39 | namespace rocksdb { | |
40 | ||
41 | #ifndef ROCKSDB_LITE | |
42 | ||
11fdf7f2 TL |
43 | Status WalManager::DeleteFile(const std::string& fname, uint64_t number) { |
44 | auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname); | |
45 | if (s.ok()) { | |
46 | MutexLock l(&read_first_record_cache_mutex_); | |
47 | read_first_record_cache_.erase(number); | |
48 | } | |
49 | return s; | |
50 | } | |
51 | ||
7c673cae FG |
52 | Status WalManager::GetSortedWalFiles(VectorLogPtr& files) { |
53 | // First get sorted files in db dir, then get sorted files from archived | |
54 | // dir, to avoid a race condition where a log file is moved to archived | |
55 | // dir in between. | |
56 | Status s; | |
57 | // list wal files in main db dir. | |
58 | VectorLogPtr logs; | |
59 | s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile); | |
60 | if (!s.ok()) { | |
61 | return s; | |
62 | } | |
63 | ||
64 | // Reproduce the race condition where a log file is moved | |
65 | // to archived dir, between these two sync points, used in | |
66 | // (DBTest,TransactionLogIteratorRace) | |
67 | TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1"); | |
68 | TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2"); | |
69 | ||
70 | files.clear(); | |
71 | // list wal files in archive dir. | |
72 | std::string archivedir = ArchivalDirectory(db_options_.wal_dir); | |
73 | Status exists = env_->FileExists(archivedir); | |
74 | if (exists.ok()) { | |
75 | s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); | |
76 | if (!s.ok()) { | |
77 | return s; | |
78 | } | |
79 | } else if (!exists.IsNotFound()) { | |
80 | assert(s.IsIOError()); | |
81 | return s; | |
82 | } | |
83 | ||
84 | uint64_t latest_archived_log_number = 0; | |
85 | if (!files.empty()) { | |
86 | latest_archived_log_number = files.back()->LogNumber(); | |
87 | ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64, | |
88 | latest_archived_log_number); | |
89 | } | |
90 | ||
91 | files.reserve(files.size() + logs.size()); | |
92 | for (auto& log : logs) { | |
93 | if (log->LogNumber() > latest_archived_log_number) { | |
94 | files.push_back(std::move(log)); | |
95 | } else { | |
96 | // When the race condition happens, we could see the | |
97 | // same log in both db dir and archived dir. Simply | |
98 | // ignore the one in db dir. Note that, if we read | |
99 | // archived dir first, we would have missed the log file. | |
100 | ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive", | |
101 | log->PathName().c_str()); | |
102 | } | |
103 | } | |
104 | ||
105 | return s; | |
106 | } | |
107 | ||
108 | Status WalManager::GetUpdatesSince( | |
109 | SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter, | |
110 | const TransactionLogIterator::ReadOptions& read_options, | |
111 | VersionSet* version_set) { | |
112 | ||
113 | // Get all sorted Wal Files. | |
114 | // Do binary search and open files and find the seq number. | |
115 | ||
116 | std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr); | |
117 | Status s = GetSortedWalFiles(*wal_files); | |
118 | if (!s.ok()) { | |
119 | return s; | |
120 | } | |
121 | ||
122 | s = RetainProbableWalFiles(*wal_files, seq); | |
123 | if (!s.ok()) { | |
124 | return s; | |
125 | } | |
126 | iter->reset(new TransactionLogIteratorImpl( | |
127 | db_options_.wal_dir, &db_options_, read_options, env_options_, seq, | |
11fdf7f2 | 128 | std::move(wal_files), version_set, seq_per_batch_)); |
7c673cae FG |
129 | return (*iter)->status(); |
130 | } | |
131 | ||
132 | // 1. Go through all archived files and | |
133 | // a. if ttl is enabled, delete outdated files | |
134 | // b. if archive size limit is enabled, delete empty files, | |
135 | // compute file number and size. | |
136 | // 2. If size limit is enabled: | |
137 | // a. compute how many files should be deleted | |
138 | // b. get sorted non-empty archived logs | |
139 | // c. delete what should be deleted | |
140 | void WalManager::PurgeObsoleteWALFiles() { | |
141 | bool const ttl_enabled = db_options_.wal_ttl_seconds > 0; | |
142 | bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0; | |
143 | if (!ttl_enabled && !size_limit_enabled) { | |
144 | return; | |
145 | } | |
146 | ||
147 | int64_t current_time; | |
148 | Status s = env_->GetCurrentTime(¤t_time); | |
149 | if (!s.ok()) { | |
150 | ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s", | |
151 | s.ToString().c_str()); | |
152 | assert(false); | |
153 | return; | |
154 | } | |
155 | uint64_t const now_seconds = static_cast<uint64_t>(current_time); | |
156 | uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) | |
157 | ? db_options_.wal_ttl_seconds / 2 | |
158 | : kDefaultIntervalToDeleteObsoleteWAL; | |
159 | ||
160 | if (purge_wal_files_last_run_ + time_to_check > now_seconds) { | |
161 | return; | |
162 | } | |
163 | ||
164 | purge_wal_files_last_run_ = now_seconds; | |
165 | ||
166 | std::string archival_dir = ArchivalDirectory(db_options_.wal_dir); | |
167 | std::vector<std::string> files; | |
168 | s = env_->GetChildren(archival_dir, &files); | |
169 | if (!s.ok()) { | |
170 | ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s", | |
171 | s.ToString().c_str()); | |
172 | assert(false); | |
173 | return; | |
174 | } | |
175 | ||
176 | size_t log_files_num = 0; | |
177 | uint64_t log_file_size = 0; | |
178 | ||
179 | for (auto& f : files) { | |
180 | uint64_t number; | |
181 | FileType type; | |
182 | if (ParseFileName(f, &number, &type) && type == kLogFile) { | |
183 | std::string const file_path = archival_dir + "/" + f; | |
184 | if (ttl_enabled) { | |
185 | uint64_t file_m_time; | |
186 | s = env_->GetFileModificationTime(file_path, &file_m_time); | |
187 | if (!s.ok()) { | |
188 | ROCKS_LOG_WARN(db_options_.info_log, | |
189 | "Can't get file mod time: %s: %s", file_path.c_str(), | |
190 | s.ToString().c_str()); | |
191 | continue; | |
192 | } | |
193 | if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) { | |
494da23a | 194 | s = DeleteDBFile(&db_options_, file_path, archival_dir, false); |
7c673cae FG |
195 | if (!s.ok()) { |
196 | ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s", | |
197 | file_path.c_str(), s.ToString().c_str()); | |
198 | continue; | |
199 | } else { | |
200 | MutexLock l(&read_first_record_cache_mutex_); | |
201 | read_first_record_cache_.erase(number); | |
202 | } | |
203 | continue; | |
204 | } | |
205 | } | |
206 | ||
207 | if (size_limit_enabled) { | |
208 | uint64_t file_size; | |
209 | s = env_->GetFileSize(file_path, &file_size); | |
210 | if (!s.ok()) { | |
211 | ROCKS_LOG_ERROR(db_options_.info_log, | |
212 | "Unable to get file size: %s: %s", file_path.c_str(), | |
213 | s.ToString().c_str()); | |
214 | return; | |
215 | } else { | |
216 | if (file_size > 0) { | |
217 | log_file_size = std::max(log_file_size, file_size); | |
218 | ++log_files_num; | |
219 | } else { | |
494da23a | 220 | s = DeleteDBFile(&db_options_, file_path, archival_dir, false); |
7c673cae FG |
221 | if (!s.ok()) { |
222 | ROCKS_LOG_WARN(db_options_.info_log, | |
223 | "Unable to delete file: %s: %s", file_path.c_str(), | |
224 | s.ToString().c_str()); | |
225 | continue; | |
226 | } else { | |
227 | MutexLock l(&read_first_record_cache_mutex_); | |
228 | read_first_record_cache_.erase(number); | |
229 | } | |
230 | } | |
231 | } | |
232 | } | |
233 | } | |
234 | } | |
235 | ||
236 | if (0 == log_files_num || !size_limit_enabled) { | |
237 | return; | |
238 | } | |
239 | ||
240 | size_t const files_keep_num = | |
11fdf7f2 | 241 | static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size); |
7c673cae FG |
242 | if (log_files_num <= files_keep_num) { |
243 | return; | |
244 | } | |
245 | ||
246 | size_t files_del_num = log_files_num - files_keep_num; | |
247 | VectorLogPtr archived_logs; | |
248 | GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); | |
249 | ||
250 | if (files_del_num > archived_logs.size()) { | |
251 | ROCKS_LOG_WARN(db_options_.info_log, | |
252 | "Trying to delete more archived log files than " | |
253 | "exist. Deleting all"); | |
254 | files_del_num = archived_logs.size(); | |
255 | } | |
256 | ||
257 | for (size_t i = 0; i < files_del_num; ++i) { | |
258 | std::string const file_path = archived_logs[i]->PathName(); | |
494da23a TL |
259 | s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path, |
260 | db_options_.wal_dir, false); | |
7c673cae FG |
261 | if (!s.ok()) { |
262 | ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s", | |
263 | file_path.c_str(), s.ToString().c_str()); | |
264 | continue; | |
265 | } else { | |
266 | MutexLock l(&read_first_record_cache_mutex_); | |
267 | read_first_record_cache_.erase(archived_logs[i]->LogNumber()); | |
268 | } | |
269 | } | |
270 | } | |
271 | ||
272 | void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) { | |
273 | auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number); | |
274 | // The sync point below is used in (DBTest,TransactionLogIteratorRace) | |
275 | TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1"); | |
276 | Status s = env_->RenameFile(fname, archived_log_name); | |
277 | // The sync point below is used in (DBTest,TransactionLogIteratorRace) | |
278 | TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2"); | |
279 | ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n", | |
280 | fname.c_str(), archived_log_name.c_str(), | |
281 | s.ToString().c_str()); | |
282 | } | |
283 | ||
284 | namespace { | |
285 | struct CompareLogByPointer { | |
286 | bool operator()(const std::unique_ptr<LogFile>& a, | |
287 | const std::unique_ptr<LogFile>& b) { | |
11fdf7f2 TL |
288 | LogFileImpl* a_impl = static_cast_with_check<LogFileImpl, LogFile>(a.get()); |
289 | LogFileImpl* b_impl = static_cast_with_check<LogFileImpl, LogFile>(b.get()); | |
7c673cae FG |
290 | return *a_impl < *b_impl; |
291 | } | |
292 | }; | |
293 | } | |
294 | ||
295 | Status WalManager::GetSortedWalsOfType(const std::string& path, | |
296 | VectorLogPtr& log_files, | |
297 | WalFileType log_type) { | |
298 | std::vector<std::string> all_files; | |
299 | const Status status = env_->GetChildren(path, &all_files); | |
300 | if (!status.ok()) { | |
301 | return status; | |
302 | } | |
303 | log_files.reserve(all_files.size()); | |
304 | for (const auto& f : all_files) { | |
305 | uint64_t number; | |
306 | FileType type; | |
307 | if (ParseFileName(f, &number, &type) && type == kLogFile) { | |
308 | SequenceNumber sequence; | |
309 | Status s = ReadFirstRecord(log_type, number, &sequence); | |
310 | if (!s.ok()) { | |
311 | return s; | |
312 | } | |
313 | if (sequence == 0) { | |
314 | // empty file | |
315 | continue; | |
316 | } | |
317 | ||
318 | // Reproduce the race condition where a log file is moved | |
319 | // to archived dir, between these two sync points, used in | |
320 | // (DBTest,TransactionLogIteratorRace) | |
321 | TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1"); | |
322 | TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2"); | |
323 | ||
324 | uint64_t size_bytes; | |
325 | s = env_->GetFileSize(LogFileName(path, number), &size_bytes); | |
326 | // re-try in case the alive log file has been moved to archive. | |
327 | std::string archived_file = ArchivedLogFileName(path, number); | |
328 | if (!s.ok() && log_type == kAliveLogFile && | |
329 | env_->FileExists(archived_file).ok()) { | |
330 | s = env_->GetFileSize(archived_file, &size_bytes); | |
331 | if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) { | |
332 | // oops, the file just got deleted from archived dir! move on | |
333 | s = Status::OK(); | |
334 | continue; | |
335 | } | |
336 | } | |
337 | if (!s.ok()) { | |
338 | return s; | |
339 | } | |
340 | ||
341 | log_files.push_back(std::unique_ptr<LogFile>( | |
342 | new LogFileImpl(number, log_type, sequence, size_bytes))); | |
343 | } | |
344 | } | |
345 | CompareLogByPointer compare_log_files; | |
346 | std::sort(log_files.begin(), log_files.end(), compare_log_files); | |
347 | return status; | |
348 | } | |
349 | ||
350 | Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs, | |
351 | const SequenceNumber target) { | |
352 | int64_t start = 0; // signed to avoid overflow when target is < first file. | |
353 | int64_t end = static_cast<int64_t>(all_logs.size()) - 1; | |
354 | // Binary Search. avoid opening all files. | |
355 | while (end >= start) { | |
356 | int64_t mid = start + (end - start) / 2; // Avoid overflow. | |
11fdf7f2 | 357 | SequenceNumber current_seq_num = all_logs.at(static_cast<size_t>(mid))->StartSequence(); |
7c673cae FG |
358 | if (current_seq_num == target) { |
359 | end = mid; | |
360 | break; | |
361 | } else if (current_seq_num < target) { | |
362 | start = mid + 1; | |
363 | } else { | |
364 | end = mid - 1; | |
365 | } | |
366 | } | |
367 | // end could be -ve. | |
11fdf7f2 | 368 | size_t start_index = static_cast<size_t>(std::max(static_cast<int64_t>(0), end)); |
7c673cae FG |
369 | // The last wal file is always included |
370 | all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); | |
371 | return Status::OK(); | |
372 | } | |
373 | ||
374 | Status WalManager::ReadFirstRecord(const WalFileType type, | |
375 | const uint64_t number, | |
376 | SequenceNumber* sequence) { | |
377 | *sequence = 0; | |
378 | if (type != kAliveLogFile && type != kArchivedLogFile) { | |
379 | ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s", | |
380 | ToString(type).c_str()); | |
381 | return Status::NotSupported( | |
382 | "File Type Not Known " + ToString(type)); | |
383 | } | |
384 | { | |
385 | MutexLock l(&read_first_record_cache_mutex_); | |
386 | auto itr = read_first_record_cache_.find(number); | |
387 | if (itr != read_first_record_cache_.end()) { | |
388 | *sequence = itr->second; | |
389 | return Status::OK(); | |
390 | } | |
391 | } | |
392 | Status s; | |
393 | if (type == kAliveLogFile) { | |
394 | std::string fname = LogFileName(db_options_.wal_dir, number); | |
395 | s = ReadFirstLine(fname, number, sequence); | |
396 | if (env_->FileExists(fname).ok() && !s.ok()) { | |
397 | // return any error that is not caused by non-existing file | |
398 | return s; | |
399 | } | |
400 | } | |
401 | ||
402 | if (type == kArchivedLogFile || !s.ok()) { | |
403 | // check if the file got moved to archive. | |
404 | std::string archived_file = | |
405 | ArchivedLogFileName(db_options_.wal_dir, number); | |
406 | s = ReadFirstLine(archived_file, number, sequence); | |
407 | // maybe the file was deleted from archive dir. If that's the case, return | |
408 | // Status::OK(). The caller with identify this as empty file because | |
409 | // *sequence == 0 | |
410 | if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) { | |
411 | return Status::OK(); | |
412 | } | |
413 | } | |
414 | ||
415 | if (s.ok() && *sequence != 0) { | |
416 | MutexLock l(&read_first_record_cache_mutex_); | |
417 | read_first_record_cache_.insert({number, *sequence}); | |
418 | } | |
419 | return s; | |
420 | } | |
421 | ||
422 | // the function returns status.ok() and sequence == 0 if the file exists, but is | |
423 | // empty | |
424 | Status WalManager::ReadFirstLine(const std::string& fname, | |
425 | const uint64_t number, | |
426 | SequenceNumber* sequence) { | |
427 | struct LogReporter : public log::Reader::Reporter { | |
428 | Env* env; | |
429 | Logger* info_log; | |
430 | const char* fname; | |
431 | ||
432 | Status* status; | |
433 | bool ignore_error; // true if db_options_.paranoid_checks==false | |
494da23a | 434 | void Corruption(size_t bytes, const Status& s) override { |
7c673cae FG |
435 | ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s", |
436 | (this->ignore_error ? "(ignoring error) " : ""), fname, | |
437 | static_cast<int>(bytes), s.ToString().c_str()); | |
438 | if (this->status->ok()) { | |
439 | // only keep the first error | |
440 | *this->status = s; | |
441 | } | |
442 | } | |
443 | }; | |
444 | ||
445 | std::unique_ptr<SequentialFile> file; | |
11fdf7f2 TL |
446 | Status status = env_->NewSequentialFile( |
447 | fname, &file, env_->OptimizeForLogRead(env_options_)); | |
494da23a | 448 | std::unique_ptr<SequentialFileReader> file_reader( |
11fdf7f2 | 449 | new SequentialFileReader(std::move(file), fname)); |
7c673cae FG |
450 | |
451 | if (!status.ok()) { | |
452 | return status; | |
453 | } | |
454 | ||
455 | LogReporter reporter; | |
456 | reporter.env = env_; | |
457 | reporter.info_log = db_options_.info_log.get(); | |
458 | reporter.fname = fname.c_str(); | |
459 | reporter.status = &status; | |
460 | reporter.ignore_error = !db_options_.paranoid_checks; | |
461 | log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, | |
11fdf7f2 | 462 | true /*checksum*/, number); |
7c673cae FG |
463 | std::string scratch; |
464 | Slice record; | |
465 | ||
466 | if (reader.ReadRecord(&record, &scratch) && | |
467 | (status.ok() || !db_options_.paranoid_checks)) { | |
468 | if (record.size() < WriteBatchInternal::kHeader) { | |
469 | reporter.Corruption(record.size(), | |
470 | Status::Corruption("log record too small")); | |
471 | // TODO read record's till the first no corrupt entry? | |
472 | } else { | |
473 | WriteBatch batch; | |
474 | WriteBatchInternal::SetContents(&batch, record); | |
475 | *sequence = WriteBatchInternal::Sequence(&batch); | |
476 | return Status::OK(); | |
477 | } | |
478 | } | |
479 | ||
480 | // ReadRecord returns false on EOF, which means that the log file is empty. we | |
481 | // return status.ok() in that case and set sequence number to 0 | |
482 | *sequence = 0; | |
483 | return status; | |
484 | } | |
485 | ||
486 | #endif // ROCKSDB_LITE | |
487 | } // namespace rocksdb |