#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "utilities/fault_injection_env.h"
+#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
class CheckpointTest : public testing::Test {
snapshot_name_ = test::PerThreadDBPath(env_, "snapshot");
std::string snapshot_tmp_name = snapshot_name_ + ".tmp";
EXPECT_OK(DestroyDB(snapshot_name_, options));
- env_->DeleteDir(snapshot_name_);
+ test::DeleteDir(env_, snapshot_name_);
EXPECT_OK(DestroyDB(snapshot_tmp_name, options));
- env_->DeleteDir(snapshot_tmp_name);
+ test::DeleteDir(env_, snapshot_tmp_name);
Reopen(options);
export_path_ = test::PerThreadDBPath("/export");
- DestroyDir(env_, export_path_);
+ DestroyDir(env_, export_path_).PermitUncheckedError();
cfh_reverse_comp_ = nullptr;
metadata_ = nullptr;
}
options.db_paths.emplace_back(dbname_ + "_4", 0);
EXPECT_OK(DestroyDB(dbname_, options));
EXPECT_OK(DestroyDB(snapshot_name_, options));
- DestroyDir(env_, export_path_);
+ DestroyDir(env_, export_path_).PermitUncheckedError();
}
// Return the current option configuration.
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
}
- Status TryReopenWithColumnFamilies(
- const std::vector<std::string>& cfs,
- const std::vector<Options>& options) {
+ Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+ const std::vector<Options>& options) {
Close();
EXPECT_EQ(cfs.size(), options.size());
std::vector<ColumnFamilyDescriptor> column_families;
return TryReopenWithColumnFamilies(cfs, v_opts);
}
- void Reopen(const Options& options) {
- ASSERT_OK(TryReopen(options));
- }
+ void Reopen(const Options& options) { ASSERT_OK(TryReopen(options)); }
void CompactAll() {
for (auto h : handles_) {
return db_->Put(wo, handles_[cf], k, v);
}
- Status Delete(const std::string& k) {
- return db_->Delete(WriteOptions(), k);
- }
+ Status Delete(const std::string& k) { return db_->Delete(WriteOptions(), k); }
Status Delete(int cf, const std::string& k) {
return db_->Delete(WriteOptions(), handles_[cf], k);
ASSERT_OK(DestroyDB(dbname_, options));
// Create a database
- Status s;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname_, &db_));
std::string key = std::string("foo");
}
}
+TEST_F(CheckpointTest, CheckpointWithBlob) {
+ // Create a database with a blob file
+ Options options = CurrentOptions();
+ options.create_if_missing = true;
+ options.enable_blob_files = true;
+ options.min_blob_size = 0;
+
+ Reopen(options);
+
+ constexpr char key[] = "key";
+ constexpr char blob[] = "blob";
+
+ ASSERT_OK(Put(key, blob));
+ ASSERT_OK(Flush());
+
+ // Create a checkpoint
+ Checkpoint* checkpoint = nullptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+
+ std::unique_ptr<Checkpoint> checkpoint_guard(checkpoint);
+
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+ // Make sure it contains the blob file
+ std::vector<std::string> files;
+ ASSERT_OK(env_->GetChildren(snapshot_name_, &files));
+
+ bool blob_file_found = false;
+ for (const auto& file : files) {
+ uint64_t number = 0;
+ FileType type = kWalFile;
+
+ if (ParseFileName(file, &number, &type) && type == kBlobFile) {
+ blob_file_found = true;
+ break;
+ }
+ }
+
+ ASSERT_TRUE(blob_file_found);
+
+ // Make sure the checkpoint can be opened and the blob value read
+ options.create_if_missing = false;
+ DB* checkpoint_db = nullptr;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &checkpoint_db));
+
+ std::unique_ptr<DB> checkpoint_db_guard(checkpoint_db);
+
+ PinnableSlice value;
+ ASSERT_OK(checkpoint_db->Get(
+ ReadOptions(), checkpoint_db->DefaultColumnFamily(), key, &value));
+
+ ASSERT_EQ(value, blob);
+}
+
TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
// Create a database
- Status s;
auto options = CurrentOptions();
options.create_if_missing = true;
CreateAndReopenWithCF({}, options);
int num_files_expected) {
ASSERT_EQ(metadata.files.size(), num_files_expected);
std::vector<std::string> subchildren;
- env_->GetChildren(export_path_, &subchildren);
- int num_children = 0;
- for (const auto& child : subchildren) {
- if (child != "." && child != "..") {
- ++num_children;
- }
- }
- ASSERT_EQ(num_children, num_files_expected);
+ ASSERT_OK(env_->GetChildren(export_path_, &subchildren));
+ ASSERT_EQ(subchildren.size(), num_files_expected);
};
// Test DefaultColumnFamily
export_path_, &metadata_));
verify_files_exported(*metadata_, 1);
ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
- DestroyDir(env_, export_path_);
+ ASSERT_OK(DestroyDir(env_, export_path_));
delete metadata_;
metadata_ = nullptr;
export_path_, &metadata_));
verify_files_exported(*metadata_, 2);
ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
- DestroyDir(env_, export_path_);
+ ASSERT_OK(DestroyDir(env_, export_path_));
delete metadata_;
metadata_ = nullptr;
delete checkpoint;
TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) {
// Create a database
- Status s;
auto options = CurrentOptions();
options.create_if_missing = true;
CreateAndReopenWithCF({}, options);
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
// Export onto existing directory
- env_->CreateDirIfMissing(export_path_);
+ ASSERT_OK(env_->CreateDirIfMissing(export_path_));
ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
export_path_, &metadata_),
Status::InvalidArgument("Specified export_dir exists"));
- DestroyDir(env_, export_path_);
+ ASSERT_OK(DestroyDir(env_, export_path_));
// Export with invalid directory specification
export_path_ = "";
std::string result;
std::vector<ColumnFamilyHandle*> cphandles;
- Status s;
// Take a snapshot
ROCKSDB_NAMESPACE::port::Thread t([&]() {
Checkpoint* checkpoint;
// Open snapshot and verify contents while DB is running
options.create_if_missing = false;
std::vector<std::string> cfs;
- cfs= {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
+ cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
std::vector<ColumnFamilyDescriptor> column_families;
- for (size_t i = 0; i < cfs.size(); ++i) {
- column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
- }
- ASSERT_OK(DB::Open(options, snapshot_name_,
- column_families, &cphandles, &snapshotDB));
+ for (size_t i = 0; i < cfs.size(); ++i) {
+ column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
+ }
+ ASSERT_OK(DB::Open(options, snapshot_name_, column_families, &cphandles,
+ &snapshotDB));
ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
ASSERT_EQ("Default1", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
ASSERT_EQ("eleven", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
for (auto h : cphandles) {
- delete h;
+ delete h;
}
cphandles.clear();
delete snapshotDB;
ASSERT_OK(Put(0, "Default", "Default"));
ASSERT_OK(Put(1, "one", "one"));
- Flush();
+ ASSERT_OK(Flush());
ASSERT_OK(Put(2, "two", "two"));
DB* snapshotDB;
std::string result;
std::vector<ColumnFamilyHandle*> cphandles;
- Status s;
// Take a snapshot
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
{// Get past the flush in the checkpoint thread before adding any keys to
// the db so the checkpoint thread won't hit the WriteManifest
// syncpoints.
- {"DBImpl::GetLiveFiles:1",
+ {"CheckpointImpl::CreateCheckpoint:FlushDone",
"CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"},
// Roll the manifest during checkpointing right after live files are
// snapshotted.
Close();
const std::string dbname = test::PerThreadDBPath("transaction_testdb");
ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
- env_->DeleteDir(dbname);
+ test::DeleteDir(env_, dbname);
Options options = CurrentOptions();
options.allow_2pc = true;
TransactionDBOptions txn_db_options;
TransactionDB* txdb;
Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
- assert(s.ok());
+ ASSERT_OK(s);
ColumnFamilyHandle* cfa;
ColumnFamilyHandle* cfb;
ColumnFamilyOptions cf_options;
ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);
s = txn->Put(Slice("foo"), Slice("bar"));
+ ASSERT_OK(s);
s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
ASSERT_OK(s);
// Writing prepare into middle of first WAL, then flush WALs many times
ASSERT_OK(tx->Prepare());
ASSERT_OK(tx->Commit());
if (i % 10000 == 0) {
- txdb->Flush(FlushOptions());
+ ASSERT_OK(txdb->Flush(FlushOptions()));
}
if (i == 88888) {
ASSERT_OK(txn->Prepare());
// No more than two logs files should exist.
std::vector<std::string> files;
- env_->GetChildren(snapshot_name_, &files);
+ ASSERT_OK(env_->GetChildren(snapshot_name_, &files));
int num_log_files = 0;
for (auto& file : files) {
uint64_t num;
for (std::string checkpoint_dir : {"", "/", "////"}) {
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
- ASSERT_TRUE(checkpoint->CreateCheckpoint("").IsInvalidArgument());
+ ASSERT_TRUE(
+ checkpoint->CreateCheckpoint(checkpoint_dir).IsInvalidArgument());
delete checkpoint;
}
}
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
delete checkpoint;
- env->DropUnsyncedFileData();
+ ASSERT_OK(env->DropUnsyncedFileData());
// make sure it's openable even though whatever data that wasn't synced got
// dropped.
db_ = nullptr;
}
+TEST_F(CheckpointTest, CheckpointOptionsFileFailedToPersist) {
+ // Regression test for a bug where checkpoint failed on a DB where persisting
+ // OPTIONS file failed and the DB was opened with
+ // `fail_if_options_file_error == false`.
+ Options options = CurrentOptions();
+ options.fail_if_options_file_error = false;
+ auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
+
+ // Setup `FaultInjectionTestFS` and `SyncPoint` callbacks to fail one
+ // operation when inside the OPTIONS file persisting code.
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+ fault_fs->SetRandomMetadataWriteError(1 /* one_in */);
+ SyncPoint::GetInstance()->SetCallBack(
+ "PersistRocksDBOptions:start", [fault_fs](void* /* arg */) {
+ fault_fs->EnableMetadataWriteErrorInjection();
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "FaultInjectionTestFS::InjectMetadataWriteError:Injected",
+ [fault_fs](void* /* arg */) {
+ fault_fs->DisableMetadataWriteErrorInjection();
+ });
+ options.env = fault_fs_env.get();
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Reopen(options);
+ ASSERT_OK(Put("key1", "val1"));
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+ delete checkpoint;
+
+ // Make sure it's usable.
+ options.env = env_;
+ DB* snapshot_db;
+ ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
+ ReadOptions read_opts;
+ std::string get_result;
+ ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result));
+ ASSERT_EQ("val1", get_result);
+ delete snapshot_db;
+ delete db_;
+ db_ = nullptr;
+}
+
TEST_F(CheckpointTest, CheckpointReadOnlyDB) {
ASSERT_OK(Put("foo", "foo_value"));
ASSERT_OK(Flush());
delete snapshot_db;
}
+TEST_F(CheckpointTest, CheckpointWithDbPath) {
+ Options options = CurrentOptions();
+ options.db_paths.emplace_back(dbname_ + "_2", 0);
+ Reopen(options);
+ ASSERT_OK(Put("key1", "val1"));
+ Flush();
+ Checkpoint* checkpoint;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+ // Currently not supported
+ ASSERT_TRUE(checkpoint->CreateCheckpoint(snapshot_name_).IsNotSupported());
+ delete checkpoint;
+}
+
+TEST_F(CheckpointTest, PutRaceWithCheckpointTrackedWalSync) {
+ // Repro for a race condition where a user write comes in after the checkpoint
+ // syncs WAL for `track_and_verify_wals_in_manifest` but before the
+ // corresponding MANIFEST update. With the bug, that scenario resulted in an
+ // unopenable DB with error "Corruption: Size mismatch: WAL ...".
+ Options options = CurrentOptions();
+ std::unique_ptr<FaultInjectionTestEnv> fault_env(
+ new FaultInjectionTestEnv(env_));
+ options.env = fault_env.get();
+ options.track_and_verify_wals_in_manifest = true;
+ Reopen(options);
+
+ ASSERT_OK(Put("key1", "val1"));
+
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::SyncWAL:BeforeMarkLogsSynced:1",
+ [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ std::unique_ptr<Checkpoint> checkpoint;
+ {
+ Checkpoint* checkpoint_ptr;
+ ASSERT_OK(Checkpoint::Create(db_, &checkpoint_ptr));
+ checkpoint.reset(checkpoint_ptr);
+ }
+
+ ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
+
+ // Ensure callback ran.
+ ASSERT_EQ("val2", Get("key2"));
+
+ Close();
+
+ // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
+ // DB WAL.
+ fault_env->DropUnsyncedFileData();
+
+ // Before the bug fix, reopening the DB would fail because the MANIFEST's
+ // AddWal entry indicated the WAL should be synced through "key2" -> "val2".
+ Reopen(options);
+
+ // Need to close before `fault_env` goes out of scope.
+ Close();
+}
+
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {