1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright 2014 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // This test uses a custom Env to keep track of the state of a filesystem as of
11 // the last "sync". It then checks for data loss errors by purposely dropping
12 // file data (or entire files) not protected by a "sync".
14 #include "db/db_impl/db_impl.h"
15 #include "db/log_format.h"
16 #include "db/version_set.h"
17 #include "env/mock_env.h"
18 #include "file/filename.h"
19 #include "rocksdb/cache.h"
20 #include "rocksdb/db.h"
21 #include "rocksdb/env.h"
22 #include "rocksdb/table.h"
23 #include "rocksdb/write_batch.h"
24 #include "test_util/sync_point.h"
25 #include "test_util/testharness.h"
26 #include "test_util/testutil.h"
27 #include "util/mutexlock.h"
28 #include "util/random.h"
29 #include "utilities/fault_injection_env.h"
31 namespace ROCKSDB_NAMESPACE
{
33 static const int kValueSize
= 1000;
34 static const int kMaxNumValues
= 2000;
35 static const size_t kNumIterations
= 3;
37 enum FaultInjectionOptionConfig
{
46 class FaultInjectionTest
47 : public testing::Test
,
48 public testing::WithParamInterface
<std::tuple
<
49 bool, FaultInjectionOptionConfig
, FaultInjectionOptionConfig
>> {
52 int non_inclusive_end_range_
; // kEnd or equivalent to that
53 // When need to make sure data is persistent, sync WAL
55 // When need to make sure data is persistent, call DB::CompactRange()
56 bool sync_use_compact_
;
58 bool sequential_order_
;
62 enum ExpectedVerifResult
{ kValExpectFound
, kValExpectNoError
};
64 kResetDropUnsyncedData
,
65 kResetDropRandomUnsyncedData
,
66 kResetDeleteUnsyncedFiles
,
67 kResetDropAndDeleteUnsynced
70 std::unique_ptr
<Env
> base_env_
;
71 FaultInjectionTestEnv
* env_
;
73 std::shared_ptr
<Cache
> tiny_cache_
;
78 : option_config_(std::get
<1>(GetParam())),
79 non_inclusive_end_range_(std::get
<2>(GetParam())),
81 sync_use_compact_(true),
86 ~FaultInjectionTest() override
{
87 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
88 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
91 bool ChangeOptions() {
93 if (option_config_
>= non_inclusive_end_range_
) {
96 if (option_config_
== kMultiLevels
) {
97 base_env_
.reset(new MockEnv(Env::Default()));
103 // Return the current option configuration.
104 Options
CurrentOptions() {
105 sync_use_wal_
= false;
106 sync_use_compact_
= true;
108 switch (option_config_
) {
110 options
.wal_dir
= test::PerThreadDBPath(env_
, "fault_test_wal");
112 case kDifferentDataDir
:
113 options
.db_paths
.emplace_back(
114 test::PerThreadDBPath(env_
, "fault_test_data"), 1000000U);
117 sync_use_wal_
= true;
118 sync_use_compact_
= false;
121 options
.wal_dir
= test::PerThreadDBPath(env_
, "/fault_test_wal");
122 sync_use_wal_
= true;
123 sync_use_compact_
= false;
126 options
.write_buffer_size
= 64 * 1024;
127 options
.target_file_size_base
= 64 * 1024;
128 options
.level0_file_num_compaction_trigger
= 2;
129 options
.level0_slowdown_writes_trigger
= 2;
130 options
.level0_stop_writes_trigger
= 4;
131 options
.max_bytes_for_level_base
= 128 * 1024;
132 options
.max_write_buffer_number
= 2;
133 options
.max_background_compactions
= 8;
134 options
.max_background_flushes
= 8;
135 sync_use_wal_
= true;
136 sync_use_compact_
= false;
145 assert(db_
== nullptr);
146 assert(tiny_cache_
== nullptr);
147 assert(env_
== nullptr);
150 new FaultInjectionTestEnv(base_env_
? base_env_
.get() : Env::Default());
152 options_
= CurrentOptions();
154 options_
.paranoid_checks
= true;
156 BlockBasedTableOptions table_options
;
157 tiny_cache_
= NewLRUCache(100);
158 table_options
.block_cache
= tiny_cache_
;
159 options_
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
161 dbname_
= test::PerThreadDBPath("fault_test");
163 EXPECT_OK(DestroyDB(dbname_
, options_
));
165 options_
.create_if_missing
= true;
167 options_
.create_if_missing
= false;
171 void SetUp() override
{
172 sequential_order_
= std::get
<0>(GetParam());
176 void TearDown() override
{
179 Status s
= DestroyDB(dbname_
, options_
);
189 void Build(const WriteOptions
& write_options
, int start_idx
, int num_vals
) {
190 std::string key_space
, value_space
;
192 for (int i
= start_idx
; i
< start_idx
+ num_vals
; i
++) {
193 Slice key
= Key(i
, &key_space
);
195 batch
.Put(key
, Value(i
, &value_space
));
196 ASSERT_OK(db_
->Write(write_options
, &batch
));
200 Status
ReadValue(int i
, std::string
* val
) const {
201 std::string key_space
, value_space
;
202 Slice key
= Key(i
, &key_space
);
203 Value(i
, &value_space
);
205 return db_
->Get(options
, key
, val
);
208 Status
Verify(int start_idx
, int num_vals
,
209 ExpectedVerifResult expected
) const {
211 std::string value_space
;
213 for (int i
= start_idx
; i
< start_idx
+ num_vals
&& s
.ok(); i
++) {
214 Value(i
, &value_space
);
215 s
= ReadValue(i
, &val
);
217 EXPECT_EQ(value_space
, val
);
219 if (expected
== kValExpectFound
) {
221 fprintf(stderr
, "Error when read %dth record (expect found): %s\n", i
,
222 s
.ToString().c_str());
225 } else if (!s
.ok() && !s
.IsNotFound()) {
226 fprintf(stderr
, "Error when read %dth record: %s\n", i
,
227 s
.ToString().c_str());
234 // Return the ith key
235 Slice
Key(int i
, std::string
* storage
) const {
236 unsigned long long num
= i
;
237 if (!sequential_order_
) {
239 const int m
= 0x5bd1e995;
244 snprintf(buf
, sizeof(buf
), "%016d", static_cast<int>(num
));
245 storage
->assign(buf
, strlen(buf
));
246 return Slice(*storage
);
249 // Return the value to associate with the specified key
250 Slice
Value(int k
, std::string
* storage
) const {
252 *storage
= r
.RandomString(kValueSize
);
253 return Slice(*storage
);
264 Status s
= DB::Open(options_
, dbname_
, &db_
);
265 assert(db_
!= nullptr);
269 void DeleteAllData() {
270 Iterator
* iter
= db_
->NewIterator(ReadOptions());
271 WriteOptions options
;
272 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
273 ASSERT_OK(db_
->Delete(WriteOptions(), iter
->key()));
278 FlushOptions flush_options
;
279 flush_options
.wait
= true;
280 db_
->Flush(flush_options
);
283 // rnd cannot be null for kResetDropRandomUnsyncedData
284 void ResetDBState(ResetMethod reset_method
, Random
* rnd
= nullptr) {
285 env_
->AssertNoOpenFile();
286 switch (reset_method
) {
287 case kResetDropUnsyncedData
:
288 ASSERT_OK(env_
->DropUnsyncedFileData());
290 case kResetDropRandomUnsyncedData
:
291 ASSERT_OK(env_
->DropRandomUnsyncedFileData(rnd
));
293 case kResetDeleteUnsyncedFiles
:
294 ASSERT_OK(env_
->DeleteFilesCreatedAfterLastDirSync());
296 case kResetDropAndDeleteUnsynced
:
297 ASSERT_OK(env_
->DropUnsyncedFileData());
298 ASSERT_OK(env_
->DeleteFilesCreatedAfterLastDirSync());
305 void PartialCompactTestPreFault(int num_pre_sync
, int num_post_sync
) {
308 WriteOptions write_options
;
309 write_options
.sync
= sync_use_wal_
;
311 Build(write_options
, 0, num_pre_sync
);
312 if (sync_use_compact_
) {
313 db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
315 write_options
.sync
= false;
316 Build(write_options
, num_pre_sync
, num_post_sync
);
319 void PartialCompactTestReopenWithFault(ResetMethod reset_method
,
320 int num_pre_sync
, int num_post_sync
,
321 Random
* rnd
= nullptr) {
322 env_
->SetFilesystemActive(false);
324 ResetDBState(reset_method
, rnd
);
326 ASSERT_OK(Verify(0, num_pre_sync
, FaultInjectionTest::kValExpectFound
));
327 ASSERT_OK(Verify(num_pre_sync
, num_post_sync
,
328 FaultInjectionTest::kValExpectNoError
));
329 WaitCompactionFinish();
330 ASSERT_OK(Verify(0, num_pre_sync
, FaultInjectionTest::kValExpectFound
));
331 ASSERT_OK(Verify(num_pre_sync
, num_post_sync
,
332 FaultInjectionTest::kValExpectNoError
));
335 void NoWriteTestPreFault() {
338 void NoWriteTestReopenWithFault(ResetMethod reset_method
) {
340 ResetDBState(reset_method
);
344 void WaitCompactionFinish() {
345 static_cast<DBImpl
*>(db_
->GetRootDB())->TEST_WaitForCompact();
346 ASSERT_OK(db_
->Put(WriteOptions(), "", ""));
350 class FaultInjectionTestSplitted
: public FaultInjectionTest
{};
352 TEST_P(FaultInjectionTestSplitted
, FaultTest
) {
356 for (size_t idx
= 0; idx
< kNumIterations
; idx
++) {
357 int num_pre_sync
= rnd
.Uniform(kMaxNumValues
);
358 int num_post_sync
= rnd
.Uniform(kMaxNumValues
);
360 PartialCompactTestPreFault(num_pre_sync
, num_post_sync
);
361 PartialCompactTestReopenWithFault(kResetDropUnsyncedData
, num_pre_sync
,
363 NoWriteTestPreFault();
364 NoWriteTestReopenWithFault(kResetDropUnsyncedData
);
366 PartialCompactTestPreFault(num_pre_sync
, num_post_sync
);
367 PartialCompactTestReopenWithFault(kResetDropRandomUnsyncedData
,
368 num_pre_sync
, num_post_sync
, &rnd
);
369 NoWriteTestPreFault();
370 NoWriteTestReopenWithFault(kResetDropUnsyncedData
);
372 // Setting a separate data path won't pass the test as we don't sync
373 // it after creating new files,
374 PartialCompactTestPreFault(num_pre_sync
, num_post_sync
);
375 PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced
,
376 num_pre_sync
, num_post_sync
);
377 NoWriteTestPreFault();
378 NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced
);
380 PartialCompactTestPreFault(num_pre_sync
, num_post_sync
);
381 // No new files created so we expect all values since no files will be
383 PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles
, num_pre_sync
,
385 NoWriteTestPreFault();
386 NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles
);
388 } while (ChangeOptions());
391 // Previous log file is not fsynced if sync is forced after log rolling.
392 TEST_P(FaultInjectionTest
, WriteOptionSyncTest
) {
393 test::SleepingBackgroundTask sleeping_task_low
;
394 env_
->SetBackgroundThreads(1, Env::HIGH
);
395 // Block the job queue to prevent flush job from running.
396 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
397 Env::Priority::HIGH
);
398 sleeping_task_low
.WaitUntilSleeping();
400 WriteOptions write_options
;
401 write_options
.sync
= false;
403 std::string key_space
, value_space
;
405 db_
->Put(write_options
, Key(1, &key_space
), Value(1, &value_space
)));
406 FlushOptions flush_options
;
407 flush_options
.wait
= false;
408 ASSERT_OK(db_
->Flush(flush_options
));
409 write_options
.sync
= true;
411 db_
->Put(write_options
, Key(2, &key_space
), Value(2, &value_space
)));
412 db_
->FlushWAL(false);
414 env_
->SetFilesystemActive(false);
415 NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced
);
416 sleeping_task_low
.WakeUp();
417 sleeping_task_low
.WaitUntilDone();
421 Value(2, &value_space
);
422 ASSERT_OK(ReadValue(2, &val
));
423 ASSERT_EQ(value_space
, val
);
425 Value(1, &value_space
);
426 ASSERT_OK(ReadValue(1, &val
));
427 ASSERT_EQ(value_space
, val
);
430 TEST_P(FaultInjectionTest
, UninstalledCompaction
) {
431 options_
.target_file_size_base
= 32 * 1024;
432 options_
.write_buffer_size
= 100 << 10; // 100KB
433 options_
.level0_file_num_compaction_trigger
= 6;
434 options_
.level0_stop_writes_trigger
= 1 << 10;
435 options_
.level0_slowdown_writes_trigger
= 1 << 10;
436 options_
.max_background_compactions
= 1;
439 if (!sequential_order_
) {
440 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
441 {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"},
442 {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"},
443 {"FaultInjectionTest::FaultTest:2",
444 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
447 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
450 Build(WriteOptions(), 0, kNumKeys
);
451 FlushOptions flush_options
;
452 flush_options
.wait
= true;
453 db_
->Flush(flush_options
);
454 ASSERT_OK(db_
->Put(WriteOptions(), "", ""));
455 TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0");
456 TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1");
457 env_
->SetFilesystemActive(false);
458 TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2");
460 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
461 ResetDBState(kResetDropUnsyncedData
);
463 std::atomic
<bool> opened(false);
464 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
465 "DBImpl::Open:Opened", [&](void* /*arg*/) { opened
.store(true); });
466 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
467 "DBImpl::BGWorkCompaction",
468 [&](void* /*arg*/) { ASSERT_TRUE(opened
.load()); });
469 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
471 ASSERT_OK(Verify(0, kNumKeys
, FaultInjectionTest::kValExpectFound
));
472 WaitCompactionFinish();
473 ASSERT_OK(Verify(0, kNumKeys
, FaultInjectionTest::kValExpectFound
));
474 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
475 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
478 TEST_P(FaultInjectionTest
, ManualLogSyncTest
) {
479 test::SleepingBackgroundTask sleeping_task_low
;
480 env_
->SetBackgroundThreads(1, Env::HIGH
);
481 // Block the job queue to prevent flush job from running.
482 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task_low
,
483 Env::Priority::HIGH
);
484 sleeping_task_low
.WaitUntilSleeping();
486 WriteOptions write_options
;
487 write_options
.sync
= false;
489 std::string key_space
, value_space
;
491 db_
->Put(write_options
, Key(1, &key_space
), Value(1, &value_space
)));
492 FlushOptions flush_options
;
493 flush_options
.wait
= false;
494 ASSERT_OK(db_
->Flush(flush_options
));
496 db_
->Put(write_options
, Key(2, &key_space
), Value(2, &value_space
)));
497 ASSERT_OK(db_
->FlushWAL(true));
499 env_
->SetFilesystemActive(false);
500 NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced
);
501 sleeping_task_low
.WakeUp();
502 sleeping_task_low
.WaitUntilDone();
506 Value(2, &value_space
);
507 ASSERT_OK(ReadValue(2, &val
));
508 ASSERT_EQ(value_space
, val
);
510 Value(1, &value_space
);
511 ASSERT_OK(ReadValue(1, &val
));
512 ASSERT_EQ(value_space
, val
);
515 TEST_P(FaultInjectionTest
, WriteBatchWalTerminationTest
) {
517 Options options
= CurrentOptions();
522 wo
.disableWAL
= false;
524 batch
.Put("cats", "dogs");
525 batch
.MarkWalTerminationPoint();
526 batch
.Put("boys", "girls");
527 ASSERT_OK(db_
->Write(wo
, &batch
));
529 env_
->SetFilesystemActive(false);
530 NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced
);
534 ASSERT_OK(db_
->Get(ro
, "cats", &val
));
535 ASSERT_EQ("dogs", val
);
536 ASSERT_EQ(db_
->Get(ro
, "boys", &val
), Status::NotFound());
539 INSTANTIATE_TEST_CASE_P(
540 FaultTest
, FaultInjectionTest
,
541 ::testing::Values(std::make_tuple(false, kDefault
, kEnd
),
542 std::make_tuple(true, kDefault
, kEnd
)));
544 INSTANTIATE_TEST_CASE_P(
545 FaultTest
, FaultInjectionTestSplitted
,
546 ::testing::Values(std::make_tuple(false, kDefault
, kSyncWal
),
547 std::make_tuple(true, kDefault
, kSyncWal
),
548 std::make_tuple(false, kSyncWal
, kEnd
),
549 std::make_tuple(true, kSyncWal
, kEnd
)));
551 } // namespace ROCKSDB_NAMESPACE
553 int main(int argc
, char** argv
) {
554 ::testing::InitGoogleTest(&argc
, argv
);
555 return RUN_ALL_TESTS();