//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase,
                    public testing::WithParamInterface<int> {
 public:
  DBWriteTest() : DBTestBase("/db_write_test", /*env_do_fsync=*/true) {}

  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

  void Open() { DBTestBase::Reopen(GetOptions()); }
};
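
// Note: each test below runs once per write policy supplied by the
// INSTANTIATE_TEST_CASE_P list at the bottom of this file (default,
// concurrent WAL writes, and pipelined writes).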
// It is invalid to do sync write while disabling WAL.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = true;
  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}
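
// Writers that opt out of stalling (WriteOptions::no_slowdown) are expected
// to be unlinked from a stalled write group and fail fast instead of
// blocking the remaining writers.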
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex.
  int writers = 0;

  Reopen(options);
  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };
  // Create 3 L0 files and schedule 4th without waiting
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
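  // With level0_slowdown_writes_trigger == 4, the fourth L0 file produced by
  // the pending flush is what pushes the column family into the write stall.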
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write thread.
  FlushOptions fopt;
  fopt.wait = false;
  dbfull()->Flush(fopt);
  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }
  // Second leader. Will stall writes
  // Build a writers list with no slowdown in the middle:
  //   slowdown  <----  newest
  //      |
  //   no slowdown
  //      |
  //   slowdown
  threads.emplace_back(write_slowdown_func);
  while (writers != 2) {
    cv.Wait();
  }
  threads.emplace_back(write_no_slowdown_func);
  while (writers != 3) {
    cv.Wait();
  }
  threads.emplace_back(write_slowdown_func);
  while (writers != 4) {
    cv.Wait();
  }
  mutex.Unlock();
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
  dbfull()->TEST_WaitForFlushMemTable(nullptr);
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
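
// Regression check: a write stall with a mix of slowdown and no_slowdown
// writers queued must not hang the write thread; every writer below is
// expected to return so its thread can be joined.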
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex.
  int writers = 0;

  Reopen(options);
  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    dbfull()->Put(wo, key, "bar");
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };
  // Create 3 L0 files and schedule 4th without waiting
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  Flush();
  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write thread.
  FlushOptions fopt;
  fopt.wait = false;
  dbfull()->Flush(fopt);
  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }
  // Second leader. Will stall writes
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  while (writers != 6) {
    cv.Wait();
  }
  mutex.Unlock();
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  dbfull()->TEST_WaitForFlushMemTable(nullptr);
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
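
// A WAL write failure observed by the batch group leader must be propagated
// to every follower in the same write group.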
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  mock_env->SetFilesystemActive(false);
  // Wait until all threads linked to write threads, to make sure
  // all threads join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.push_back(port::Thread(
        [&](int index) {
          // All threads should fail.
          auto res = Put("key" + ToString(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_TRUE(res.ok());
            // we should see fs error when we do the flush

            // TSAN reports a false alarm for lock-order-inversion but Open and
            // FlushWAL are not run concurrently. Disabling this until TSAN is
            // fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i));
  }
  for (int i = 0; i < kNumThreads; i++) {
    threads[i].join();
  }
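  // All five writers should have coalesced into a single batch group, so
  // exactly one of them became leader.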
  ASSERT_EQ(1, leader_count);
  // Close before mock_env destruct.
  Close();
}
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
}
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    mock_env->SetFilesystemActive(i != 0);
    auto res = Put("key" + ToString(i), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return error because of
      // the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see fs error when we do the flush
      res = dbfull()->FlushWAL(false);
      ASSERT_FALSE(res.ok());
    }
    */
    if (!options.manual_wal_flush) {
      ASSERT_FALSE(res.ok());
    }
  }
  // Close before mock_env destruct.
  Close();
}
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  Status s;
  for (int i = 0; i < 4 * 512; ++i) {
    s = Put(Key(i), rnd.RandomString(1024));
    if (!s.ok()) {
      break;
    }
  }
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
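  // The write that hits the failed memtable switch returns a status whose
  // severity is kFatalError, not a plain IOError.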

  mock_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}
// Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest, LockWalInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
  // try the 2nd wal created during SwitchWAL
  dbfull()->TEST_SwitchWAL();
  ASSERT_OK(Put("key" + ToString(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
}
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  std::string wal_key_prefix = "WAL_KEY_";
  std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  std::string wal_value = "0";
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
                              no_wal_value, this] {
      for (int i = 0; i < 10; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        std::string no_wal_key =
            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
        this->Put(no_wal_key, no_wal_value, write_option_disable);
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        this->Put(wal_key, wal_value, write_option_default);
        dbfull()->SyncWAL();
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // The written WAL size should be less than 100KB, even including HEADER &
  // FOOTER overhead.
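  // Only the 100 tiny WAL-enabled records (10 threads x 10 iterations, 1-byte
  // values) may hit the WAL; the ~10MB written with disableWAL must not.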
  ASSERT_LE(bytes_num, 1024 * 100);
}
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));
}  // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}