// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "rocksdb/options.h"
#ifndef ROCKSDB_LITE
#include <fcntl.h>
#include "db/db_test_util.h"
#include "db/log_format.h"
#include "db/version_set.h"
-#include "env/composite_env_wrapper.h"
#include "file/filename.h"
+#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"
+#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/write_batch.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/meta_blocks.h"
namespace ROCKSDB_NAMESPACE {
static constexpr int kValueSize = 1000;
-
+namespace {
+// Env decorator used by the corruption tests to inject failures: while
+// `writable_file_error_` is set, every NewWritableFile() call fails with an
+// IOError and is tallied in `num_writable_file_errors_`.
+class ErrorEnv : public EnvWrapper {
+ public:
+  // Public on purpose: tests toggle the switch and read the counter directly.
+  bool writable_file_error_ = false;
+  int num_writable_file_errors_ = 0;
+
+  explicit ErrorEnv(Env* _target) : EnvWrapper(_target) {}
+
+  const char* Name() const override { return "ErrorEnv"; }
+
+  // Clears `*result`, then either reports the injected failure or forwards the
+  // request to the wrapped Env.
+  Status NewWritableFile(const std::string& fname,
+                         std::unique_ptr<WritableFile>* result,
+                         const EnvOptions& soptions) override {
+    result->reset();
+    if (!writable_file_error_) {
+      return target()->NewWritableFile(fname, result, soptions);
+    }
+    ++num_writable_file_errors_;
+    return Status::IOError(fname, "fake error");
+  }
+};
+} // anonymous namespace
class CorruptionTest : public testing::Test {
public:
- test::ErrorEnv env_;
+ std::shared_ptr<Env> env_guard_;
+ ErrorEnv* env_;
std::string dbname_;
std::shared_ptr<Cache> tiny_cache_;
Options options_;
// set it to 0), test SequenceNumberRecovery will fail, likely because of a
// bug in recovery code. Keep it 4 for now to make the test pass.
tiny_cache_ = NewLRUCache(100, 4);
+ Env* base_env = Env::Default();
+ EXPECT_OK(
+ test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
+ EXPECT_NE(base_env, nullptr);
+ env_ = new ErrorEnv(base_env);
options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
- options_.env = &env_;
- dbname_ = test::PerThreadDBPath("corruption_test");
+ options_.env = env_;
+ dbname_ = test::PerThreadDBPath(env_, "corruption_test");
Status s = DestroyDB(dbname_, options_);
EXPECT_OK(s);
if (getenv("KEEP_DB")) {
fprintf(stdout, "db is still at %s\n", dbname_.c_str());
} else {
- EXPECT_OK(DestroyDB(dbname_, Options()));
+ Options opts;
+ opts.env = env_->target();
+ EXPECT_OK(DestroyDB(dbname_, opts));
}
+ delete env_;
}
void CloseDb() {
if (opt.env == Options().env) {
// If env is not overridden, replace it with ErrorEnv.
// Otherwise, the test already uses a non-default Env.
- opt.env = &env_;
+ opt.env = env_;
}
opt.arena_block_size = 4096;
BlockBasedTableOptions table_options;
return DB::Open(opt, dbname_, &db_);
}
- void Reopen(Options* options = nullptr) {
- ASSERT_OK(TryReopen(options));
- }
+ void Reopen(Options* options = nullptr) { ASSERT_OK(TryReopen(options)); }
void RepairDB() {
delete db_;
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
}
- //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
+ // if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
Slice key = Key(i + start, &key_space);
batch.Clear();
ASSERT_OK(batch.Put(key, Value(i + start, &value_space)));
ASSERT_OK(iter->status());
uint64_t key;
Slice in(iter->key());
- if (!ConsumeDecimalNumber(&in, &key) ||
- !in.empty() ||
+ if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
key < next_expected) {
bad_keys++;
continue;
iter->status().PermitUncheckedError();
delete iter;
- fprintf(stderr,
- "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
- min_expected, max_expected, correct, bad_keys, bad_values,
- static_cast<unsigned long long>(missed));
+ fprintf(
+ stderr,
+ "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
+ min_expected, max_expected, correct, bad_keys, bad_values,
+ static_cast<unsigned long long>(missed));
ASSERT_LE(min_expected, correct);
ASSERT_GE(max_expected, correct);
}
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
// Pick file to corrupt
std::vector<std::string> filenames;
- ASSERT_OK(env_.GetChildren(dbname_, &filenames));
+ ASSERT_OK(env_->GetChildren(dbname_, &filenames));
uint64_t number;
FileType type;
std::string fname;
int picked_number = -1;
for (size_t i = 0; i < filenames.size(); i++) {
- if (ParseFileName(filenames[i], &number, &type) &&
- type == filetype &&
+ if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
static_cast<int>(number) > picked_number) { // Pick latest file
fname = dbname_ + "/" + filenames[i];
picked_number = static_cast<int>(number);
}
ASSERT_TRUE(!fname.empty()) << filetype;
- ASSERT_OK(test::CorruptFile(&env_, fname, offset, bytes_to_corrupt));
+ ASSERT_OK(test::CorruptFile(env_, fname, offset, bytes_to_corrupt));
}
// corrupts exactly one file at level `level`. if no file found at level,
db_->GetLiveFilesMetaData(&metadata);
for (const auto& m : metadata) {
if (m.level == level) {
- ASSERT_OK(test::CorruptFile(&env_, dbname_ + "/" + m.name, offset,
+ ASSERT_OK(test::CorruptFile(env_, dbname_ + "/" + m.name, offset,
bytes_to_corrupt));
return;
}
FAIL() << "no file found at level";
}
-
int Property(const std::string& name) {
std::string property;
int result;
}
return Slice(*storage);
}
+
+  // Appends the file number of every WAL file found directly under `dbname_`
+  // to `file_nums`, then sorts the whole of `file_nums` in ascending order.
+  // NOTE: a failing ASSERT_OK only returns from this helper, so callers must
+  // not assume `file_nums` is non-empty afterwards.
+  void GetSortedWalFiles(std::vector<uint64_t>& file_nums) {
+    std::vector<std::string> children;
+    ASSERT_OK(env_->GetChildren(dbname_, &children));
+    FileType parsed_type = kWalFile;
+    for (size_t idx = 0; idx < children.size(); ++idx) {
+      uint64_t wal_number = 0;
+      if (!ParseFileName(children[idx], &wal_number, &parsed_type)) {
+        continue;  // not a RocksDB-named file
+      }
+      if (parsed_type == kWalFile) {
+        file_nums.push_back(wal_number);
+      }
+    }
+    std::sort(file_nums.begin(), file_nums.end());
+  }
+
+  // Truncates the on-disk file identified by (`file`, `number`) to simulate
+  // data loss. Only FileType::kWalFile is handled; any other type is a no-op.
+  //
+  // `bytes_to_truncate` is the number of bytes dropped from the end of the
+  // file. The default of 0 is a sentinel meaning "truncate to zero length".
+  // The file must be strictly larger than `bytes_to_truncate`.
+  void CorruptFileWithTruncation(FileType file, uint64_t number,
+                                 uint64_t bytes_to_truncate = 0) {
+    std::string path;
+    switch (file) {
+      case FileType::kWalFile:
+        path = LogFileName(dbname_, number);
+        break;
+      // TODO: Add other file types as this method is being used for those file
+      // types.
+      default:
+        return;
+    }
+    uint64_t old_size = 0;
+    ASSERT_OK(env_->GetFileSize(path, &old_size));
+    // Use a gtest assertion rather than assert(): assert() compiles away under
+    // NDEBUG, and the unsigned subtraction below would then wrap around.
+    ASSERT_GT(old_size, bytes_to_truncate);
+    // bytes_to_truncate == 0 requests full truncation.
+    const uint64_t new_size =
+        bytes_to_truncate == 0 ? 0 : old_size - bytes_to_truncate;
+    ASSERT_OK(test::TruncateFile(env_, path, new_size));
+  }
};
TEST_F(CorruptionTest, Recovery) {
Check(36, 36);
}
+TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
+  // Repro for bug where WALs following the point-in-time recovery were not
+  // retained leading to the next recovery failing.
+  CloseDb();
+
+  options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+  const std::string test_cf_name = "test_cf";
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
+  cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());
+
+  // Number of the newest WAL of the first DB session; truncated below.
+  uint64_t log_num = 0;
+  {
+    options_.create_missing_column_families = true;
+    std::vector<ColumnFamilyHandle*> cfhs;
+    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
+    assert(db_ != nullptr);  // suppress false clang-analyze report
+
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    for (auto* cfh : cfhs) {
+      delete cfh;
+    }
+    CloseDb();
+
+    // GetSortedWalFiles() reports problems via gtest assertions, which only
+    // return from the helper. Check the result here (after the handles and DB
+    // are released) so back() is never called on an empty vector.
+    ASSERT_FALSE(file_nums.empty());
+    log_num = file_nums.back();
+  }
+
+  CorruptFileWithTruncation(FileType::kWalFile, log_num,
+                            /*bytes_to_truncate=*/1);
+
+  {
+    // Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
+    options_.avoid_flush_during_recovery = true;
+    std::vector<ColumnFamilyHandle*> cfhs;
+    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
+    assert(db_ != nullptr);  // suppress false clang-analyze report
+
+    // Flush one but not both CFs and write some data so there's a seqno gap
+    // between the PITR corruption and the next DB session's first WAL.
+    ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
+    ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));
+
+    for (auto* cfh : cfhs) {
+      delete cfh;
+    }
+    CloseDb();
+  }
+
+  // With the bug, this DB open would remove the WALs following the PITR
+  // corruption. Then, the next recovery would fail.
+  for (int i = 0; i < 2; ++i) {
+    std::vector<ColumnFamilyHandle*> cfhs;
+    ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
+    assert(db_ != nullptr);  // suppress false clang-analyze report
+
+    for (auto* cfh : cfhs) {
+      delete cfh;
+    }
+    CloseDb();
+  }
+}
+
TEST_F(CorruptionTest, RecoverWriteError) {
- env_.writable_file_error_ = true;
+ env_->writable_file_error_ = true;
Status s = TryReopen();
ASSERT_TRUE(!s.ok());
}
TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
// Do enough writing to force minor compaction
- env_.writable_file_error_ = true;
+ env_->writable_file_error_ = true;
const int num =
static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
std::string value_storage;
ASSERT_TRUE(!failed || !s.ok());
}
ASSERT_TRUE(!s.ok());
- ASSERT_GE(env_.num_writable_file_errors_, 1);
- env_.writable_file_error_ = false;
+ ASSERT_GE(env_->num_writable_file_errors_, 1);
+ env_->writable_file_error_ = false;
Reopen();
}
TEST_F(CorruptionTest, VerifyChecksumReadahead) {
Options options;
- SpecialEnv senv(Env::Default());
+ SpecialEnv senv(env_->target());
options.env = &senv;
// Disable block cache as we are going to check checksum for
// the same file twice and measure number of reads.
ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
- ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
Corrupt(kDescriptorFile, 0, 1000);
Status s = TryReopen();
TEST_F(CorruptionTest, CompactionInputError) {
Options options;
+ options.env = env_;
Reopen(&options);
Build(10);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
Options options;
+ options.env = env_;
options.paranoid_checks = true;
options.write_buffer_size = 131072;
options.max_write_buffer_number = 2;
ASSERT_EQ(static_cast<size_t>(1), metadata.size());
std::string filename = dbname_ + metadata[0].name;
- std::unique_ptr<RandomAccessFile> file;
- ASSERT_OK(options_.env->NewRandomAccessFile(filename, &file, EnvOptions()));
- std::unique_ptr<RandomAccessFileReader> file_reader(
- new RandomAccessFileReader(NewLegacyRandomAccessFileWrapper(file),
- filename));
+ FileOptions file_opts;
+ const auto& fs = options_.env->GetFileSystem();
+ std::unique_ptr<RandomAccessFileReader> file_reader;
+ ASSERT_OK(RandomAccessFileReader::Create(fs, filename, file_opts,
+ &file_reader, nullptr));
uint64_t file_size;
- ASSERT_OK(options_.env->GetFileSize(filename, &file_size));
+ ASSERT_OK(
+ fs->GetFileSize(filename, file_opts.io_options, &file_size, nullptr));
BlockHandle range_del_handle;
- ASSERT_OK(FindMetaBlock(
+ ASSERT_OK(FindMetaBlockInFile(
file_reader.get(), file_size, kBlockBasedTableMagicNumber,
- ImmutableCFOptions(options_), kRangeDelBlock, &range_del_handle));
+ ImmutableOptions(options_), kRangeDelBlockName, &range_del_handle));
ASSERT_OK(TryReopen());
- ASSERT_OK(test::CorruptFile(&env_, filename,
+ ASSERT_OK(test::CorruptFile(env_, filename,
static_cast<int>(range_del_handle.offset()), 1));
ASSERT_TRUE(TryReopen().IsCorruption());
}
TEST_F(CorruptionTest, FileSystemStateCorrupted) {
for (int iter = 0; iter < 2; ++iter) {
Options options;
+ options.env = env_;
options.paranoid_checks = true;
options.create_if_missing = true;
Reopen(&options);
if (iter == 0) { // corrupt file size
std::unique_ptr<WritableFile> file;
- env_.NewWritableFile(filename, &file, EnvOptions());
+ ASSERT_OK(env_->NewWritableFile(filename, &file, EnvOptions()));
ASSERT_OK(file->Append(Slice("corrupted sst")));
file.reset();
Status x = TryReopen(&options);
ASSERT_TRUE(x.IsCorruption());
} else { // delete the file
- ASSERT_OK(env_.DeleteFile(filename));
+ ASSERT_OK(env_->DeleteFile(filename));
Status x = TryReopen(&options);
ASSERT_TRUE(x.IsCorruption());
}
TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
Options options;
+ options.env = env_;
options.check_flush_compaction_key_order = false;
options.paranoid_file_checks = true;
options.create_if_missing = true;
options.table_factory = mock;
mock->SetCorruptionMode(mode);
ASSERT_OK(DB::Open(options, dbname_, &db_));
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
Build(10);
s = db_->Flush(FlushOptions());
if (mode == mock::MockTableFactory::kCorruptNone) {
TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
Options options;
+ options.env = env_;
options.paranoid_file_checks = true;
options.create_if_missing = true;
options.check_flush_compaction_key_order = false;
delete db_;
db_ = nullptr;
s = DestroyDB(dbname_, options);
+ ASSERT_OK(s);
std::shared_ptr<mock::MockTableFactory> mock =
std::make_shared<mock::MockTableFactory>();
options.table_factory = mock;
ASSERT_OK(DB::Open(options, dbname_, &db_));
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
Build(100, 2);
// ASSERT_OK(db_->Flush(FlushOptions()));
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
mock->SetCorruptionMode(mode);
- s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true);
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ s = dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
if (mode == mock::MockTableFactory::kCorruptNone) {
ASSERT_OK(s);
} else {
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
Options options;
+ options.env = env_;
options.check_flush_compaction_key_order = false;
options.paranoid_file_checks = true;
options.create_if_missing = true;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, &db_));
std::string start, end;
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(3, &start), Key(7, &end)));
auto snap = db_->GetSnapshot();
} else {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
- ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
}
db_->ReleaseSnapshot(snap);
}
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
Options options;
+ options.env = env_;
options.check_flush_compaction_key_order = false;
options.paranoid_file_checks = true;
options.create_if_missing = true;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, &db_));
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
Build(10, 0, 0);
std::string start, end;
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
} else {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
- ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
}
db_->ReleaseSnapshot(snap);
}
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
Options options;
+ options.env = env_;
options.check_flush_compaction_key_order = false;
options.paranoid_file_checks = true;
options.create_if_missing = true;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DB::Open(options, dbname_, &db_));
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
std::string start, end;
Build(10);
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
} else {
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
- ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
}
db_->ReleaseSnapshot(snap);
}
TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
Options options;
+ options.env = env_;
options.create_if_missing = true;
options.allow_data_in_errors = true;
auto mode = mock::MockTableFactory::kCorruptKey;
options.table_factory = mock;
ASSERT_OK(DB::Open(options, dbname_, &db_));
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
Build(100, 2);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
- Status s = dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true);
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ Status s =
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
ASSERT_NOK(s);
ASSERT_TRUE(s.IsCorruption());
}
TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
Options options;
+ options.env = env_;
options.paranoid_file_checks = false;
options.create_if_missing = true;
options.check_flush_compaction_key_order = false;
std::make_shared<mock::MockTableFactory>();
options.table_factory = mock;
ASSERT_OK(DB::Open(options, dbname_, &db_));
- assert(db_ != nullptr);
+ assert(db_ != nullptr); // suppress false clang-analyze report
mock->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey);
Build(100, 2);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
mock->SetCorruptionMode(mock::MockTableFactory::kCorruptNone);
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
- ASSERT_NOK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_NOK(
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
}
TEST_F(CorruptionTest, FlushKeyOrderCheck) {
Options options;
+ options.env = env_;
options.paranoid_file_checks = false;
options.create_if_missing = true;
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
}
TEST_F(CorruptionTest, DisableKeyOrderCheck) {
- Options options;
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "false"}}));
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
ASSERT_OK(dbi->TEST_FlushMemTable());
- ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+ ASSERT_OK(
+ dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
CloseDb();
Options options;
- options.env = &env_;
+ options.env = env_;
ASSERT_OK(DestroyDB(dbname_, options));
options.create_if_missing = true;
options.file_checksum_gen_factory =
SyncPoint::GetInstance()->ClearAllCallBacks();
int count{0};
SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::VerifySstFileChecksum:mismatch", [&](void* arg) {
+ "DBImpl::VerifyFullFileChecksum:mismatch", [&](void* arg) {
auto* s = reinterpret_cast<Status*>(arg);
- assert(s);
+ ASSERT_NE(s, nullptr);
++count;
ASSERT_NOK(*s);
});
ASSERT_EQ(1, count);
}
+// Value-parameterized variant of CorruptionTest. The test parameter is a
+// (avoid_flush_during_recovery, track_and_verify_wals_in_manifest) tuple.
+class CrashDuringRecoveryWithCorruptionTest
+    : public CorruptionTest,
+      public testing::WithParamInterface<std::tuple<bool, bool>> {
+ public:
+  explicit CrashDuringRecoveryWithCorruptionTest() = default;
+
+ protected:
+  // First element of the test parameter.
+  const bool avoid_flush_during_recovery_ = std::get<0>(GetParam());
+  // Second element of the test parameter.
+  const bool track_and_verify_wals_in_manifest_ = std::get<1>(GetParam());
+};
+
+INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
+ ::testing::Values(std::make_tuple(true, false), // {avoid_flush, track_wals}
+ std::make_tuple(false, false), // element 0 -> avoid_flush_during_recovery_
+ std::make_tuple(true, true), // element 1 -> track_and_verify_wals_in_manifest_
+ std::make_tuple(false, true))); // all four boolean combinations are covered
+
+// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
+// won't flush the data from WAL to L0 for all column families if possible. As a
+// result, not all column families can increase their log_numbers, and
+// min_log_number_to_keep won't change.
+// It may prematurely persist a new MANIFEST even before we can declare the DB
+// is in consistent state after recovery (this is when the new WAL is synced)
+// and advances log_numbers for some column families.
+//
+// If there is power failure before we sync the new WAL, we will end up in
+// a situation in which after persisting the MANIFEST, RocksDB will see some
+// column families' log_numbers larger than the corrupted wal, and
+// "Column family inconsistency: SST file contains data beyond the point of
+// corruption" error will be hit, causing recovery to fail.
+//
+// After adding the fix, RocksDB persists a new MANIFEST with column families
+// only after the new WAL is synced, to ensure RocksDB is in a consistent state.
+// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
+// synced immediately afterwards. The sequence number of the sentinel
+// WriteBatch will be the next sequence number immediately after the largest
+// sequence number recovered from previous WALs and MANIFEST because of which DB
+// will be in consistent state.
+// If a future recovery starts from the new MANIFEST, then it means the new WAL
+// is successfully synced. Due to the sentinel empty write batch at the
+// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
+// If a future recovery starts from the old MANIFEST, it means writing the new
+// MANIFEST failed. It won't have the "SST ahead of WAL" error.
+//
+// The combination of corrupting a WAL and injecting an error during subsequent
+// re-open exposes the bug of prematurely persisting a new MANIFEST with
+// advanced ColumnFamilyData::log_number.
+TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
+  // Outline: build a two-CF DB with unsynced WALs, truncate the second-newest
+  // WAL, fail one recovery via an injected IOError, then verify that a
+  // subsequent recovery succeeds and returns the expected data.
+  CloseDb();
+  Options options;
+  options.track_and_verify_wals_in_manifest =
+      track_and_verify_wals_in_manifest_;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+  options.avoid_flush_during_recovery = false;
+  options.env = env_;
+  ASSERT_OK(DestroyDB(dbname_, options));
+  options.create_if_missing = true;
+  options.max_write_buffer_number = 8;
+
+  Reopen(&options);
+  Status s;
+  const std::string test_cf_name = "test_cf";
+  ColumnFamilyHandle* cfh = nullptr;
+  s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
+  ASSERT_OK(s);
+  delete cfh;
+  CloseDb();
+
+  std::vector<ColumnFamilyDescriptor> cf_descs;
+  cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+  cf_descs.emplace_back(test_cf_name, options);
+  std::vector<ColumnFamilyHandle*> handles;
+
+  // 1. Open and populate the DB. Write and flush default_cf several times to
+  // advance wal number so that some column families have advanced log_number
+  // while other don't.
+  {
+    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+    auto* dbimpl = static_cast_with_check<DBImpl>(db_);
+    assert(dbimpl);
+
+    // Write one key to test_cf.
+    ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
+    ASSERT_OK(db_->Flush(FlushOptions(), handles[1]));
+
+    // Write to default_cf and flush this cf several times to advance wal
+    // number. TEST_SwitchMemtable makes sure WALs are not synced and test can
+    // corrupt un-sync WAL.
+    for (int i = 0; i < 2; ++i) {
+      ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
+                         "value" + std::to_string(i)));
+      ASSERT_OK(dbimpl->TEST_SwitchMemtable());
+    }
+
+    for (auto* h : handles) {
+      delete h;
+    }
+    handles.clear();
+    CloseDb();
+  }
+
+  // 2. Corrupt second last un-synced wal file to emulate power reset which
+  // caused the DB to lose the un-synced WAL.
+  {
+    std::vector<uint64_t> file_nums;
+    GetSortedWalFiles(file_nums);
+    // Checked with a gtest assertion instead of assert() so the index below
+    // is also guarded when the test is built with NDEBUG.
+    ASSERT_GE(file_nums.size(), size_t{2});
+    const uint64_t log_num = file_nums[file_nums.size() - 2];
+    CorruptFileWithTruncation(FileType::kWalFile, log_num,
+                              /*bytes_to_truncate=*/8);
+  }
+
+  // 3. After first crash reopen the DB which contains corrupted WAL. Default
+  // family has higher log number than corrupted wal number.
+  //
+  // Case1: If avoid_flush_during_recovery = true, RocksDB won't flush the data
+  // from WAL to L0 for all column families (test_cf_name in this case). As a
+  // result, not all column families can increase their log_numbers, and
+  // min_log_number_to_keep won't change.
+  //
+  // Case2: If avoid_flush_during_recovery = false, all column families have
+  // flushed their data from WAL to L0 during recovery, and none of them will
+  // ever need to read the WALs again.
+
+  // 4. Fault is injected to fail the recovery.
+  {
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
+          auto* tmp_s = reinterpret_cast<Status*>(arg);
+          assert(tmp_s);
+          *tmp_s = Status::IOError("Injected");
+        });
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    handles.clear();
+    options.avoid_flush_during_recovery = true;
+    s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
+    ASSERT_TRUE(s.IsIOError());
+    ASSERT_EQ("IO error: Injected", s.ToString());
+    for (auto* h : handles) {
+      delete h;
+    }
+    CloseDb();
+
+    SyncPoint::GetInstance()->DisableProcessing();
+    SyncPoint::GetInstance()->ClearAllCallBacks();
+  }
+
+  // 5. After second crash reopen the db with second corruption. Default family
+  // has higher log number than corrupted wal number.
+  //
+  // Case1: If avoid_flush_during_recovery = true, we persist a new
+  // MANIFEST with advanced log_numbers for some column families only after
+  // syncing the WAL. So during second crash, RocksDB will skip the corrupted
+  // WAL files as they have been moved to different folder. Since newly synced
+  // WAL file's sequence number (sentinel WriteBatch) will be the next
+  // sequence number immediately after the largest sequence number recovered
+  // from previous WALs and MANIFEST, db will be in consistent state and opens
+  // successfully.
+  //
+  // Case2: If avoid_flush_during_recovery = false, the corrupted WAL is below
+  // this number. So during a second crash after persisting the new MANIFEST,
+  // RocksDB will skip the corrupted WAL(s) because they are all below this
+  // bound. Therefore, we won't hit the "column family inconsistency" error
+  // message.
+  {
+    options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
+    ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+
+    // Verify that data is not lost.
+    {
+      std::string v;
+      ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
+      ASSERT_EQ("dontcare", v);
+
+      v.clear();
+      ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
+      ASSERT_EQ("value" + std::to_string(0), v);
+
+      // Since it's corrupting second last wal, below key is not found.
+      v.clear();
+      ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
+                Status::NotFound());
+    }
+
+    for (auto* h : handles) {
+      delete h;
+    }
+    handles.clear();
+    CloseDb();
+  }
+}
+
+// In case of TransactionDB, it enables two-phase-commit. The prepare section of
+// an uncommitted transaction always needs to be kept. Even if we perform flush
+// during recovery, we may still need to hold an old WAL. The
+// min_log_number_to_keep won't change, and "Column family inconsistency: SST
+// file contains data beyond the point of corruption" error will be hit, causing
+// recovery to fail.
+//
+// After adding the fix, RocksDB persists a new MANIFEST with column families
+// only after the new WAL is synced, to ensure RocksDB is in a consistent state.
+// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
+// synced immediately afterwards. The sequence number of the sentinel
+// WriteBatch will be the next sequence number immediately after the largest
+// sequence number recovered from previous WALs and MANIFEST because of which DB
+// will be in consistent state.
+// If a future recovery starts from the new MANIFEST, then it means the new WAL
+// is successfully synced. Due to the sentinel empty write batch at the
+// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
+// If a future recovery starts from the old MANIFEST, it means writing the new
+// MANIFEST failed. It won't have the "SST ahead of WAL" error.
+//
+// The combination of corrupting a WAL and injecting an error during subsequent
+// re-open exposes the bug of prematurely persisting a new MANIFEST with
+// advanced ColumnFamilyData::log_number.
+TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
+ CloseDb();
+ Options options;
+ options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+ options.track_and_verify_wals_in_manifest =
+ track_and_verify_wals_in_manifest_;
+ options.avoid_flush_during_recovery = false;
+ options.env = env_;
+ ASSERT_OK(DestroyDB(dbname_, options));
+ options.create_if_missing = true;
+ options.max_write_buffer_number = 3;
+ Reopen(&options);
+
+ // Create cf test_cf_name.
+ ColumnFamilyHandle* cfh = nullptr;
+ const std::string test_cf_name = "test_cf";
+ Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
+ ASSERT_OK(s);
+ delete cfh;
+ CloseDb();
+
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+ cf_descs.emplace_back(test_cf_name, options);
+ std::vector<ColumnFamilyHandle*> handles;
+
+ TransactionDB* txn_db = nullptr;
+ TransactionDBOptions txn_db_opts;
+
+ // 1. Open and populate the DB. Write and flush default_cf several times to
+ // advance wal number so that some column families have advanced log_number
+ // while other don't.
+ {
+ ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
+ &handles, &txn_db));
+
+ auto* txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
+ // Put cf1
+ ASSERT_OK(txn->Put(handles[1], "foo", "value"));
+ ASSERT_OK(txn->SetName("txn0"));
+ ASSERT_OK(txn->Prepare());
+ ASSERT_OK(txn_db->Flush(FlushOptions()));
+
+ delete txn;
+ txn = nullptr;
+
+ auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
+ assert(dbimpl);
+
+ // Put and flush cf0
+ for (int i = 0; i < 2; ++i) {
+ ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
+ "value" + std::to_string(i)));
+ ASSERT_OK(dbimpl->TEST_SwitchMemtable());
+ }
+
+ // Put cf1
+ txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
+ ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
+ ASSERT_OK(txn->Commit());
+
+ delete txn;
+ txn = nullptr;
+
+ for (auto* h : handles) {
+ delete h;
+ }
+ handles.clear();
+ delete txn_db;
+ }
+
+ // 2. Corrupt second last wal to emulate power reset which caused the DB to
+ // lose the un-synced WAL.
+ {
+ std::vector<uint64_t> file_nums;
+ GetSortedWalFiles(file_nums);
+ size_t size = file_nums.size();
+ assert(size >= 2);
+ uint64_t log_num = file_nums[size - 2];
+ CorruptFileWithTruncation(FileType::kWalFile, log_num,
+ /*bytes_to_truncate=*/8);
+ }
+
+ // 3. After first crash reopen the DB which contains corrupted WAL. Default
+ // family has higher log number than corrupted wal number. There may be old
+ // WAL files that it must not delete because they can contain data of
+ // uncommitted transactions. As a result, min_log_number_to_keep won't change.
+
+ {
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::Open::BeforeSyncWAL", [&](void* arg) {
+ auto* tmp_s = reinterpret_cast<Status*>(arg);
+ assert(tmp_s);
+ *tmp_s = Status::IOError("Injected");
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ handles.clear();
+ s = TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, &handles,
+ &txn_db);
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_EQ("IO error: Injected", s.ToString());
+ for (auto* h : handles) {
+ delete h;
+ }
+ CloseDb();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ }
+
+ // 4. Corrupt max_wal_num.
+ {
+ std::vector<uint64_t> file_nums;
+ GetSortedWalFiles(file_nums);
+ size_t size = file_nums.size();
+ uint64_t log_num = file_nums[size - 1];
+ CorruptFileWithTruncation(FileType::kWalFile, log_num);
+ }
+
+ // 5. After second crash reopen the db with second corruption. Default family
+ // has higher log number than corrupted wal number.
+ // We persist a new MANIFEST with advanced log_numbers for some column
+ // families only after syncing the WAL. So during second crash, RocksDB will
+ // skip the corrupted WAL files as they have been moved to different folder.
+ // Since newly synced WAL file's sequence number (sentinel WriteBatch) will be
+ // the next sequence number immediately after the largest sequence number
+ // recovered from previous WALs and MANIFEST, db will be in consistent state
+ // and opens successfully.
+ {
+ ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
+ &handles, &txn_db));
+
+ // Verify that data is not lost.
+ {
+ std::string v;
+ // Key not visible since it's not committed.
+ ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
+ Status::NotFound());
+
+ v.clear();
+ ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
+ ASSERT_EQ("value" + std::to_string(0), v);
+
+ // Last WAL is corrupted which contains two keys below.
+ v.clear();
+ ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
+ Status::NotFound());
+ v.clear();
+ ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
+ Status::NotFound());
+ }
+
+ for (auto* h : handles) {
+ delete h;
+ }
+ delete txn_db;
+ }
+}
+
+ // This test is similar to
+ // CrashDuringRecoveryWithCorruptionTest.CrashDuringRecovery except it calls
+ // flush and corrupts the last WAL. It calls flush to sync some of the WALs;
+ // the remaining ones are un-synced, one of which is then corrupted to
+ // simulate a crash.
+//
+// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
+// won't flush the data from WAL to L0 for all column families if possible. As a
+// result, not all column families can increase their log_numbers, and
+// min_log_number_to_keep won't change.
+// It may prematurely persist a new MANIFEST even before we can declare the DB
+// is in consistent state after recovery (this is when the new WAL is synced)
+// and advances log_numbers for some column families.
+//
+// If there is power failure before we sync the new WAL, we will end up in
+// a situation in which after persisting the MANIFEST, RocksDB will see some
+// column families' log_numbers larger than the corrupted wal, and
+// "Column family inconsistency: SST file contains data beyond the point of
+// corruption" error will be hit, causing recovery to fail.
+//
+ // With the fix, RocksDB persists a new MANIFEST with column families only
+ // after the new WAL is synced, which ensures RocksDB is in consistent state.
+// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
+// synced immediately afterwards. The sequence number of the sentinel
+// WriteBatch will be the next sequence number immediately after the largest
+// sequence number recovered from previous WALs and MANIFEST because of which DB
+// will be in consistent state.
+// If a future recovery starts from the new MANIFEST, then it means the new WAL
+// is successfully synced. Due to the sentinel empty write batch at the
+// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
+ // If future recovery starts from the old MANIFEST, it means writing the new
+ // MANIFEST failed. It won't have the "SST ahead of WAL" error.
+
+// The combination of corrupting a WAL and injecting an error during subsequent
+// re-open exposes the bug of prematurely persisting a new MANIFEST with
+// advanced ColumnFamilyData::log_number.
+ TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
+ // Scenario: populate two column families across several WALs, truncate the
+ // WAL with the largest number, fail one reopen with an injected IOError, and
+ // then verify that the next reopen succeeds and that only the write recorded
+ // in the truncated WAL is missing.
+ CloseDb();
+ Options options;
+ options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+ options.avoid_flush_during_recovery = false;
+ options.env = env_;
+ options.create_if_missing = true;
+
+ // Start from an empty DB and create the second column family.
+ ASSERT_OK(DestroyDB(dbname_, options));
+ Reopen(&options);
+
+ ColumnFamilyHandle* cfh = nullptr;
+ const std::string test_cf_name = "test_cf";
+ Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
+ ASSERT_OK(s);
+ delete cfh;
+
+ CloseDb();
+
+ std::vector<ColumnFamilyDescriptor> cf_descs;
+ cf_descs.emplace_back(kDefaultColumnFamilyName, options);
+ cf_descs.emplace_back(test_cf_name, options);
+ std::vector<ColumnFamilyHandle*> handles;
+
+ // 1. Open with both column families and populate the DB.
+ {
+ ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+
+ // Write one key to test_cf.
+ ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
+
+ // Write to default_cf and flush this cf several times to advance wal
+ // number.
+ for (int i = 0; i < 2; ++i) {
+ ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
+ "value" + std::to_string(i)));
+ ASSERT_OK(db_->Flush(FlushOptions()));
+ }
+
+ // This write happens after the last flush; step 4 expects it to be lost.
+ ASSERT_OK(db_->Put(WriteOptions(), handles[1], "dontcare", "dontcare"));
+ for (auto* h : handles) {
+ delete h;
+ }
+ handles.clear();
+ CloseDb();
+ }
+
+ // 2. Corrupt the last un-synced wal file to emulate power reset which
+ // caused the DB to lose the un-synced WAL.
+ {
+ std::vector<uint64_t> file_nums;
+ GetSortedWalFiles(file_nums);
+ // Fail the test cleanly instead of indexing out of bounds when no WAL file
+ // was found.
+ ASSERT_FALSE(file_nums.empty());
+ size_t size = file_nums.size();
+ uint64_t log_num = file_nums[size - 1];
+ CorruptFileWithTruncation(FileType::kWalFile, log_num,
+ /*bytes_to_truncate=*/8);
+ }
+
+ // 3. Fault is injected to fail the recovery.
+ {
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
+ auto* tmp_s = reinterpret_cast<Status*>(arg);
+ assert(tmp_s);
+ *tmp_s = Status::IOError("Injected");
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ handles.clear();
+ options.avoid_flush_during_recovery = true;
+ s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_EQ("IO error: Injected", s.ToString());
+ for (auto* h : handles) {
+ delete h;
+ }
+ // Drop the deleted pointers so no stale handles are carried into the next
+ // open.
+ handles.clear();
+ CloseDb();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ }
+
+ // 4. Reopen db again, this time without fault injection and with the
+ // parameterized avoid_flush_during_recovery setting.
+ {
+ options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
+ ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
+
+ // Verify that data is not lost.
+ {
+ std::string v;
+ ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
+ ASSERT_EQ("dontcare", v);
+
+ for (int i = 0; i < 2; ++i) {
+ v.clear();
+ ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
+ ASSERT_EQ("value" + std::to_string(i), v);
+ }
+
+ // Since it's corrupting last wal after Flush, below key is not found.
+ v.clear();
+ ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
+ Status::NotFound());
+ }
+
+ for (auto* h : handles) {
+ delete h;
+ }
+ }
+ }
+
} // namespace ROCKSDB_NAMESPACE
// Test binary entry point. Installs the port-layer stack trace handler,
// initializes GoogleTest with the command-line arguments, passes argc/argv to
// RegisterCustomObjects(), and returns the result of RUN_ALL_TESTS().
// NOTE(review): RegisterCustomObjects() is defined outside this chunk;
// presumably it registers custom test objects selected on the command line --
// confirm against its definition.
int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
+ RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}