]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/wal_manager.h" | |
11 | ||
12 | #ifndef __STDC_FORMAT_MACROS | |
13 | #define __STDC_FORMAT_MACROS | |
14 | #endif | |
15 | ||
16 | #include <inttypes.h> | |
17 | #include <algorithm> | |
18 | #include <vector> | |
19 | #include <memory> | |
20 | ||
21 | #include "db/log_reader.h" | |
22 | #include "db/log_writer.h" | |
23 | #include "db/transaction_log_impl.h" | |
24 | #include "db/write_batch_internal.h" | |
25 | #include "port/port.h" | |
26 | #include "rocksdb/env.h" | |
27 | #include "rocksdb/options.h" | |
28 | #include "rocksdb/write_batch.h" | |
11fdf7f2 | 29 | #include "util/cast_util.h" |
7c673cae FG |
30 | #include "util/coding.h" |
31 | #include "util/file_reader_writer.h" | |
32 | #include "util/filename.h" | |
33 | #include "util/logging.h" | |
34 | #include "util/mutexlock.h" | |
35 | #include "util/string_util.h" | |
36 | #include "util/sync_point.h" | |
37 | ||
38 | namespace rocksdb { | |
39 | ||
40 | #ifndef ROCKSDB_LITE | |
41 | ||
11fdf7f2 TL |
42 | Status WalManager::DeleteFile(const std::string& fname, uint64_t number) { |
43 | auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname); | |
44 | if (s.ok()) { | |
45 | MutexLock l(&read_first_record_cache_mutex_); | |
46 | read_first_record_cache_.erase(number); | |
47 | } | |
48 | return s; | |
49 | } | |
50 | ||
7c673cae FG |
51 | Status WalManager::GetSortedWalFiles(VectorLogPtr& files) { |
52 | // First get sorted files in db dir, then get sorted files from archived | |
53 | // dir, to avoid a race condition where a log file is moved to archived | |
54 | // dir in between. | |
55 | Status s; | |
56 | // list wal files in main db dir. | |
57 | VectorLogPtr logs; | |
58 | s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile); | |
59 | if (!s.ok()) { | |
60 | return s; | |
61 | } | |
62 | ||
63 | // Reproduce the race condition where a log file is moved | |
64 | // to archived dir, between these two sync points, used in | |
65 | // (DBTest,TransactionLogIteratorRace) | |
66 | TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1"); | |
67 | TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2"); | |
68 | ||
69 | files.clear(); | |
70 | // list wal files in archive dir. | |
71 | std::string archivedir = ArchivalDirectory(db_options_.wal_dir); | |
72 | Status exists = env_->FileExists(archivedir); | |
73 | if (exists.ok()) { | |
74 | s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); | |
75 | if (!s.ok()) { | |
76 | return s; | |
77 | } | |
78 | } else if (!exists.IsNotFound()) { | |
79 | assert(s.IsIOError()); | |
80 | return s; | |
81 | } | |
82 | ||
83 | uint64_t latest_archived_log_number = 0; | |
84 | if (!files.empty()) { | |
85 | latest_archived_log_number = files.back()->LogNumber(); | |
86 | ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64, | |
87 | latest_archived_log_number); | |
88 | } | |
89 | ||
90 | files.reserve(files.size() + logs.size()); | |
91 | for (auto& log : logs) { | |
92 | if (log->LogNumber() > latest_archived_log_number) { | |
93 | files.push_back(std::move(log)); | |
94 | } else { | |
95 | // When the race condition happens, we could see the | |
96 | // same log in both db dir and archived dir. Simply | |
97 | // ignore the one in db dir. Note that, if we read | |
98 | // archived dir first, we would have missed the log file. | |
99 | ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive", | |
100 | log->PathName().c_str()); | |
101 | } | |
102 | } | |
103 | ||
104 | return s; | |
105 | } | |
106 | ||
107 | Status WalManager::GetUpdatesSince( | |
108 | SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter, | |
109 | const TransactionLogIterator::ReadOptions& read_options, | |
110 | VersionSet* version_set) { | |
111 | ||
112 | // Get all sorted Wal Files. | |
113 | // Do binary search and open files and find the seq number. | |
114 | ||
115 | std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr); | |
116 | Status s = GetSortedWalFiles(*wal_files); | |
117 | if (!s.ok()) { | |
118 | return s; | |
119 | } | |
120 | ||
121 | s = RetainProbableWalFiles(*wal_files, seq); | |
122 | if (!s.ok()) { | |
123 | return s; | |
124 | } | |
125 | iter->reset(new TransactionLogIteratorImpl( | |
126 | db_options_.wal_dir, &db_options_, read_options, env_options_, seq, | |
11fdf7f2 | 127 | std::move(wal_files), version_set, seq_per_batch_)); |
7c673cae FG |
128 | return (*iter)->status(); |
129 | } | |
130 | ||
131 | // 1. Go through all archived files and | |
132 | // a. if ttl is enabled, delete outdated files | |
133 | // b. if archive size limit is enabled, delete empty files, | |
134 | // compute file number and size. | |
135 | // 2. If size limit is enabled: | |
136 | // a. compute how many files should be deleted | |
137 | // b. get sorted non-empty archived logs | |
138 | // c. delete what should be deleted | |
139 | void WalManager::PurgeObsoleteWALFiles() { | |
140 | bool const ttl_enabled = db_options_.wal_ttl_seconds > 0; | |
141 | bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0; | |
142 | if (!ttl_enabled && !size_limit_enabled) { | |
143 | return; | |
144 | } | |
145 | ||
146 | int64_t current_time; | |
147 | Status s = env_->GetCurrentTime(¤t_time); | |
148 | if (!s.ok()) { | |
149 | ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s", | |
150 | s.ToString().c_str()); | |
151 | assert(false); | |
152 | return; | |
153 | } | |
154 | uint64_t const now_seconds = static_cast<uint64_t>(current_time); | |
155 | uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) | |
156 | ? db_options_.wal_ttl_seconds / 2 | |
157 | : kDefaultIntervalToDeleteObsoleteWAL; | |
158 | ||
159 | if (purge_wal_files_last_run_ + time_to_check > now_seconds) { | |
160 | return; | |
161 | } | |
162 | ||
163 | purge_wal_files_last_run_ = now_seconds; | |
164 | ||
165 | std::string archival_dir = ArchivalDirectory(db_options_.wal_dir); | |
166 | std::vector<std::string> files; | |
167 | s = env_->GetChildren(archival_dir, &files); | |
168 | if (!s.ok()) { | |
169 | ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s", | |
170 | s.ToString().c_str()); | |
171 | assert(false); | |
172 | return; | |
173 | } | |
174 | ||
175 | size_t log_files_num = 0; | |
176 | uint64_t log_file_size = 0; | |
177 | ||
178 | for (auto& f : files) { | |
179 | uint64_t number; | |
180 | FileType type; | |
181 | if (ParseFileName(f, &number, &type) && type == kLogFile) { | |
182 | std::string const file_path = archival_dir + "/" + f; | |
183 | if (ttl_enabled) { | |
184 | uint64_t file_m_time; | |
185 | s = env_->GetFileModificationTime(file_path, &file_m_time); | |
186 | if (!s.ok()) { | |
187 | ROCKS_LOG_WARN(db_options_.info_log, | |
188 | "Can't get file mod time: %s: %s", file_path.c_str(), | |
189 | s.ToString().c_str()); | |
190 | continue; | |
191 | } | |
192 | if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) { | |
193 | s = env_->DeleteFile(file_path); | |
194 | if (!s.ok()) { | |
195 | ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s", | |
196 | file_path.c_str(), s.ToString().c_str()); | |
197 | continue; | |
198 | } else { | |
199 | MutexLock l(&read_first_record_cache_mutex_); | |
200 | read_first_record_cache_.erase(number); | |
201 | } | |
202 | continue; | |
203 | } | |
204 | } | |
205 | ||
206 | if (size_limit_enabled) { | |
207 | uint64_t file_size; | |
208 | s = env_->GetFileSize(file_path, &file_size); | |
209 | if (!s.ok()) { | |
210 | ROCKS_LOG_ERROR(db_options_.info_log, | |
211 | "Unable to get file size: %s: %s", file_path.c_str(), | |
212 | s.ToString().c_str()); | |
213 | return; | |
214 | } else { | |
215 | if (file_size > 0) { | |
216 | log_file_size = std::max(log_file_size, file_size); | |
217 | ++log_files_num; | |
218 | } else { | |
219 | s = env_->DeleteFile(file_path); | |
220 | if (!s.ok()) { | |
221 | ROCKS_LOG_WARN(db_options_.info_log, | |
222 | "Unable to delete file: %s: %s", file_path.c_str(), | |
223 | s.ToString().c_str()); | |
224 | continue; | |
225 | } else { | |
226 | MutexLock l(&read_first_record_cache_mutex_); | |
227 | read_first_record_cache_.erase(number); | |
228 | } | |
229 | } | |
230 | } | |
231 | } | |
232 | } | |
233 | } | |
234 | ||
235 | if (0 == log_files_num || !size_limit_enabled) { | |
236 | return; | |
237 | } | |
238 | ||
239 | size_t const files_keep_num = | |
11fdf7f2 | 240 | static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size); |
7c673cae FG |
241 | if (log_files_num <= files_keep_num) { |
242 | return; | |
243 | } | |
244 | ||
245 | size_t files_del_num = log_files_num - files_keep_num; | |
246 | VectorLogPtr archived_logs; | |
247 | GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); | |
248 | ||
249 | if (files_del_num > archived_logs.size()) { | |
250 | ROCKS_LOG_WARN(db_options_.info_log, | |
251 | "Trying to delete more archived log files than " | |
252 | "exist. Deleting all"); | |
253 | files_del_num = archived_logs.size(); | |
254 | } | |
255 | ||
256 | for (size_t i = 0; i < files_del_num; ++i) { | |
257 | std::string const file_path = archived_logs[i]->PathName(); | |
258 | s = env_->DeleteFile(db_options_.wal_dir + "/" + file_path); | |
259 | if (!s.ok()) { | |
260 | ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s", | |
261 | file_path.c_str(), s.ToString().c_str()); | |
262 | continue; | |
263 | } else { | |
264 | MutexLock l(&read_first_record_cache_mutex_); | |
265 | read_first_record_cache_.erase(archived_logs[i]->LogNumber()); | |
266 | } | |
267 | } | |
268 | } | |
269 | ||
270 | void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) { | |
271 | auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number); | |
272 | // The sync point below is used in (DBTest,TransactionLogIteratorRace) | |
273 | TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1"); | |
274 | Status s = env_->RenameFile(fname, archived_log_name); | |
275 | // The sync point below is used in (DBTest,TransactionLogIteratorRace) | |
276 | TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2"); | |
277 | ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n", | |
278 | fname.c_str(), archived_log_name.c_str(), | |
279 | s.ToString().c_str()); | |
280 | } | |
281 | ||
282 | namespace { | |
283 | struct CompareLogByPointer { | |
284 | bool operator()(const std::unique_ptr<LogFile>& a, | |
285 | const std::unique_ptr<LogFile>& b) { | |
11fdf7f2 TL |
286 | LogFileImpl* a_impl = static_cast_with_check<LogFileImpl, LogFile>(a.get()); |
287 | LogFileImpl* b_impl = static_cast_with_check<LogFileImpl, LogFile>(b.get()); | |
7c673cae FG |
288 | return *a_impl < *b_impl; |
289 | } | |
290 | }; | |
291 | } | |
292 | ||
293 | Status WalManager::GetSortedWalsOfType(const std::string& path, | |
294 | VectorLogPtr& log_files, | |
295 | WalFileType log_type) { | |
296 | std::vector<std::string> all_files; | |
297 | const Status status = env_->GetChildren(path, &all_files); | |
298 | if (!status.ok()) { | |
299 | return status; | |
300 | } | |
301 | log_files.reserve(all_files.size()); | |
302 | for (const auto& f : all_files) { | |
303 | uint64_t number; | |
304 | FileType type; | |
305 | if (ParseFileName(f, &number, &type) && type == kLogFile) { | |
306 | SequenceNumber sequence; | |
307 | Status s = ReadFirstRecord(log_type, number, &sequence); | |
308 | if (!s.ok()) { | |
309 | return s; | |
310 | } | |
311 | if (sequence == 0) { | |
312 | // empty file | |
313 | continue; | |
314 | } | |
315 | ||
316 | // Reproduce the race condition where a log file is moved | |
317 | // to archived dir, between these two sync points, used in | |
318 | // (DBTest,TransactionLogIteratorRace) | |
319 | TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1"); | |
320 | TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2"); | |
321 | ||
322 | uint64_t size_bytes; | |
323 | s = env_->GetFileSize(LogFileName(path, number), &size_bytes); | |
324 | // re-try in case the alive log file has been moved to archive. | |
325 | std::string archived_file = ArchivedLogFileName(path, number); | |
326 | if (!s.ok() && log_type == kAliveLogFile && | |
327 | env_->FileExists(archived_file).ok()) { | |
328 | s = env_->GetFileSize(archived_file, &size_bytes); | |
329 | if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) { | |
330 | // oops, the file just got deleted from archived dir! move on | |
331 | s = Status::OK(); | |
332 | continue; | |
333 | } | |
334 | } | |
335 | if (!s.ok()) { | |
336 | return s; | |
337 | } | |
338 | ||
339 | log_files.push_back(std::unique_ptr<LogFile>( | |
340 | new LogFileImpl(number, log_type, sequence, size_bytes))); | |
341 | } | |
342 | } | |
343 | CompareLogByPointer compare_log_files; | |
344 | std::sort(log_files.begin(), log_files.end(), compare_log_files); | |
345 | return status; | |
346 | } | |
347 | ||
348 | Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs, | |
349 | const SequenceNumber target) { | |
350 | int64_t start = 0; // signed to avoid overflow when target is < first file. | |
351 | int64_t end = static_cast<int64_t>(all_logs.size()) - 1; | |
352 | // Binary Search. avoid opening all files. | |
353 | while (end >= start) { | |
354 | int64_t mid = start + (end - start) / 2; // Avoid overflow. | |
11fdf7f2 | 355 | SequenceNumber current_seq_num = all_logs.at(static_cast<size_t>(mid))->StartSequence(); |
7c673cae FG |
356 | if (current_seq_num == target) { |
357 | end = mid; | |
358 | break; | |
359 | } else if (current_seq_num < target) { | |
360 | start = mid + 1; | |
361 | } else { | |
362 | end = mid - 1; | |
363 | } | |
364 | } | |
365 | // end could be -ve. | |
11fdf7f2 | 366 | size_t start_index = static_cast<size_t>(std::max(static_cast<int64_t>(0), end)); |
7c673cae FG |
367 | // The last wal file is always included |
368 | all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); | |
369 | return Status::OK(); | |
370 | } | |
371 | ||
372 | Status WalManager::ReadFirstRecord(const WalFileType type, | |
373 | const uint64_t number, | |
374 | SequenceNumber* sequence) { | |
375 | *sequence = 0; | |
376 | if (type != kAliveLogFile && type != kArchivedLogFile) { | |
377 | ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s", | |
378 | ToString(type).c_str()); | |
379 | return Status::NotSupported( | |
380 | "File Type Not Known " + ToString(type)); | |
381 | } | |
382 | { | |
383 | MutexLock l(&read_first_record_cache_mutex_); | |
384 | auto itr = read_first_record_cache_.find(number); | |
385 | if (itr != read_first_record_cache_.end()) { | |
386 | *sequence = itr->second; | |
387 | return Status::OK(); | |
388 | } | |
389 | } | |
390 | Status s; | |
391 | if (type == kAliveLogFile) { | |
392 | std::string fname = LogFileName(db_options_.wal_dir, number); | |
393 | s = ReadFirstLine(fname, number, sequence); | |
394 | if (env_->FileExists(fname).ok() && !s.ok()) { | |
395 | // return any error that is not caused by non-existing file | |
396 | return s; | |
397 | } | |
398 | } | |
399 | ||
400 | if (type == kArchivedLogFile || !s.ok()) { | |
401 | // check if the file got moved to archive. | |
402 | std::string archived_file = | |
403 | ArchivedLogFileName(db_options_.wal_dir, number); | |
404 | s = ReadFirstLine(archived_file, number, sequence); | |
405 | // maybe the file was deleted from archive dir. If that's the case, return | |
406 | // Status::OK(). The caller with identify this as empty file because | |
407 | // *sequence == 0 | |
408 | if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) { | |
409 | return Status::OK(); | |
410 | } | |
411 | } | |
412 | ||
413 | if (s.ok() && *sequence != 0) { | |
414 | MutexLock l(&read_first_record_cache_mutex_); | |
415 | read_first_record_cache_.insert({number, *sequence}); | |
416 | } | |
417 | return s; | |
418 | } | |
419 | ||
420 | // the function returns status.ok() and sequence == 0 if the file exists, but is | |
421 | // empty | |
422 | Status WalManager::ReadFirstLine(const std::string& fname, | |
423 | const uint64_t number, | |
424 | SequenceNumber* sequence) { | |
425 | struct LogReporter : public log::Reader::Reporter { | |
426 | Env* env; | |
427 | Logger* info_log; | |
428 | const char* fname; | |
429 | ||
430 | Status* status; | |
431 | bool ignore_error; // true if db_options_.paranoid_checks==false | |
432 | virtual void Corruption(size_t bytes, const Status& s) override { | |
433 | ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s", | |
434 | (this->ignore_error ? "(ignoring error) " : ""), fname, | |
435 | static_cast<int>(bytes), s.ToString().c_str()); | |
436 | if (this->status->ok()) { | |
437 | // only keep the first error | |
438 | *this->status = s; | |
439 | } | |
440 | } | |
441 | }; | |
442 | ||
443 | std::unique_ptr<SequentialFile> file; | |
11fdf7f2 TL |
444 | Status status = env_->NewSequentialFile( |
445 | fname, &file, env_->OptimizeForLogRead(env_options_)); | |
7c673cae | 446 | unique_ptr<SequentialFileReader> file_reader( |
11fdf7f2 | 447 | new SequentialFileReader(std::move(file), fname)); |
7c673cae FG |
448 | |
449 | if (!status.ok()) { | |
450 | return status; | |
451 | } | |
452 | ||
453 | LogReporter reporter; | |
454 | reporter.env = env_; | |
455 | reporter.info_log = db_options_.info_log.get(); | |
456 | reporter.fname = fname.c_str(); | |
457 | reporter.status = &status; | |
458 | reporter.ignore_error = !db_options_.paranoid_checks; | |
459 | log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, | |
11fdf7f2 | 460 | true /*checksum*/, number); |
7c673cae FG |
461 | std::string scratch; |
462 | Slice record; | |
463 | ||
464 | if (reader.ReadRecord(&record, &scratch) && | |
465 | (status.ok() || !db_options_.paranoid_checks)) { | |
466 | if (record.size() < WriteBatchInternal::kHeader) { | |
467 | reporter.Corruption(record.size(), | |
468 | Status::Corruption("log record too small")); | |
469 | // TODO read record's till the first no corrupt entry? | |
470 | } else { | |
471 | WriteBatch batch; | |
472 | WriteBatchInternal::SetContents(&batch, record); | |
473 | *sequence = WriteBatchInternal::Sequence(&batch); | |
474 | return Status::OK(); | |
475 | } | |
476 | } | |
477 | ||
478 | // ReadRecord returns false on EOF, which means that the log file is empty. we | |
479 | // return status.ok() in that case and set sequence number to 0 | |
480 | *sequence = 0; | |
481 | return status; | |
482 | } | |
483 | ||
484 | #endif // ROCKSDB_LITE | |
485 | } // namespace rocksdb |