]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/wal_manager.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / wal_manager.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/wal_manager.h"
11
12#ifndef __STDC_FORMAT_MACROS
13#define __STDC_FORMAT_MACROS
14#endif
15
16#include <inttypes.h>
17#include <algorithm>
18#include <vector>
19#include <memory>
20
21#include "db/log_reader.h"
22#include "db/log_writer.h"
23#include "db/transaction_log_impl.h"
24#include "db/write_batch_internal.h"
25#include "port/port.h"
26#include "rocksdb/env.h"
27#include "rocksdb/options.h"
28#include "rocksdb/write_batch.h"
11fdf7f2 29#include "util/cast_util.h"
7c673cae
FG
30#include "util/coding.h"
31#include "util/file_reader_writer.h"
32#include "util/filename.h"
33#include "util/logging.h"
34#include "util/mutexlock.h"
35#include "util/string_util.h"
36#include "util/sync_point.h"
37
38namespace rocksdb {
39
40#ifndef ROCKSDB_LITE
41
11fdf7f2
TL
42Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
43 auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname);
44 if (s.ok()) {
45 MutexLock l(&read_first_record_cache_mutex_);
46 read_first_record_cache_.erase(number);
47 }
48 return s;
49}
50
7c673cae
FG
51Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
52 // First get sorted files in db dir, then get sorted files from archived
53 // dir, to avoid a race condition where a log file is moved to archived
54 // dir in between.
55 Status s;
56 // list wal files in main db dir.
57 VectorLogPtr logs;
58 s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
59 if (!s.ok()) {
60 return s;
61 }
62
63 // Reproduce the race condition where a log file is moved
64 // to archived dir, between these two sync points, used in
65 // (DBTest,TransactionLogIteratorRace)
66 TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
67 TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
68
69 files.clear();
70 // list wal files in archive dir.
71 std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
72 Status exists = env_->FileExists(archivedir);
73 if (exists.ok()) {
74 s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
75 if (!s.ok()) {
76 return s;
77 }
78 } else if (!exists.IsNotFound()) {
79 assert(s.IsIOError());
80 return s;
81 }
82
83 uint64_t latest_archived_log_number = 0;
84 if (!files.empty()) {
85 latest_archived_log_number = files.back()->LogNumber();
86 ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
87 latest_archived_log_number);
88 }
89
90 files.reserve(files.size() + logs.size());
91 for (auto& log : logs) {
92 if (log->LogNumber() > latest_archived_log_number) {
93 files.push_back(std::move(log));
94 } else {
95 // When the race condition happens, we could see the
96 // same log in both db dir and archived dir. Simply
97 // ignore the one in db dir. Note that, if we read
98 // archived dir first, we would have missed the log file.
99 ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
100 log->PathName().c_str());
101 }
102 }
103
104 return s;
105}
106
107Status WalManager::GetUpdatesSince(
108 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
109 const TransactionLogIterator::ReadOptions& read_options,
110 VersionSet* version_set) {
111
112 // Get all sorted Wal Files.
113 // Do binary search and open files and find the seq number.
114
115 std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
116 Status s = GetSortedWalFiles(*wal_files);
117 if (!s.ok()) {
118 return s;
119 }
120
121 s = RetainProbableWalFiles(*wal_files, seq);
122 if (!s.ok()) {
123 return s;
124 }
125 iter->reset(new TransactionLogIteratorImpl(
126 db_options_.wal_dir, &db_options_, read_options, env_options_, seq,
11fdf7f2 127 std::move(wal_files), version_set, seq_per_batch_));
7c673cae
FG
128 return (*iter)->status();
129}
130
131// 1. Go through all archived files and
132// a. if ttl is enabled, delete outdated files
133// b. if archive size limit is enabled, delete empty files,
134// compute file number and size.
135// 2. If size limit is enabled:
136// a. compute how many files should be deleted
137// b. get sorted non-empty archived logs
138// c. delete what should be deleted
139void WalManager::PurgeObsoleteWALFiles() {
140 bool const ttl_enabled = db_options_.wal_ttl_seconds > 0;
141 bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0;
142 if (!ttl_enabled && !size_limit_enabled) {
143 return;
144 }
145
146 int64_t current_time;
147 Status s = env_->GetCurrentTime(&current_time);
148 if (!s.ok()) {
149 ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
150 s.ToString().c_str());
151 assert(false);
152 return;
153 }
154 uint64_t const now_seconds = static_cast<uint64_t>(current_time);
155 uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
156 ? db_options_.wal_ttl_seconds / 2
157 : kDefaultIntervalToDeleteObsoleteWAL;
158
159 if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
160 return;
161 }
162
163 purge_wal_files_last_run_ = now_seconds;
164
165 std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
166 std::vector<std::string> files;
167 s = env_->GetChildren(archival_dir, &files);
168 if (!s.ok()) {
169 ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
170 s.ToString().c_str());
171 assert(false);
172 return;
173 }
174
175 size_t log_files_num = 0;
176 uint64_t log_file_size = 0;
177
178 for (auto& f : files) {
179 uint64_t number;
180 FileType type;
181 if (ParseFileName(f, &number, &type) && type == kLogFile) {
182 std::string const file_path = archival_dir + "/" + f;
183 if (ttl_enabled) {
184 uint64_t file_m_time;
185 s = env_->GetFileModificationTime(file_path, &file_m_time);
186 if (!s.ok()) {
187 ROCKS_LOG_WARN(db_options_.info_log,
188 "Can't get file mod time: %s: %s", file_path.c_str(),
189 s.ToString().c_str());
190 continue;
191 }
192 if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
193 s = env_->DeleteFile(file_path);
194 if (!s.ok()) {
195 ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
196 file_path.c_str(), s.ToString().c_str());
197 continue;
198 } else {
199 MutexLock l(&read_first_record_cache_mutex_);
200 read_first_record_cache_.erase(number);
201 }
202 continue;
203 }
204 }
205
206 if (size_limit_enabled) {
207 uint64_t file_size;
208 s = env_->GetFileSize(file_path, &file_size);
209 if (!s.ok()) {
210 ROCKS_LOG_ERROR(db_options_.info_log,
211 "Unable to get file size: %s: %s", file_path.c_str(),
212 s.ToString().c_str());
213 return;
214 } else {
215 if (file_size > 0) {
216 log_file_size = std::max(log_file_size, file_size);
217 ++log_files_num;
218 } else {
219 s = env_->DeleteFile(file_path);
220 if (!s.ok()) {
221 ROCKS_LOG_WARN(db_options_.info_log,
222 "Unable to delete file: %s: %s", file_path.c_str(),
223 s.ToString().c_str());
224 continue;
225 } else {
226 MutexLock l(&read_first_record_cache_mutex_);
227 read_first_record_cache_.erase(number);
228 }
229 }
230 }
231 }
232 }
233 }
234
235 if (0 == log_files_num || !size_limit_enabled) {
236 return;
237 }
238
239 size_t const files_keep_num =
11fdf7f2 240 static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size);
7c673cae
FG
241 if (log_files_num <= files_keep_num) {
242 return;
243 }
244
245 size_t files_del_num = log_files_num - files_keep_num;
246 VectorLogPtr archived_logs;
247 GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
248
249 if (files_del_num > archived_logs.size()) {
250 ROCKS_LOG_WARN(db_options_.info_log,
251 "Trying to delete more archived log files than "
252 "exist. Deleting all");
253 files_del_num = archived_logs.size();
254 }
255
256 for (size_t i = 0; i < files_del_num; ++i) {
257 std::string const file_path = archived_logs[i]->PathName();
258 s = env_->DeleteFile(db_options_.wal_dir + "/" + file_path);
259 if (!s.ok()) {
260 ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
261 file_path.c_str(), s.ToString().c_str());
262 continue;
263 } else {
264 MutexLock l(&read_first_record_cache_mutex_);
265 read_first_record_cache_.erase(archived_logs[i]->LogNumber());
266 }
267 }
268}
269
270void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
271 auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
272 // The sync point below is used in (DBTest,TransactionLogIteratorRace)
273 TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
274 Status s = env_->RenameFile(fname, archived_log_name);
275 // The sync point below is used in (DBTest,TransactionLogIteratorRace)
276 TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
277 ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
278 fname.c_str(), archived_log_name.c_str(),
279 s.ToString().c_str());
280}
281
282namespace {
283struct CompareLogByPointer {
284 bool operator()(const std::unique_ptr<LogFile>& a,
285 const std::unique_ptr<LogFile>& b) {
11fdf7f2
TL
286 LogFileImpl* a_impl = static_cast_with_check<LogFileImpl, LogFile>(a.get());
287 LogFileImpl* b_impl = static_cast_with_check<LogFileImpl, LogFile>(b.get());
7c673cae
FG
288 return *a_impl < *b_impl;
289 }
290};
291}
292
293Status WalManager::GetSortedWalsOfType(const std::string& path,
294 VectorLogPtr& log_files,
295 WalFileType log_type) {
296 std::vector<std::string> all_files;
297 const Status status = env_->GetChildren(path, &all_files);
298 if (!status.ok()) {
299 return status;
300 }
301 log_files.reserve(all_files.size());
302 for (const auto& f : all_files) {
303 uint64_t number;
304 FileType type;
305 if (ParseFileName(f, &number, &type) && type == kLogFile) {
306 SequenceNumber sequence;
307 Status s = ReadFirstRecord(log_type, number, &sequence);
308 if (!s.ok()) {
309 return s;
310 }
311 if (sequence == 0) {
312 // empty file
313 continue;
314 }
315
316 // Reproduce the race condition where a log file is moved
317 // to archived dir, between these two sync points, used in
318 // (DBTest,TransactionLogIteratorRace)
319 TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
320 TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");
321
322 uint64_t size_bytes;
323 s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
324 // re-try in case the alive log file has been moved to archive.
325 std::string archived_file = ArchivedLogFileName(path, number);
326 if (!s.ok() && log_type == kAliveLogFile &&
327 env_->FileExists(archived_file).ok()) {
328 s = env_->GetFileSize(archived_file, &size_bytes);
329 if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
330 // oops, the file just got deleted from archived dir! move on
331 s = Status::OK();
332 continue;
333 }
334 }
335 if (!s.ok()) {
336 return s;
337 }
338
339 log_files.push_back(std::unique_ptr<LogFile>(
340 new LogFileImpl(number, log_type, sequence, size_bytes)));
341 }
342 }
343 CompareLogByPointer compare_log_files;
344 std::sort(log_files.begin(), log_files.end(), compare_log_files);
345 return status;
346}
347
348Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
349 const SequenceNumber target) {
350 int64_t start = 0; // signed to avoid overflow when target is < first file.
351 int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
352 // Binary Search. avoid opening all files.
353 while (end >= start) {
354 int64_t mid = start + (end - start) / 2; // Avoid overflow.
11fdf7f2 355 SequenceNumber current_seq_num = all_logs.at(static_cast<size_t>(mid))->StartSequence();
7c673cae
FG
356 if (current_seq_num == target) {
357 end = mid;
358 break;
359 } else if (current_seq_num < target) {
360 start = mid + 1;
361 } else {
362 end = mid - 1;
363 }
364 }
365 // end could be -ve.
11fdf7f2 366 size_t start_index = static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
7c673cae
FG
367 // The last wal file is always included
368 all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
369 return Status::OK();
370}
371
372Status WalManager::ReadFirstRecord(const WalFileType type,
373 const uint64_t number,
374 SequenceNumber* sequence) {
375 *sequence = 0;
376 if (type != kAliveLogFile && type != kArchivedLogFile) {
377 ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
378 ToString(type).c_str());
379 return Status::NotSupported(
380 "File Type Not Known " + ToString(type));
381 }
382 {
383 MutexLock l(&read_first_record_cache_mutex_);
384 auto itr = read_first_record_cache_.find(number);
385 if (itr != read_first_record_cache_.end()) {
386 *sequence = itr->second;
387 return Status::OK();
388 }
389 }
390 Status s;
391 if (type == kAliveLogFile) {
392 std::string fname = LogFileName(db_options_.wal_dir, number);
393 s = ReadFirstLine(fname, number, sequence);
394 if (env_->FileExists(fname).ok() && !s.ok()) {
395 // return any error that is not caused by non-existing file
396 return s;
397 }
398 }
399
400 if (type == kArchivedLogFile || !s.ok()) {
401 // check if the file got moved to archive.
402 std::string archived_file =
403 ArchivedLogFileName(db_options_.wal_dir, number);
404 s = ReadFirstLine(archived_file, number, sequence);
405 // maybe the file was deleted from archive dir. If that's the case, return
406 // Status::OK(). The caller with identify this as empty file because
407 // *sequence == 0
408 if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
409 return Status::OK();
410 }
411 }
412
413 if (s.ok() && *sequence != 0) {
414 MutexLock l(&read_first_record_cache_mutex_);
415 read_first_record_cache_.insert({number, *sequence});
416 }
417 return s;
418}
419
420// the function returns status.ok() and sequence == 0 if the file exists, but is
421// empty
422Status WalManager::ReadFirstLine(const std::string& fname,
423 const uint64_t number,
424 SequenceNumber* sequence) {
425 struct LogReporter : public log::Reader::Reporter {
426 Env* env;
427 Logger* info_log;
428 const char* fname;
429
430 Status* status;
431 bool ignore_error; // true if db_options_.paranoid_checks==false
432 virtual void Corruption(size_t bytes, const Status& s) override {
433 ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
434 (this->ignore_error ? "(ignoring error) " : ""), fname,
435 static_cast<int>(bytes), s.ToString().c_str());
436 if (this->status->ok()) {
437 // only keep the first error
438 *this->status = s;
439 }
440 }
441 };
442
443 std::unique_ptr<SequentialFile> file;
11fdf7f2
TL
444 Status status = env_->NewSequentialFile(
445 fname, &file, env_->OptimizeForLogRead(env_options_));
7c673cae 446 unique_ptr<SequentialFileReader> file_reader(
11fdf7f2 447 new SequentialFileReader(std::move(file), fname));
7c673cae
FG
448
449 if (!status.ok()) {
450 return status;
451 }
452
453 LogReporter reporter;
454 reporter.env = env_;
455 reporter.info_log = db_options_.info_log.get();
456 reporter.fname = fname.c_str();
457 reporter.status = &status;
458 reporter.ignore_error = !db_options_.paranoid_checks;
459 log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
11fdf7f2 460 true /*checksum*/, number);
7c673cae
FG
461 std::string scratch;
462 Slice record;
463
464 if (reader.ReadRecord(&record, &scratch) &&
465 (status.ok() || !db_options_.paranoid_checks)) {
466 if (record.size() < WriteBatchInternal::kHeader) {
467 reporter.Corruption(record.size(),
468 Status::Corruption("log record too small"));
469 // TODO read record's till the first no corrupt entry?
470 } else {
471 WriteBatch batch;
472 WriteBatchInternal::SetContents(&batch, record);
473 *sequence = WriteBatchInternal::Sequence(&batch);
474 return Status::OK();
475 }
476 }
477
478 // ReadRecord returns false on EOF, which means that the log file is empty. we
479 // return status.ok() in that case and set sequence number to 0
480 *sequence = 0;
481 return status;
482}
483
484#endif // ROCKSDB_LITE
485} // namespace rocksdb