]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/wal_manager.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / wal_manager.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/wal_manager.h"
11
12#ifndef __STDC_FORMAT_MACROS
13#define __STDC_FORMAT_MACROS
14#endif
15
16#include <inttypes.h>
17#include <algorithm>
18#include <vector>
19#include <memory>
20
21#include "db/log_reader.h"
22#include "db/log_writer.h"
23#include "db/transaction_log_impl.h"
24#include "db/write_batch_internal.h"
25#include "port/port.h"
26#include "rocksdb/env.h"
27#include "rocksdb/options.h"
28#include "rocksdb/write_batch.h"
11fdf7f2 29#include "util/cast_util.h"
7c673cae
FG
30#include "util/coding.h"
31#include "util/file_reader_writer.h"
494da23a 32#include "util/file_util.h"
7c673cae
FG
33#include "util/filename.h"
34#include "util/logging.h"
35#include "util/mutexlock.h"
36#include "util/string_util.h"
37#include "util/sync_point.h"
38
39namespace rocksdb {
40
41#ifndef ROCKSDB_LITE
42
11fdf7f2
TL
43Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
44 auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname);
45 if (s.ok()) {
46 MutexLock l(&read_first_record_cache_mutex_);
47 read_first_record_cache_.erase(number);
48 }
49 return s;
50}
51
7c673cae
FG
52Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
53 // First get sorted files in db dir, then get sorted files from archived
54 // dir, to avoid a race condition where a log file is moved to archived
55 // dir in between.
56 Status s;
57 // list wal files in main db dir.
58 VectorLogPtr logs;
59 s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
60 if (!s.ok()) {
61 return s;
62 }
63
64 // Reproduce the race condition where a log file is moved
65 // to archived dir, between these two sync points, used in
66 // (DBTest,TransactionLogIteratorRace)
67 TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
68 TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
69
70 files.clear();
71 // list wal files in archive dir.
72 std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
73 Status exists = env_->FileExists(archivedir);
74 if (exists.ok()) {
75 s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
76 if (!s.ok()) {
77 return s;
78 }
79 } else if (!exists.IsNotFound()) {
80 assert(s.IsIOError());
81 return s;
82 }
83
84 uint64_t latest_archived_log_number = 0;
85 if (!files.empty()) {
86 latest_archived_log_number = files.back()->LogNumber();
87 ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
88 latest_archived_log_number);
89 }
90
91 files.reserve(files.size() + logs.size());
92 for (auto& log : logs) {
93 if (log->LogNumber() > latest_archived_log_number) {
94 files.push_back(std::move(log));
95 } else {
96 // When the race condition happens, we could see the
97 // same log in both db dir and archived dir. Simply
98 // ignore the one in db dir. Note that, if we read
99 // archived dir first, we would have missed the log file.
100 ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
101 log->PathName().c_str());
102 }
103 }
104
105 return s;
106}
107
108Status WalManager::GetUpdatesSince(
109 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
110 const TransactionLogIterator::ReadOptions& read_options,
111 VersionSet* version_set) {
112
113 // Get all sorted Wal Files.
114 // Do binary search and open files and find the seq number.
115
116 std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
117 Status s = GetSortedWalFiles(*wal_files);
118 if (!s.ok()) {
119 return s;
120 }
121
122 s = RetainProbableWalFiles(*wal_files, seq);
123 if (!s.ok()) {
124 return s;
125 }
126 iter->reset(new TransactionLogIteratorImpl(
127 db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
11fdf7f2 128 std::move(wal_files), version_set, seq_per_batch_));
7c673cae
FG
129 return (*iter)->status();
130}
131
132// 1. Go through all archived files and
133// a. if ttl is enabled, delete outdated files
134// b. if archive size limit is enabled, delete empty files,
135// compute file number and size.
136// 2. If size limit is enabled:
137// a. compute how many files should be deleted
138// b. get sorted non-empty archived logs
139// c. delete what should be deleted
140void WalManager::PurgeObsoleteWALFiles() {
141 bool const ttl_enabled = db_options_.wal_ttl_seconds > 0;
142 bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0;
143 if (!ttl_enabled && !size_limit_enabled) {
144 return;
145 }
146
147 int64_t current_time;
148 Status s = env_->GetCurrentTime(&current_time);
149 if (!s.ok()) {
150 ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
151 s.ToString().c_str());
152 assert(false);
153 return;
154 }
155 uint64_t const now_seconds = static_cast<uint64_t>(current_time);
156 uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
157 ? db_options_.wal_ttl_seconds / 2
158 : kDefaultIntervalToDeleteObsoleteWAL;
159
160 if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
161 return;
162 }
163
164 purge_wal_files_last_run_ = now_seconds;
165
166 std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
167 std::vector<std::string> files;
168 s = env_->GetChildren(archival_dir, &files);
169 if (!s.ok()) {
170 ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
171 s.ToString().c_str());
172 assert(false);
173 return;
174 }
175
176 size_t log_files_num = 0;
177 uint64_t log_file_size = 0;
178
179 for (auto& f : files) {
180 uint64_t number;
181 FileType type;
182 if (ParseFileName(f, &number, &type) && type == kLogFile) {
183 std::string const file_path = archival_dir + "/" + f;
184 if (ttl_enabled) {
185 uint64_t file_m_time;
186 s = env_->GetFileModificationTime(file_path, &file_m_time);
187 if (!s.ok()) {
188 ROCKS_LOG_WARN(db_options_.info_log,
189 "Can't get file mod time: %s: %s", file_path.c_str(),
190 s.ToString().c_str());
191 continue;
192 }
193 if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
494da23a 194 s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
7c673cae
FG
195 if (!s.ok()) {
196 ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
197 file_path.c_str(), s.ToString().c_str());
198 continue;
199 } else {
200 MutexLock l(&read_first_record_cache_mutex_);
201 read_first_record_cache_.erase(number);
202 }
203 continue;
204 }
205 }
206
207 if (size_limit_enabled) {
208 uint64_t file_size;
209 s = env_->GetFileSize(file_path, &file_size);
210 if (!s.ok()) {
211 ROCKS_LOG_ERROR(db_options_.info_log,
212 "Unable to get file size: %s: %s", file_path.c_str(),
213 s.ToString().c_str());
214 return;
215 } else {
216 if (file_size > 0) {
217 log_file_size = std::max(log_file_size, file_size);
218 ++log_files_num;
219 } else {
494da23a 220 s = DeleteDBFile(&db_options_, file_path, archival_dir, false);
7c673cae
FG
221 if (!s.ok()) {
222 ROCKS_LOG_WARN(db_options_.info_log,
223 "Unable to delete file: %s: %s", file_path.c_str(),
224 s.ToString().c_str());
225 continue;
226 } else {
227 MutexLock l(&read_first_record_cache_mutex_);
228 read_first_record_cache_.erase(number);
229 }
230 }
231 }
232 }
233 }
234 }
235
236 if (0 == log_files_num || !size_limit_enabled) {
237 return;
238 }
239
240 size_t const files_keep_num =
11fdf7f2 241 static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size);
7c673cae
FG
242 if (log_files_num <= files_keep_num) {
243 return;
244 }
245
246 size_t files_del_num = log_files_num - files_keep_num;
247 VectorLogPtr archived_logs;
248 GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
249
250 if (files_del_num > archived_logs.size()) {
251 ROCKS_LOG_WARN(db_options_.info_log,
252 "Trying to delete more archived log files than "
253 "exist. Deleting all");
254 files_del_num = archived_logs.size();
255 }
256
257 for (size_t i = 0; i < files_del_num; ++i) {
258 std::string const file_path = archived_logs[i]->PathName();
494da23a
TL
259 s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
260 db_options_.wal_dir, false);
7c673cae
FG
261 if (!s.ok()) {
262 ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
263 file_path.c_str(), s.ToString().c_str());
264 continue;
265 } else {
266 MutexLock l(&read_first_record_cache_mutex_);
267 read_first_record_cache_.erase(archived_logs[i]->LogNumber());
268 }
269 }
270}
271
272void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
273 auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
274 // The sync point below is used in (DBTest,TransactionLogIteratorRace)
275 TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
276 Status s = env_->RenameFile(fname, archived_log_name);
277 // The sync point below is used in (DBTest,TransactionLogIteratorRace)
278 TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
279 ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
280 fname.c_str(), archived_log_name.c_str(),
281 s.ToString().c_str());
282}
283
284namespace {
285struct CompareLogByPointer {
286 bool operator()(const std::unique_ptr<LogFile>& a,
287 const std::unique_ptr<LogFile>& b) {
11fdf7f2
TL
288 LogFileImpl* a_impl = static_cast_with_check<LogFileImpl, LogFile>(a.get());
289 LogFileImpl* b_impl = static_cast_with_check<LogFileImpl, LogFile>(b.get());
7c673cae
FG
290 return *a_impl < *b_impl;
291 }
292};
293}
294
295Status WalManager::GetSortedWalsOfType(const std::string& path,
296 VectorLogPtr& log_files,
297 WalFileType log_type) {
298 std::vector<std::string> all_files;
299 const Status status = env_->GetChildren(path, &all_files);
300 if (!status.ok()) {
301 return status;
302 }
303 log_files.reserve(all_files.size());
304 for (const auto& f : all_files) {
305 uint64_t number;
306 FileType type;
307 if (ParseFileName(f, &number, &type) && type == kLogFile) {
308 SequenceNumber sequence;
309 Status s = ReadFirstRecord(log_type, number, &sequence);
310 if (!s.ok()) {
311 return s;
312 }
313 if (sequence == 0) {
314 // empty file
315 continue;
316 }
317
318 // Reproduce the race condition where a log file is moved
319 // to archived dir, between these two sync points, used in
320 // (DBTest,TransactionLogIteratorRace)
321 TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
322 TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
323
324 uint64_t size_bytes;
325 s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
326 // re-try in case the alive log file has been moved to archive.
327 std::string archived_file = ArchivedLogFileName(path, number);
328 if (!s.ok() && log_type == kAliveLogFile &&
329 env_->FileExists(archived_file).ok()) {
330 s = env_->GetFileSize(archived_file, &size_bytes);
331 if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
332 // oops, the file just got deleted from archived dir! move on
333 s = Status::OK();
334 continue;
335 }
336 }
337 if (!s.ok()) {
338 return s;
339 }
340
341 log_files.push_back(std::unique_ptr<LogFile>(
342 new LogFileImpl(number, log_type, sequence, size_bytes)));
343 }
344 }
345 CompareLogByPointer compare_log_files;
346 std::sort(log_files.begin(), log_files.end(), compare_log_files);
347 return status;
348}
349
350Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
351 const SequenceNumber target) {
352 int64_t start = 0; // signed to avoid overflow when target is < first file.
353 int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
354 // Binary Search. avoid opening all files.
355 while (end >= start) {
356 int64_t mid = start + (end - start) / 2; // Avoid overflow.
11fdf7f2 357 SequenceNumber current_seq_num = all_logs.at(static_cast<size_t>(mid))->StartSequence();
7c673cae
FG
358 if (current_seq_num == target) {
359 end = mid;
360 break;
361 } else if (current_seq_num < target) {
362 start = mid + 1;
363 } else {
364 end = mid - 1;
365 }
366 }
367 // end could be -ve.
11fdf7f2 368 size_t start_index = static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
7c673cae
FG
369 // The last wal file is always included
370 all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
371 return Status::OK();
372}
373
374Status WalManager::ReadFirstRecord(const WalFileType type,
375 const uint64_t number,
376 SequenceNumber* sequence) {
377 *sequence = 0;
378 if (type != kAliveLogFile && type != kArchivedLogFile) {
379 ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
380 ToString(type).c_str());
381 return Status::NotSupported(
382 "File Type Not Known " + ToString(type));
383 }
384 {
385 MutexLock l(&read_first_record_cache_mutex_);
386 auto itr = read_first_record_cache_.find(number);
387 if (itr != read_first_record_cache_.end()) {
388 *sequence = itr->second;
389 return Status::OK();
390 }
391 }
392 Status s;
393 if (type == kAliveLogFile) {
394 std::string fname = LogFileName(db_options_.wal_dir, number);
395 s = ReadFirstLine(fname, number, sequence);
396 if (env_->FileExists(fname).ok() && !s.ok()) {
397 // return any error that is not caused by non-existing file
398 return s;
399 }
400 }
401
402 if (type == kArchivedLogFile || !s.ok()) {
403 // check if the file got moved to archive.
404 std::string archived_file =
405 ArchivedLogFileName(db_options_.wal_dir, number);
406 s = ReadFirstLine(archived_file, number, sequence);
407 // maybe the file was deleted from archive dir. If that's the case, return
408 // Status::OK(). The caller with identify this as empty file because
409 // *sequence == 0
410 if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
411 return Status::OK();
412 }
413 }
414
415 if (s.ok() && *sequence != 0) {
416 MutexLock l(&read_first_record_cache_mutex_);
417 read_first_record_cache_.insert({number, *sequence});
418 }
419 return s;
420}
421
422// the function returns status.ok() and sequence == 0 if the file exists, but is
423// empty
424Status WalManager::ReadFirstLine(const std::string& fname,
425 const uint64_t number,
426 SequenceNumber* sequence) {
427 struct LogReporter : public log::Reader::Reporter {
428 Env* env;
429 Logger* info_log;
430 const char* fname;
431
432 Status* status;
433 bool ignore_error; // true if db_options_.paranoid_checks==false
494da23a 434 void Corruption(size_t bytes, const Status& s) override {
7c673cae
FG
435 ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
436 (this->ignore_error ? "(ignoring error) " : ""), fname,
437 static_cast<int>(bytes), s.ToString().c_str());
438 if (this->status->ok()) {
439 // only keep the first error
440 *this->status = s;
441 }
442 }
443 };
444
445 std::unique_ptr<SequentialFile> file;
11fdf7f2
TL
446 Status status = env_->NewSequentialFile(
447 fname, &file, env_->OptimizeForLogRead(env_options_));
494da23a 448 std::unique_ptr<SequentialFileReader> file_reader(
11fdf7f2 449 new SequentialFileReader(std::move(file), fname));
7c673cae
FG
450
451 if (!status.ok()) {
452 return status;
453 }
454
455 LogReporter reporter;
456 reporter.env = env_;
457 reporter.info_log = db_options_.info_log.get();
458 reporter.fname = fname.c_str();
459 reporter.status = &status;
460 reporter.ignore_error = !db_options_.paranoid_checks;
461 log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
11fdf7f2 462 true /*checksum*/, number);
7c673cae
FG
463 std::string scratch;
464 Slice record;
465
466 if (reader.ReadRecord(&record, &scratch) &&
467 (status.ok() || !db_options_.paranoid_checks)) {
468 if (record.size() < WriteBatchInternal::kHeader) {
469 reporter.Corruption(record.size(),
470 Status::Corruption("log record too small"));
471 // TODO read record's till the first no corrupt entry?
472 } else {
473 WriteBatch batch;
474 WriteBatchInternal::SetContents(&batch, record);
475 *sequence = WriteBatchInternal::Sequence(&batch);
476 return Status::OK();
477 }
478 }
479
480 // ReadRecord returns false on EOF, which means that the log file is empty. we
481 // return status.ok() in that case and set sequence number to 0
482 *sequence = 0;
483 return status;
484}
485
486#endif // ROCKSDB_LITE
487} // namespace rocksdb