1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Syncpoint prevents us building and running tests in release
19 #include "db/db_impl.h"
20 #include "port/stack_trace.h"
21 #include "port/port.h"
22 #include "rocksdb/db.h"
23 #include "rocksdb/env.h"
24 #include "rocksdb/utilities/checkpoint.h"
25 #include "rocksdb/utilities/transaction_db.h"
26 #include "util/sync_point.h"
27 #include "util/testharness.h"
// Test fixture shared by the Checkpoint tests below. It manages a scratch DB
// under test::TmpDir(env_) and provides open/reopen/read/write helpers that
// operate on db_ and handles_.
// NOTE(review): this excerpt is missing many original lines (e.g. the
// declarations of dbname_, env_ and db_, closing braces, and some function
// headers); the comments below describe only what the visible lines show.
30 class CheckpointTest
: public testing::Test
{
32 // Sequence of option configurations to try
// NOTE(review): the declaration the comment above describes is not visible
// in this excerpt.
// WAL directory used when cleaning up in the constructor (dbname_ + "/wal").
40 std::string alternative_wal_dir_
;
// Options most recently passed to TryReopen(); DestroyAndReopen() destroys
// the previous DB with these.
43 Options last_options_
;
// Column family handles filled by CreateColumnFamilies() and
// TryReopenWithColumnFamilies().
44 std::vector
<ColumnFamilyHandle
*> handles_
;
// Limits the LOW and HIGH background thread pools to one thread each, sets
// dbname_ to <TmpDir>/db_test and alternative_wal_dir_ to dbname_/wal, then
// destroys any DB left over from a previous run: first with the alternative
// WAL dir, then with the default one.
46 CheckpointTest() : env_(Env::Default()) {
47 env_
->SetBackgroundThreads(1, Env::LOW
);
48 env_
->SetBackgroundThreads(1, Env::HIGH
);
49 dbname_
= test::TmpDir(env_
) + "/db_test";
50 alternative_wal_dir_
= dbname_
+ "/wal";
51 auto options
= CurrentOptions();
52 auto delete_options
= options
;
53 delete_options
.wal_dir
= alternative_wal_dir_
;
54 EXPECT_OK(DestroyDB(dbname_
, delete_options
));
55 // Also destroy it in case no alternative WAL dir is used.
56 EXPECT_OK(DestroyDB(dbname_
, options
));
// NOTE(review): the function header enclosing the following lines is not
// visible in this excerpt. They look like teardown: reset all sync-point
// state (processing, dependencies, callbacks), then destroy the DB spread
// across four db_paths (dbname_, dbname_ + "_2", "_3", "_4").
62 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
63 rocksdb::SyncPoint::GetInstance()->LoadDependency({});
64 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
67 options
.db_paths
.emplace_back(dbname_
, 0);
68 options
.db_paths
.emplace_back(dbname_
+ "_2", 0);
69 options
.db_paths
.emplace_back(dbname_
+ "_3", 0);
70 options
.db_paths
.emplace_back(dbname_
+ "_4", 0);
71 EXPECT_OK(DestroyDB(dbname_
, options
));
74 // Return the current option configuration.
// The visible lines set create_if_missing = true on those options.
75 Options
CurrentOptions() {
78 options
.create_if_missing
= true;
// Creates one column family per name in `cfs` using ColumnFamilyOptions built
// from `options`, and stores each new handle in handles_ after the existing
// ones. The loop over `cfs` that binds `cf` is not visible in this excerpt.
82 void CreateColumnFamilies(const std::vector
<std::string
>& cfs
,
83 const Options
& options
) {
84 ColumnFamilyOptions
cf_opts(options
);
85 size_t cfi
= handles_
.size();
86 handles_
.resize(cfi
+ cfs
.size());
88 ASSERT_OK(db_
->CreateColumnFamily(cf_opts
, cf
, &handles_
[cfi
++]));
// Creates the column families in `cfs`, then reopens the DB with those
// families plus the default column family (inserted first).
92 void CreateAndReopenWithCF(const std::vector
<std::string
>& cfs
,
93 const Options
& options
) {
94 CreateColumnFamilies(cfs
, options
);
95 std::vector
<std::string
> cfs_plus_default
= cfs
;
96 cfs_plus_default
.insert(cfs_plus_default
.begin(), kDefaultColumnFamilyName
);
97 ReopenWithColumnFamilies(cfs_plus_default
, options
);
// Reopens with per-column-family options and asserts that the open succeeds.
100 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
101 const std::vector
<Options
>& options
) {
102 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
// Reopens using the same `options` for every column family and asserts that
// the open succeeds.
105 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
106 const Options
& options
) {
107 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
// Opens dbname_ with one ColumnFamilyDescriptor per name (cfs[i] paired with
// options[i]). DB-wide options come from options[0]; the handles go to
// handles_ and the DB to db_. Returns DB::Open's Status.
110 Status
TryReopenWithColumnFamilies(
111 const std::vector
<std::string
>& cfs
,
112 const std::vector
<Options
>& options
) {
// The two vectors must be parallel.
114 EXPECT_EQ(cfs
.size(), options
.size());
115 std::vector
<ColumnFamilyDescriptor
> column_families
;
116 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
117 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
[i
]));
119 DBOptions db_opts
= DBOptions(options
[0]);
120 return DB::Open(db_opts
, dbname_
, column_families
, &handles_
, &db_
);
// Convenience overload: uses the same `options` for every column family.
123 Status
TryReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
124 const Options
& options
) {
126 std::vector
<Options
> v_opts(cfs
.size(), options
);
127 return TryReopenWithColumnFamilies(cfs
, v_opts
);
// Reopens dbname_ with `options` and asserts success.
130 void Reopen(const Options
& options
) {
131 ASSERT_OK(TryReopen(options
));
// NOTE(review): the enclosing function header is not visible here; this loop
// iterates over handles_ (presumably to release them -- confirm).
135 for (auto h
: handles_
) {
// Destroys the DB opened with the previous options, then opens a fresh DB
// with `options`.
143 void DestroyAndReopen(const Options
& options
) {
144 // Destroy using last options
145 Destroy(last_options_
);
146 ASSERT_OK(TryReopen(options
));
// Asserts that DestroyDB(dbname_, options) succeeds.
149 void Destroy(const Options
& options
) {
151 ASSERT_OK(DestroyDB(dbname_
, options
));
// Opens dbname_ read-only into db_ and returns the Status.
154 Status
ReadOnlyReopen(const Options
& options
) {
155 return DB::OpenForReadOnly(options
, dbname_
, &db_
);
// Records `options` in last_options_, then opens dbname_ into db_ and returns
// the Status.
158 Status
TryReopen(const Options
& options
) {
160 last_options_
= options
;
161 return DB::Open(options
, dbname_
, &db_
);
// Flushes either the default column family or handles_[cf]. The condition
// that chooses between the two returns is not visible in this excerpt.
164 Status
Flush(int cf
= 0) {
166 return db_
->Flush(FlushOptions());
168 return db_
->Flush(FlushOptions(), handles_
[cf
]);
// Writes k -> v to the default column family.
172 Status
Put(const Slice
& k
, const Slice
& v
, WriteOptions wo
= WriteOptions()) {
173 return db_
->Put(wo
, k
, v
);
// Writes k -> v to column family handles_[cf].
176 Status
Put(int cf
, const Slice
& k
, const Slice
& v
,
177 WriteOptions wo
= WriteOptions()) {
178 return db_
->Put(wo
, handles_
[cf
], k
, v
);
// Deletes k from the default column family.
181 Status
Delete(const std::string
& k
) {
182 return db_
->Delete(WriteOptions(), k
);
// Deletes k from column family handles_[cf].
185 Status
Delete(int cf
, const std::string
& k
) {
186 return db_
->Delete(WriteOptions(), handles_
[cf
], k
);
// Reads k from the default column family with checksum verification,
// optionally as of `snapshot`. `result` ends up holding the value,
// "NOT_FOUND" if the key is absent, or the status string for any other
// error. The declarations and the return statement are not visible in this
// excerpt.
189 std::string
Get(const std::string
& k
, const Snapshot
* snapshot
= nullptr) {
191 options
.verify_checksums
= true;
192 options
.snapshot
= snapshot
;
194 Status s
= db_
->Get(options
, k
, &result
);
195 if (s
.IsNotFound()) {
196 result
= "NOT_FOUND";
197 } else if (!s
.ok()) {
198 result
= s
.ToString();
// Same as Get(k, snapshot), but reads from column family handles_[cf].
203 std::string
Get(int cf
, const std::string
& k
,
204 const Snapshot
* snapshot
= nullptr) {
206 options
.verify_checksums
= true;
207 options
.snapshot
= snapshot
;
209 Status s
= db_
->Get(options
, handles_
[cf
], k
, &result
);
210 if (s
.IsNotFound()) {
211 result
= "NOT_FOUND";
212 } else if (!s
.ok()) {
213 result
= s
.ToString();
// For each log_size_for_fush value (0 and 1000000): write foo=v1, create a
// checkpoint, then overwrite foo=v2. The checkpoint must still read v1, both
// while the source DB is open and after the source DB has been destroyed.
// NOTE(review): `log_size_for_fush` looks like a typo for
// `log_size_for_flush`; the identifier is left unchanged here.
219 TEST_F(CheckpointTest
, GetSnapshotLink
) {
220 for (uint64_t log_size_for_fush
: {0, 1000000}) {
222 const std::string snapshot_name
= test::TmpDir(env_
) + "/snapshot";
224 ReadOptions roptions
;
226 Checkpoint
* checkpoint
;
228 options
= CurrentOptions();
// Start clean: remove any leftover source DB and checkpoint directory.
231 ASSERT_OK(DestroyDB(dbname_
, options
));
232 ASSERT_OK(DestroyDB(snapshot_name
, options
));
233 env_
->DeleteDir(snapshot_name
);
237 options
.create_if_missing
= true;
238 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
239 std::string key
= std::string("foo");
240 ASSERT_OK(Put(key
, "v1"));
242 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
243 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name
, log_size_for_fush
));
// Writes after the checkpoint change the source DB but not the checkpoint.
244 ASSERT_OK(Put(key
, "v2"));
245 ASSERT_EQ("v2", Get(key
));
247 ASSERT_EQ("v2", Get(key
));
248 // Open snapshot and verify contents while DB is running
249 options
.create_if_missing
= false;
250 ASSERT_OK(DB::Open(options
, snapshot_name
, &snapshotDB
));
251 ASSERT_OK(snapshotDB
->Get(roptions
, key
, &result
));
252 ASSERT_EQ("v1", result
);
254 snapshotDB
= nullptr;
258 // Destroy original DB
259 ASSERT_OK(DestroyDB(dbname_
, options
));
261 // Open snapshot and verify contents
262 options
.create_if_missing
= false;
// Point dbname_ at the checkpoint so the fixture's Get() reads from it;
// dbname_ is restored to <TmpDir>/db_test at the end of the iteration.
263 dbname_
= snapshot_name
;
264 ASSERT_OK(DB::Open(options
, dbname_
, &db_
));
265 ASSERT_EQ("v1", Get(key
));
// Remove the checkpoint directory (dbname_ currently names it).
268 ASSERT_OK(DestroyDB(dbname_
, options
));
272 dbname_
= test::TmpDir(env_
) + "/db_test";
// Takes a checkpoint on a separate thread while writes go to six column
// families (default plus "one".."five").
// The sync-point dependencies are meant to place the second batch of writes
// (Default1, eleven, ...) between DBImpl::GetLiveFiles:1 and
// DBImpl::GetLiveFiles:2. This assumes LoadDependency({a, b}) makes b wait
// for a -- confirm against the SyncPoint implementation.
// The assertions expect the checkpoint to contain that second batch. The third
// batch (twentyone, ...) is written after sync-point processing is disabled.
276 TEST_F(CheckpointTest
, CheckpointCF
) {
277 Options options
= CurrentOptions();
278 CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options
);
279 rocksdb::SyncPoint::GetInstance()->LoadDependency(
280 {{"CheckpointTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"},
281 {"DBImpl::GetLiveFiles:1", "CheckpointTest::CheckpointCF:1"}});
283 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// First batch: one key per column family.
285 ASSERT_OK(Put(0, "Default", "Default"));
286 ASSERT_OK(Put(1, "one", "one"));
287 ASSERT_OK(Put(2, "two", "two"));
288 ASSERT_OK(Put(3, "three", "three"));
289 ASSERT_OK(Put(4, "four", "four"));
290 ASSERT_OK(Put(5, "five", "five"));
292 const std::string snapshot_name
= test::TmpDir(env_
) + "/snapshot";
294 ReadOptions roptions
;
296 std::vector
<ColumnFamilyHandle
*> cphandles
;
// Start clean: remove any leftover checkpoint directory.
298 ASSERT_OK(DestroyDB(snapshot_name
, options
));
299 env_
->DeleteDir(snapshot_name
);
// Create the checkpoint on another thread so the writes below can interleave
// with it.
// NOTE(review): no `delete checkpoint` is visible in this thread body --
// confirm the Checkpoint object is freed.
303 rocksdb::port::Thread
t([&]() {
304 Checkpoint
* checkpoint
;
305 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
306 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name
));
// Second batch, written between the CheckpointCF:1 and CheckpointCF:2 sync
// points.
309 TEST_SYNC_POINT("CheckpointTest::CheckpointCF:1");
310 ASSERT_OK(Put(0, "Default", "Default1"));
311 ASSERT_OK(Put(1, "one", "eleven"));
312 ASSERT_OK(Put(2, "two", "twelve"));
313 ASSERT_OK(Put(3, "three", "thirteen"));
314 ASSERT_OK(Put(4, "four", "fourteen"));
315 ASSERT_OK(Put(5, "five", "fifteen"));
316 TEST_SYNC_POINT("CheckpointTest::CheckpointCF:2");
// Third batch, written with sync-point processing disabled.
318 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
319 ASSERT_OK(Put(1, "one", "twentyone"));
320 ASSERT_OK(Put(2, "two", "twentytwo"));
321 ASSERT_OK(Put(3, "three", "twentythree"));
322 ASSERT_OK(Put(4, "four", "twentyfour"));
323 ASSERT_OK(Put(5, "five", "twentyfive"));
326 // Open snapshot and verify contents while DB is running
327 options
.create_if_missing
= false;
328 std::vector
<std::string
> cfs
;
329 cfs
= {kDefaultColumnFamilyName
, "one", "two", "three", "four", "five"};
330 std::vector
<ColumnFamilyDescriptor
> column_families
;
331 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
332 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
));
334 ASSERT_OK(DB::Open(options
, snapshot_name
,
335 column_families
, &cphandles
, &snapshotDB
));
336 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[0], "Default", &result
));
337 ASSERT_EQ("Default1", result
);
338 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[1], "one", &result
));
339 ASSERT_EQ("eleven", result
);
// NOTE(review): no assertion on the value read for "two" is visible in this
// excerpt.
340 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[2], "two", &result
));
341 for (auto h
: cphandles
) {
346 snapshotDB
= nullptr;
347 ASSERT_OK(DestroyDB(snapshot_name
, options
));
// Checks CreateCheckpoint(snapshot_name, 1000000) on unflushed data.
// A callback is installed on DBImpl::BackgroundCallFlush:start (its body is
// commented "Flush should never trigger"). The checkpoint must return the
// values written before it was taken, even after the source DB is destroyed.
350 TEST_F(CheckpointTest
, CheckpointCFNoFlush
) {
351 Options options
= CurrentOptions();
352 CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options
);
// NOTE(review): processing is enabled here and again just before the
// checkpoint is created.
354 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
356 ASSERT_OK(Put(0, "Default", "Default"));
357 ASSERT_OK(Put(1, "one", "one"));
359 ASSERT_OK(Put(2, "two", "two"));
361 const std::string snapshot_name
= test::TmpDir(env_
) + "/snapshot";
363 ReadOptions roptions
;
365 std::vector
<ColumnFamilyHandle
*> cphandles
;
// Start clean: remove any leftover checkpoint directory.
367 ASSERT_OK(DestroyDB(snapshot_name
, options
));
368 env_
->DeleteDir(snapshot_name
);
372 rocksdb::SyncPoint::GetInstance()->SetCallBack(
373 "DBImpl::BackgroundCallFlush:start", [&](void* arg
) {
374 // Flush should never trigger.
377 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
378 Checkpoint
* checkpoint
;
379 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
380 ASSERT_OK(checkpoint
->CreateCheckpoint(snapshot_name
, 1000000));
381 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// Writes after the checkpoint must not appear in it.
384 ASSERT_OK(Put(1, "one", "two"));
386 ASSERT_OK(Put(2, "two", "twentytwo"));
// The source DB is destroyed, so the checkpoint must be self-contained.
388 EXPECT_OK(DestroyDB(dbname_
, options
));
390 // Open snapshot and verify contents while DB is running
391 options
.create_if_missing
= false;
392 std::vector
<std::string
> cfs
;
393 cfs
= {kDefaultColumnFamilyName
, "one", "two", "three", "four", "five"};
394 std::vector
<ColumnFamilyDescriptor
> column_families
;
395 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
396 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
));
// NOTE(review): the rest of this DB::Open call is not visible in this excerpt.
398 ASSERT_OK(DB::Open(options
, snapshot_name
, column_families
, &cphandles
,
400 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[0], "Default", &result
));
401 ASSERT_EQ("Default", result
);
402 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[1], "one", &result
));
403 ASSERT_EQ("one", result
);
404 ASSERT_OK(snapshotDB
->Get(roptions
, cphandles
[2], "two", &result
));
405 ASSERT_EQ("two", result
);
406 for (auto h
: cphandles
) {
411 snapshotDB
= nullptr;
412 ASSERT_OK(DestroyDB(snapshot_name
, options
));
// Checks that a checkpoint stays openable when the MANIFEST rolls over
// during checkpointing. max_manifest_file_size = 0 forces a rollover on every
// file add, and sync points place the manifest write between the checkpoint's
// SavedLiveFiles1 and SavedLiveFiles2 steps. Opening the checkpoint
// successfully shows that its CURRENT file is consistent.
415 TEST_F(CheckpointTest
, CurrentFileModifiedWhileCheckpointing
) {
416 const std::string kSnapshotName
= test::TmpDir(env_
) + "/snapshot";
// Start clean: remove any leftover checkpoint directory.
417 ASSERT_OK(DestroyDB(kSnapshotName
, CurrentOptions()));
418 env_
->DeleteDir(kSnapshotName
);
420 Options options
= CurrentOptions();
421 options
.max_manifest_file_size
= 0; // always rollover manifest for file add
424 rocksdb::SyncPoint::GetInstance()->LoadDependency(
425 {// Get past the flush in the checkpoint thread before adding any keys to
426 // the db so the checkpoint thread won't hit the WriteManifest
428 {"DBImpl::GetLiveFiles:1",
429 "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut"},
430 // Roll the manifest during checkpointing right after live files are
432 {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
433 "VersionSet::LogAndApply:WriteManifest"},
434 {"VersionSet::LogAndApply:WriteManifestDone",
435 "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
436 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Create the checkpoint on another thread so the main thread can write while
// it runs.
438 rocksdb::port::Thread
t([&]() {
439 Checkpoint
* checkpoint
;
440 ASSERT_OK(Checkpoint::Create(db_
, &checkpoint
));
441 ASSERT_OK(checkpoint
->CreateCheckpoint(kSnapshotName
));
445 "CheckpointTest::CurrentFileModifiedWhileCheckpointing:PrePut");
446 ASSERT_OK(Put("Default", "Default1"));
450 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
453 // Successful Open() implies that CURRENT pointed to the manifest in the
455 ASSERT_OK(DB::Open(options
, kSnapshotName
, &snapshotDB
));
457 snapshotDB
= nullptr;
// Checkpoints a TransactionDB opened with allow_2pc.
// The prepared transaction "xid" is committed between
// CheckpointImpl::CreateCheckpoint:SavedLiveFiles1 and SavedLiveFiles2.
// The checkpoint must then hold at most three WAL files. Reopened as a
// TransactionDB, it must return the committed values foo=bar (default CF)
// and foocfa=barcfa (CF "CFA").
460 TEST_F(CheckpointTest
, CurrentFileModifiedWhileCheckpointing2PC
) {
462 const std::string kSnapshotName
= test::TmpDir(env_
) + "/snapshot";
// NOTE(review): unlike kSnapshotName, this uses test::TmpDir() without env_.
463 const std::string dbname
= test::TmpDir() + "/transaction_testdb";
// Start clean: remove any leftover checkpoint and transaction DB.
464 ASSERT_OK(DestroyDB(kSnapshotName
, CurrentOptions()));
465 ASSERT_OK(DestroyDB(dbname
, CurrentOptions()));
466 env_
->DeleteDir(kSnapshotName
);
467 env_
->DeleteDir(dbname
);
469 Options options
= CurrentOptions();
470 options
.allow_2pc
= true;
// NOTE(review): allow_2pc is set explicitly above, which contradicts the next
// two comments; one of them is stale.
471 // allow_2pc is implicitly set with tx prepare
472 // options.allow_2pc = true;
473 TransactionDBOptions txn_db_options
;
475 Status s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &txdb
);
477 ColumnFamilyHandle
* cfa
;
478 ColumnFamilyHandle
* cfb
;
479 ColumnFamilyOptions cf_options
;
480 ASSERT_OK(txdb
->CreateColumnFamily(cf_options
, "CFA", &cfa
));
482 WriteOptions write_options
;
483 // Insert something into CFB so lots of log files will be kept
484 // before creating the checkpoint.
485 ASSERT_OK(txdb
->CreateColumnFamily(cf_options
, "CFB", &cfb
));
486 ASSERT_OK(txdb
->Put(write_options
, cfb
, "", ""));
488 ReadOptions read_options
;
490 TransactionOptions txn_options
;
// The long-lived transaction whose commit is raced against the checkpoint.
491 Transaction
* txn
= txdb
->BeginTransaction(write_options
, txn_options
);
// NOTE(review): the Status values assigned to `s` below are not checked in
// the visible lines.
492 s
= txn
->SetName("xid");
494 ASSERT_EQ(txdb
->GetTransactionByName("xid"), txn
);
496 s
= txn
->Put(Slice("foo"), Slice("bar"));
497 s
= txn
->Put(cfa
, Slice("foocfa"), Slice("barcfa"));
499 // Writing prepare into middle of first WAL, then flush WALs many times
// (100000 prepared+committed transactions with a flush every 10000, which
// makes this test comparatively slow.)
500 for (int i
= 1; i
<= 100000; i
++) {
501 Transaction
* tx
= txdb
->BeginTransaction(write_options
, txn_options
);
502 ASSERT_OK(tx
->SetName("x"));
503 ASSERT_OK(tx
->Put(Slice(std::to_string(i
)), Slice("val")));
504 ASSERT_OK(tx
->Put(cfa
, Slice("aaa"), Slice("111")));
505 ASSERT_OK(tx
->Prepare());
506 ASSERT_OK(tx
->Commit());
507 if (i
% 10000 == 0) {
// NOTE(review): Flush()'s Status is ignored here.
508 txdb
->Flush(FlushOptions());
// Prepare "xid". Its commit below is ordered by the PreCommit/PostCommit
// sync points so that it happens while the checkpoint is in progress.
511 ASSERT_OK(txn
->Prepare());
515 rocksdb::SyncPoint::GetInstance()->LoadDependency(
516 {{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
517 "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit"},
518 {"CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit",
519 "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
520 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Create the checkpoint on another thread so the commit can interleave with
// it.
521 rocksdb::port::Thread
t([&]() {
522 Checkpoint
* checkpoint
;
523 ASSERT_OK(Checkpoint::Create(txdb
, &checkpoint
));
524 ASSERT_OK(checkpoint
->CreateCheckpoint(kSnapshotName
));
528 "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PreCommit");
529 ASSERT_OK(txn
->Commit());
532 "CheckpointTest::CurrentFileModifiedWhileCheckpointing2PC:PostCommit");
535 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
537 // No more than three log files should exist (see the ASSERT_LE below).
538 std::vector
<std::string
> files
;
// NOTE(review): GetChildren()'s Status is ignored here.
539 env_
->GetChildren(kSnapshotName
, &files
);
540 int num_log_files
= 0;
541 for (auto& file
: files
) {
544 WalFileType log_type
;
545 if (ParseFileName(file
, &num
, &type
, &log_type
) && type
== kLogFile
) {
549 // One flush after prepare + one outstanding file before checkpoint + one log
550 // file generated after checkpoint.
551 ASSERT_LE(num_log_files
, 3);
// Reopen the checkpoint as a TransactionDB with all three column families and
// check that the committed values of "xid" are present.
553 TransactionDB
* snapshotDB
;
554 std::vector
<ColumnFamilyDescriptor
> column_families
;
555 column_families
.push_back(
556 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, ColumnFamilyOptions()));
557 column_families
.push_back(
558 ColumnFamilyDescriptor("CFA", ColumnFamilyOptions()));
559 column_families
.push_back(
560 ColumnFamilyDescriptor("CFB", ColumnFamilyOptions()));
561 std::vector
<rocksdb::ColumnFamilyHandle
*> cf_handles
;
562 ASSERT_OK(TransactionDB::Open(options
, txn_db_options
, kSnapshotName
,
563 column_families
, &cf_handles
, &snapshotDB
));
564 ASSERT_OK(snapshotDB
->Get(read_options
, "foo", &value
));
565 ASSERT_EQ(value
, "bar");
566 ASSERT_OK(snapshotDB
->Get(read_options
, cf_handles
[1], "foocfa", &value
));
567 ASSERT_EQ(value
, "barcfa");
571 delete cf_handles
[0];
572 delete cf_handles
[1];
573 delete cf_handles
[2];
575 snapshotDB
= nullptr;
579 } // namespace rocksdb
// Test entry point: installs the stack-trace handler, initializes GoogleTest
// with the command-line arguments, and runs every registered test.
581 int main(int argc
, char** argv
) {
582 rocksdb::port::InstallStackTraceHandler();
583 ::testing::InitGoogleTest(&argc
, argv
);
584 return RUN_ALL_TESTS();
// Fallback entry point: prints to stderr that the Checkpoint tests are skipped
// in ROCKSDB_LITE builds. The selecting preprocessor guard is not visible in
// this excerpt.
590 int main(int argc
, char** argv
) {
591 fprintf(stderr
, "SKIPPED as Checkpoint is not supported in ROCKSDB_LITE\n");
595 #endif // !ROCKSDB_LITE