1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Syncpoint prevents us building and running tests in release
19 #include "db/db_impl.h"
20 #include "port/port.h"
21 #include "port/stack_trace.h"
22 #include "rocksdb/db.h"
23 #include "rocksdb/env.h"
24 #include "rocksdb/utilities/checkpoint.h"
25 #include "rocksdb/utilities/transaction_db.h"
26 #include "util/fault_injection_test_env.h"
27 #include "util/sync_point.h"
28 #include "util/testharness.h"
// Test fixture shared by the checkpoint tests in this file. The members
// visible here hold the alternative WAL directory path, the options most
// recently recorded by TryReopen(), the column family handles of the open
// DB, and the path where checkpoints (snapshots) are created.
19 class CheckpointTest
: public testing::Test
{
22 // Sequence of option configurations to try
41 std::string alternative_wal_dir_
;
44 Options last_options_
;
45 std::vector
<ColumnFamilyHandle
*> handles_
;
46 std::string snapshot_name_
;
// Fixture constructor. Uses the default Env with one LOW and one HIGH
// background thread, derives the per-thread DB path, the alternative WAL
// directory (dbname_ + "/wal") and the snapshot path, and destroys any DB
// found at the DB, snapshot, and snapshot ".tmp" paths.
48 CheckpointTest() : env_(Env::Default()) {
49 env_
->SetBackgroundThreads(1, Env::LOW
);
50 env_
->SetBackgroundThreads(1, Env::HIGH
);
51 dbname_
= test::PerThreadDBPath(env_
, "checkpoint_test");
52 alternative_wal_dir_
= dbname_
+ "/wal";
53 auto options
= CurrentOptions();
54 auto delete_options
= options
;
55 delete_options
.wal_dir
= alternative_wal_dir_
;
56 EXPECT_OK(DestroyDB(dbname_
, delete_options
));
57 // Destroy it again with the plain options, in case no alternative WAL dir was used.
58 EXPECT_OK(DestroyDB(dbname_
, options
));
60 snapshot_name_
= test::PerThreadDBPath(env_
, "snapshot");
61 std::string snapshot_tmp_name
= snapshot_name_
+ ".tmp";
62 EXPECT_OK(DestroyDB(snapshot_name_
, options
));
// NOTE(review): the DeleteDir() results are not checked here.
63 env_
->DeleteDir(snapshot_name_
);
64 EXPECT_OK(DestroyDB(snapshot_tmp_name
, options
));
65 env_
->DeleteDir(snapshot_tmp_name
);
70 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
71 rocksdb::SyncPoint::GetInstance()->LoadDependency({});
72 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
75 options
.db_paths
.emplace_back(dbname_
, 0);
76 options
.db_paths
.emplace_back(dbname_
+ "_2", 0);
77 options
.db_paths
.emplace_back(dbname_
+ "_3", 0);
78 options
.db_paths
.emplace_back(dbname_
+ "_4", 0);
79 EXPECT_OK(DestroyDB(dbname_
, options
));
80 EXPECT_OK(DestroyDB(snapshot_name_
, options
));
83 // Return the current option configuration.
// The only setting visible here is create_if_missing = true.
84 Options
CurrentOptions() {
87 options
.create_if_missing
= true;
// Creates column families named by cfs on db_, using ColumnFamilyOptions
// built from options. handles_ is grown by cfs.size() and each new handle
// is stored in the next free slot.
91 void CreateColumnFamilies(const std::vector
<std::string
>& cfs
,
92 const Options
& options
) {
93 ColumnFamilyOptions
cf_opts(options
);
// cfi is the index of the first slot that the resize below adds.
94 size_t cfi
= handles_
.size();
95 handles_
.resize(cfi
+ cfs
.size());
97 ASSERT_OK(db_
->CreateColumnFamily(cf_opts
, cf
, &handles_
[cfi
++]));
// Creates the column families in cfs, then reopens the DB with the default
// column family followed by cfs, using options throughout.
101 void CreateAndReopenWithCF(const std::vector
<std::string
>& cfs
,
102 const Options
& options
) {
103 CreateColumnFamilies(cfs
, options
);
// The default column family is placed first in the list passed to the reopen.
104 std::vector
<std::string
> cfs_plus_default
= cfs
;
105 cfs_plus_default
.insert(cfs_plus_default
.begin(), kDefaultColumnFamilyName
);
106 ReopenWithColumnFamilies(cfs_plus_default
, options
);
// Reopens the DB with the column families in cfs and one Options entry per
// column family; asserts that TryReopenWithColumnFamilies() succeeds.
109 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
110 const std::vector
<Options
>& options
) {
111 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
// Reopens the DB with the column families in cfs, all sharing the same
// options; asserts that TryReopenWithColumnFamilies() succeeds.
114 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
115 const Options
& options
) {
116 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
// Opens the DB at dbname_ with one ColumnFamilyDescriptor per
// (cfs[i], options[i]) pair and returns the status of DB::Open().
// cfs and options are expected to have the same size; the DB-wide options
// are taken from options[0]. The resulting handles are stored in handles_.
119 Status
TryReopenWithColumnFamilies(
120 const std::vector
<std::string
>& cfs
,
121 const std::vector
<Options
>& options
) {
123 EXPECT_EQ(cfs
.size(), options
.size());
124 std::vector
<ColumnFamilyDescriptor
> column_families
;
125 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
126 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
[i
]));
128 DBOptions db_opts
= DBOptions(options
[0]);
129 return DB::Open(db_opts
, dbname_
, column_families
, &handles_
, &db_
);
// Convenience overload: replicates options once per entry in cfs and
// forwards to the vector-of-Options overload, returning its status.
132 Status
TryReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
133 const Options
& options
) {
135 std::vector
<Options
> v_opts(cfs
.size(), options
);
136 return TryReopenWithColumnFamilies(cfs
, v_opts
);
// Reopens the DB with options and asserts that TryReopen() succeeds.
139 void Reopen(const Options
& options
) {
140 ASSERT_OK(TryReopen(options
));
144 for (auto h
: handles_
) {
// Destroys the current DB, then reopens it with options and asserts that
// the reopen succeeds.
152 void DestroyAndReopen(const Options
& options
) {
153 // Destroy using last options
154 Destroy(last_options_
);
155 ASSERT_OK(TryReopen(options
));
// Asserts that DestroyDB() succeeds for dbname_ with the given options.
158 void Destroy(const Options
& options
) {
160 ASSERT_OK(DestroyDB(dbname_
, options
));
// Opens the DB at dbname_ in read-only mode into db_ and returns the
// status of DB::OpenForReadOnly().
163 Status
ReadOnlyReopen(const Options
& options
) {
164 return DB::OpenForReadOnly(options
, dbname_
, &db_
);
// Records options in last_options_ (used later by DestroyAndReopen()) and
// opens the DB at dbname_ into db_. Returns the status of DB::Open().
167 Status
TryReopen(const Options
& options
) {
169 last_options_
= options
;
170 return DB::Open(options
, dbname_
, &db_
);
// Flushes the DB with default FlushOptions and returns the status.
// Two call forms are visible: one without a column family handle and one
// that flushes handles_[cf].
// NOTE(review): the condition selecting between them is not visible here;
// presumably cf == 0 selects the handle-less form — confirm.
173 Status
Flush(int cf
= 0) {
175 return db_
->Flush(FlushOptions());
177 return db_
->Flush(FlushOptions(), handles_
[cf
]);
// Writes k -> v through db_->Put() without naming a column family, using
// write options wo (default-constructed when omitted). Returns the status.
181 Status
Put(const Slice
& k
, const Slice
& v
, WriteOptions wo
= WriteOptions()) {
182 return db_
->Put(wo
, k
, v
);
// Writes k -> v into the column family handles_[cf], using write options
// wo (default-constructed when omitted). Returns the status.
185 Status
Put(int cf
, const Slice
& k
, const Slice
& v
,
186 WriteOptions wo
= WriteOptions()) {
187 return db_
->Put(wo
, handles_
[cf
], k
, v
);
// Deletes key k through db_->Delete() without naming a column family,
// using default WriteOptions. Returns the status.
190 Status
Delete(const std::string
& k
) {
191 return db_
->Delete(WriteOptions(), k
);
// Deletes key k from the column family handles_[cf], using default
// WriteOptions. Returns the status.
194 Status
Delete(int cf
, const std::string
& k
) {
195 return db_
->Delete(WriteOptions(), handles_
[cf
], k
);
// Reads key k through db_->Get() without naming a column family, with
// checksum verification enabled and the optional snapshot applied.
// The result string is set to "NOT_FOUND" when the key is absent and to
// the status text for any other failure.
198 std::string
Get(const std::string
& k
, const Snapshot
* snapshot
= nullptr) {
200 options
.verify_checksums
= true;
201 options
.snapshot
= snapshot
;
203 Status s
= db_
->Get(options
, k
, &result
);
204 if (s
.IsNotFound()) {
205 result
= "NOT_FOUND";
206 } else if (!s
.ok()) {
207 result
= s
.ToString();
// Reads key k from the column family handles_[cf], with checksum
// verification enabled and the optional snapshot applied.
// The result string is set to "NOT_FOUND" when the key is absent and to
// the status text for any other failure.
212 std::string
Get(int cf
, const std::string
& k
,
213 const Snapshot
* snapshot
= nullptr) {
215 options
.verify_checksums
= true;
216 options
.snapshot
= snapshot
;
218 Status s
= db_
->Get(options
, handles_
[cf
], k
, &result
);
219 if (s
.IsNotFound()) {
220 result
= "NOT_FOUND";
221 } else if (!s
.ok()) {
222 result
= s
.ToString();
// For log_size_for_flush values of 0 and 1000000: writes "foo" -> "v1",
// creates a checkpoint, overwrites the key with "v2", and verifies that the
// checkpoint still returns "v1" — first while the source DB exists, then
// again after the source DB has been destroyed and the checkpoint is
// opened through the fixture's db_.
228 TEST_F(CheckpointTest
, GetSnapshotLink
) {
229 for (uint64_t log_size_for_flush
: {0, 1000000}) {
232 ReadOptions roptions
;
234 Checkpoint
* checkpoint
;
236 options
= CurrentOptions();
239 ASSERT_OK(DestroyDB(dbname_
, options
));
243 options
.create_if_missing
= true;
244 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
245 std::string key
= std::string("foo");
246 ASSERT_OK(Put(key
, "v1"));
248 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
249 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
, log_size_for_flush
));
250 ASSERT_OK(Put(key
, "v2"));
251 ASSERT_EQ("v2", Get(key
));
253 ASSERT_EQ("v2", Get(key
));
254 // Open snapshot and verify contents while DB is running
255 options
.create_if_missing
= false;
256 ASSERT_OK(DB::Open(options
, snapshot_name_
, &snapshotDB
));
257 ASSERT_OK(snapshotDB
->Get(roptions
, key
, &result
));
258 ASSERT_EQ("v1", result
);
260 snapshotDB
= nullptr;
264 // Destroy original DB
265 ASSERT_OK(DestroyDB(dbname_
, options
));
267 // Open snapshot and verify contents
268 options
.create_if_missing
= false;
// Point the fixture at the checkpoint so the Get() helper reads from it.
269 dbname_
= snapshot_name_
;
270 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
271 ASSERT_EQ("v1", Get(key
));
274 ASSERT_OK(DestroyDB(dbname_
, options
));
// Give dbname_ a fresh per-thread path for whatever runs next.
278 dbname_
= test::PerThreadDBPath(env_
, "db_test");
// Checkpoints a DB that has five extra column families. The checkpoint is
// created on a separate thread while this thread writes a second set of
// values between the CheckpointCF:1 and CheckpointCF:2 sync points, which
// are tied to DBImpl::GetLiveFiles:1 and :2 by the dependencies loaded
// below. The checkpoint is then opened and expected to contain the second
// set of values ("Default1", "eleven").
282 TEST_F(CheckpointTest
, CheckpointCF
) {
283 Options options
= CurrentOptions();
284 CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options
);
285 rocksdb::SyncPoint::GetInstance()->LoadDependency(
286 {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"},
287 {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}});
289 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
291 ASSERT_OK(Put(0, "Default", "Default"));
292 ASSERT_OK(Put(1, "one", "one"));
293 ASSERT_OK(Put(2, "two", "two"));
294 ASSERT_OK(Put(3, "three", "three"));
295 ASSERT_OK(Put(4, "four", "four"));
296 ASSERT_OK(Put(5, "five", "five"));
299 ReadOptions roptions
;
301 std::vector
<ColumnFamilyHandle
*> cphandles
;
305 rocksdb::port::Thread
t([&]() {
306 Checkpoint
* checkpoint
;
307 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
308 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
));
311 TEST_SYNC_POINT("CheckpointTest::CheckpointCF:1");
312 ASSERT_OK(Put(0, "Default", "Default1"));
313 ASSERT_OK(Put(1, "one", "eleven"));
314 ASSERT_OK(Put(2, "two", "twelve"));
315 ASSERT_OK(Put(3, "three", "thirteen"));
316 ASSERT_OK(Put(4, "four", "fourteen"));
317 ASSERT_OK(Put(5, "five", "fifteen"));
318 TEST_SYNC_POINT("CheckpointTest::CheckpointCF:2");
320 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
321 ASSERT_OK(Put(1, "one", "twentyone"));
322 ASSERT_OK(Put(2, "two", "twentytwo"));
323 ASSERT_OK(Put(3, "three", "twentythree"));
324 ASSERT_OK(Put(4, "four", "twentyfour"));
325 ASSERT_OK(Put(5, "five", "twentyfive"));
328 // Open snapshot and verify contents while DB is running
329 options
.create_if_missing
= false;
330 std::vector
<std::string
> cfs
;
331 cfs
= {kDefaultColumnFamilyName
, "one", "two", "three", "four", "five"};
332 std::vector
<ColumnFamilyDescriptor
> column_families
;
333 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
334 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
));
336 ASSERT_OK(DB::Open(options
, snapshot_name_
,
337 column_families
, &cphandles
, &snapshotDB
));
338 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[0], "Default", &result
));
339 ASSERT_EQ("Default1", result
);
340 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[1], "one", &result
));
341 ASSERT_EQ("eleven", result
);
// NOTE(review): the value read for "two" is not compared against anything in the lines visible here.
342 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[2], "two", &result
));
343 for (auto h
: cphandles
) {
348 snapshotDB
= nullptr;
// Creates a checkpoint of a multi-column-family DB, passing 1000000 as the
// second argument to CreateCheckpoint(), with a callback registered on the
// "DBImpl::BackgroundCallFlush:start" sync point (a flush is not expected).
// Further writes are made afterwards, the source DB is destroyed, and the
// checkpoint is expected to hold the values written before it was taken
// ("Default", "one", "two").
351 TEST_F(CheckpointTest
, CheckpointCFNoFlush
) {
352 Options options
= CurrentOptions();
353 CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options
);
355 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
357 ASSERT_OK(Put(0, "Default", "Default"));
358 ASSERT_OK(Put(1, "one", "one"));
360 ASSERT_OK(Put(2, "two", "two"));
363 ReadOptions roptions
;
365 std::vector
<ColumnFamilyHandle
*> cphandles
;
369 rocksdb::SyncPoint::GetInstance()->SetCallBack(
370 "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
371 // Flush should never trigger.
374 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
375 Checkpoint
* checkpoint
;
376 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
377 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
, 1000000));
378 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// Writes made after the checkpoint; the checks below expect them to be absent from it.
381 ASSERT_OK(Put(1, "one", "two"));
383 ASSERT_OK(Put(2, "two", "twentytwo"));
385 EXPECT_OK(DestroyDB(dbname_
, options
));
387 // Open snapshot and verify contents after the original DB has been destroyed
388 options
.create_if_missing
= false;
389 std::vector
<std::string
> cfs
;
390 cfs
= {kDefaultColumnFamilyName
, "one", "two", "three", "four", "five"};
391 std::vector
<ColumnFamilyDescriptor
> column_families
;
392 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
393 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
));
395 ASSERT_OK(DB::Open(options
, snapshot_name_
, column_families
, &cphandles
,
397 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[0], "Default", &result
));
398 ASSERT_EQ("Default", result
);
399 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[1], "one", &result
));
400 ASSERT_EQ("one", result
);
401 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[2], "two", &result
));
402 ASSERT_EQ("two", result
);
403 for (auto h
: cphandles
) {
408 snapshotDB
= nullptr;
// Sets max_manifest_file_size to 0 and loads sync point dependencies that
// tie the manifest write in VersionSet::LogAndApply to the
// SavedLiveFiles1/SavedLiveFiles2 points of CheckpointImpl::CreateCheckpoint.
// The checkpoint is created on a separate thread while this thread writes
// a key; the test then checks that the checkpoint can be opened.
411 TEST_F(CheckpointTest
, CurrentFileModifiedWhileCheckpointing
) {
412 Options options
= CurrentOptions();
413 options
.max_manifest_file_size
= 0; // always rollover manifest for file add
416 rocksdb::SyncPoint::GetInstance()->LoadDependency(
417 {// Get past the flush in the checkpoint thread before adding any keys to
418 // the db so the checkpoint thread won't hit the WriteManifest
420 {"DBImpl::GetLiveFiles:1",
421 "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"},
422 // Roll the manifest during checkpointing right after live files are
424 {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
425 "VersionSet::LogAndApply:WriteManifest"},
426 {"VersionSet::LogAndApply:WriteManifestDone",
427 "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
428 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
430 rocksdb::port::Thread
t([&]() {
431 Checkpoint
* checkpoint
;
432 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
433 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
));
437 "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut");
438 ASSERT_OK(Put("Default", "Default1"));
442 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
445 // Successful Open() implies that CURRENT pointed to the manifest in the
447 ASSERT_OK(DB::Open(options
, snapshot_name_
, &snapshotDB
));
449 snapshotDB
= nullptr;
// Two-phase-commit variant run against a TransactionDB with column
// families "CFA" and "CFB". A named transaction "xid" writes to the default
// column family and to CFA; 100000 further transactions are then prepared
// and committed, with a flush every 10000 iterations, before "xid" is
// prepared. A checkpoint is created on a separate thread and "xid" is
// committed between the PreCommit and PostCommit sync points, which are
// tied to SavedLiveFiles1/SavedLiveFiles2 of CreateCheckpoint. The test
// then bounds the number of log files in the checkpoint at 3 and checks
// that the values written by "xid" can be read back from it.
452 TEST_F(CheckpointTest
, CurrentFileModifiedWhileCheckpointing2PC
) {
454 const std::string dbname
= test::PerThreadDBPath("transaction_testdb");
455 ASSERT_OK(DestroyDB(dbname
, CurrentOptions()));
456 env_
->DeleteDir(dbname
);
458 Options options
= CurrentOptions();
459 options
.allow_2pc
= true;
460 // allow_2pc is implicitly set with tx prepare
461 // options.allow_2pc = true;
462 TransactionDBOptions txn_db_options
;
464 Status s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &txdb
);
466 ColumnFamilyHandle
* cfa
;
467 ColumnFamilyHandle
* cfb
;
468 ColumnFamilyOptions cf_options
;
469 ASSERT_OK(txdb
->CreateColumnFamily(cf_options
, "CFA", &cfa
));
471 WriteOptions write_options
;
472 // Insert something into CFB so lots of log files will be kept
473 // before creating the checkpoint.
474 ASSERT_OK(txdb
->CreateColumnFamily(cf_options
, "CFB", &cfb
));
475 ASSERT_OK(txdb
->Put(write_options
, cfb
, "", ""));
477 ReadOptions read_options
;
479 TransactionOptions txn_options
;
480 Transaction
* txn
= txdb
->BeginTransaction(write_options
, txn_options
);
481 s
= txn
->SetName("xid");
483 ASSERT_EQ(txdb
->GetTransactionByName("xid"), txn
);
485 s
= txn
->Put(Slice("foo"), Slice("bar"));
486 s
= txn
->Put(cfa
, Slice("foocfa"), Slice("barcfa"));
488 // Writing prepare into middle of first WAL, then flush WALs many times
489 for (int i
= 1; i
<= 100000; i
++) {
490 Transaction
* tx
= txdb
->BeginTransaction(write_options
, txn_options
);
491 ASSERT_OK(tx
->SetName("x"));
492 ASSERT_OK(tx
->Put(Slice(std::to_string(i
)), Slice("val")));
493 ASSERT_OK(tx
->Put(cfa
, Slice("aaa"), Slice("111")));
494 ASSERT_OK(tx
->Prepare());
495 ASSERT_OK(tx
->Commit());
496 if (i
% 10000 == 0) {
// NOTE(review): the Flush() status is not checked here.
497 txdb
->Flush(FlushOptions());
500 ASSERT_OK(txn
->Prepare());
504 rocksdb::SyncPoint::GetInstance()->LoadDependency(
505 {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
506 "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
507 {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
508 "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
509 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
510 rocksdb::port::Thread
t([&]() {
511 Checkpoint
* checkpoint
;
512 ASSERT_OK(Checkpoint::Create(txdb
, &checkpoint
));
513 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
));
517 "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
518 ASSERT_OK(txn
->Commit());
521 "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
524 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
526 // Only a few log files should exist; the bound of three is explained below.
527 std::vector
<std::string
> files
;
528 env_
->GetChildren(snapshot_name_
, &files
);
529 int num_log_files
= 0;
530 for (auto& file
: files
) {
533 WalFileType log_type
;
534 if (ParseFileName(file
, &num
, &type
, &log_type
) && type
== kLogFile
) {
538 // One flush after prepare + one outstanding file before checkpoint + one log
539 // file generated after checkpoint.
540 ASSERT_LE(num_log_files
, 3);
542 TransactionDB
* snapshotDB
;
543 std::vector
<ColumnFamilyDescriptor
> column_families
;
544 column_families
.push_back(
545 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
546 column_families
.push_back(
547 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
548 column_families
.push_back(
549 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
550 std::vector
<rocksdb::ColumnFamilyHandle
*> cf_handles
;
551 ASSERT_OK(TransactionDB::Open(options
, txn_db_options
, snapshot_name_
,
552 column_families
, &cf_handles
, &snapshotDB
));
553 ASSERT_OK(snapshotDB
->Get(read_options
, "foo", &value
));
554 ASSERT_EQ(value
, "bar");
555 ASSERT_OK(snapshotDB
->Get(read_options
, cf_handles
[1], "foocfa", &value
));
556 ASSERT_EQ(value
, "barcfa");
560 delete cf_handles
[0];
561 delete cf_handles
[1];
562 delete cf_handles
[2];
564 snapshotDB
= nullptr;
// Verifies that CreateCheckpoint() rejects each invalid checkpoint
// directory name ("", "/", "////") with an InvalidArgument status.
568 TEST_F(CheckpointTest
, CheckpointInvalidDirectoryName
) {
569 for (std::string checkpoint_dir
: {"", "/", "////"}) {
570 Checkpoint
* checkpoint
;
571 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
// Pass the loop variable so every candidate name is exercised; the call
// previously hard-coded "" and never tested "/" or "////".
572 ASSERT_TRUE(checkpoint
->CreateCheckpoint(checkpoint_dir).IsInvalidArgument());
// Starts a thread that writes "key2" and then creates a checkpoint on the
// test thread, so the write may run in parallel with checkpoint creation.
577 TEST_F(CheckpointTest
, CheckpointWithParallelWrites
) {
578 // When run with TSAN, this exposes the data race fixed in
579 // https://github.com/facebook/rocksdb/pull/3603
580 ASSERT_OK(Put("key1", "val1"));
581 port::Thread
thread([this]() { ASSERT_OK(Put("key2", "val2")); });
582 Checkpoint
* checkpoint
;
583 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
584 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
));
// Uses a FaultInjectionTestEnv wrapped around env_ as options.env, writes
// "key1", creates a checkpoint, and then drops unsynced file data through
// the fault-injection env. The checkpoint must still open and return
// "val1" for "key1".
589 TEST_F(CheckpointTest
, CheckpointWithUnsyncedDataDropped
) {
590 Options options
= CurrentOptions();
591 std::unique_ptr
<FaultInjectionTestEnv
> env(new FaultInjectionTestEnv(env_
));
592 options
.env
= env
.get();
594 ASSERT_OK(Put("key1", "val1"));
595 Checkpoint
* checkpoint
;
596 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
597 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name_
));
599 env
->DropUnsyncedFileData();
601 // make sure it's openable even though whatever data that wasn't synced got
605 ASSERT_OK(DB::Open(options
, snapshot_name_
, &snapshot_db
));
606 ReadOptions read_opts
;
607 std::string get_result
;
608 ASSERT_OK(snapshot_db
->Get(read_opts
, "key1", &get_result
));
609 ASSERT_EQ("val1", get_result
);
615 } // namespace rocksdb
// Test entry point: installs the RocksDB stack trace handler, initializes
// Google Test with the command-line arguments, and runs all tests.
617 int main(int argc
, char** argv
) {
618 rocksdb::port::InstallStackTraceHandler();
619 ::testing::InitGoogleTest(&argc
, argv
);
620 return RUN_ALL_TESTS();
// Alternative entry point (closed by the "#endif // !ROCKSDB_LITE" below):
// ignores its arguments and reports on stderr that the test is skipped
// because Checkpoint is not supported in ROCKSDB_LITE.
626 int main(int /*argc*/, char** /*argv*/) {
627 fprintf(stderr
, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n");
631 #endif // !ROCKSDB_LITE