// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <atomic>
#include <fstream>
#include <memory>
#include <thread>
#include <vector>

#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"

namespace ROCKSDB_NAMESPACE {

// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
 public:
  DBWriteTest() : DBTestBase("db_write_test", /*env_do_fsync=*/true) {}

  Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }

  void Open() { DBTestBase::Reopen(GetOptions()); }
};

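// Fixture for write-path tests that do not depend on the parameterized write
// policy and instead set up their own Options (e.g. PipelinedWriteRace below).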
class DBWriteTestUnparameterized : public DBTestBase {
 public:
  explicit DBWriteTestUnparameterized()
      : DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {}
};

// It is invalid to do a sync write while disabling the WAL.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
  WriteOptions write_options;
  write_options.sync = true;
  write_options.disableWAL = true;
  ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
  WriteBatch batch;
  ASSERT_OK(batch.Put("foo", "bar"));
  ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}

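// Verifies that a writer with WriteOptions::no_slowdown set can be removed
// from the middle of the pending writer list when a write stall is detected,
// without corrupting the list: the no_slowdown write either succeeds or
// returns Status::Incomplete(), while the surrounding slowdown writes
// complete normally.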
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }

  // Second leader. Will stall writes
  // Build a writers list with no slowdown in the middle:
  //  +-------------+
  //  |  slowdown   +<----+ newest
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  | no slowdown |
  //  +--+----------+
  //     |
  //     v
  //  +--+----------+
  //  |  slowdown   +
  //  +-------------+
  threads.emplace_back(write_slowdown_func);
  while (writers != 2) {
    cv.Wait();
  }
  threads.emplace_back(write_no_slowdown_func);
  while (writers != 3) {
    cv.Wait();
  }
  threads.emplace_back(write_slowdown_func);
  while (writers != 4) {
    cv.Wait();
  }

  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
  for (auto& t : threads) {
    t.join();
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

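// Builds a larger mix of slowdown/no_slowdown writers behind a write stall and
// checks that the write thread does not hang: every queued writer eventually
// completes once the stall is processed (no_slowdown writes may return
// Status::Incomplete()).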
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
  Options options = GetOptions();
  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
      4;
  std::vector<port::Thread> threads;
  std::atomic<int> thread_num(0);
  port::Mutex mutex;
  port::CondVar cv(&mutex);
  // Guarded by mutex
  int writers = 0;

  Reopen(options);

  std::function<void()> write_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = false;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
  };
  std::function<void()> write_no_slowdown_func = [&]() {
    int a = thread_num.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    wo.no_slowdown = true;
    Status s = dbfull()->Put(wo, key, "bar");
    ASSERT_TRUE(s.ok() || s.IsIncomplete());
  };
  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
    mutex.Lock();
    ++writers;
    cv.SignalAll();
    mutex.Unlock();
  };

  // Create 3 L0 files and schedule 4th without waiting
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
        "DBImpl::WriteImpl:BeforeLeaderEnters"},
       // Make compaction start wait for the write stall to be detected and
       // implemented by a write group leader
       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
        "BackgroundCallCompaction:0"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Schedule creation of 4th L0 file without waiting. This will seal the
  // memtable and then wait for a sync point before writing the file. We need
  // to do it this way because SwitchMemtable() needs to enter the
  // write_thread
  FlushOptions fopt;
  fopt.wait = false;
  ASSERT_OK(dbfull()->Flush(fopt));

  // Create a mix of slowdown/no_slowdown write threads
  mutex.Lock();
  // First leader
  threads.emplace_back(write_slowdown_func);
  while (writers != 1) {
    cv.Wait();
  }
  // Second leader. Will stall writes
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  threads.emplace_back(write_no_slowdown_func);
  threads.emplace_back(write_slowdown_func);
  while (writers != 6) {
    cv.Wait();
  }
  mutex.Unlock();

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
  // This would have triggered a write stall. Unblock the write group leader
  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
  // The leader is going to create missing newer links. When the leader
  // finishes, the next leader is going to delay writes and fail writers with
  // no_slowdown

  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

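// An injected WAL write error hit by the batch group leader must be propagated
// to every follower in the same write group. With manual_wal_flush the Puts
// themselves still succeed, because the WAL is only written on FlushWAL().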
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
  constexpr int kNumThreads = 5;
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  std::atomic<int> ready_count{0};
  std::atomic<int> leader_count{0};
  std::vector<port::Thread> threads;
  mock_env->SetFilesystemActive(false);

  // Wait until all threads are linked into the write thread's writer list, to
  // make sure they all join the same batch group.
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        ready_count++;
        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          leader_count++;
          while (ready_count < kNumThreads) {
            // busy waiting
          }
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  for (int i = 0; i < kNumThreads; i++) {
    threads.push_back(port::Thread(
        [&](int index) {
          // Each Put should fail unless manual_wal_flush defers the WAL write.
          auto res = Put("key" + std::to_string(index), "value");
          if (options.manual_wal_flush) {
            ASSERT_TRUE(res.ok());
            // we should see fs error when we do the flush

            // TSAN reports a false alarm for lock-order-inversion but Open and
            // FlushWAL are not run concurrently. Disabling this until TSAN is
            // fixed.
            // res = dbfull()->FlushWAL(false);
            // ASSERT_FALSE(res.ok());
          } else {
            ASSERT_FALSE(res.ok());
          }
        },
        i));
  }
  for (int i = 0; i < kNumThreads; i++) {
    threads[i].join();
  }
  ASSERT_EQ(1, leader_count);

  // The failed Put operations can cause a BG error to be set.
  // Mark it as checked for ASSERT_STATUS_CHECKED.
  dbfull()->Resume().PermitUncheckedError();

  // Close before mock_env destruct.
  Close();
}

TEST_F(DBWriteTestUnparameterized, PipelinedWriteRace) {
  // This test was written to trigger a race in ExitAsBatchGroupLeader in case
  // enable_pipelined_write_ was true.
  // Writers for which ShouldWriteToMemtable() evaluates to false are removed
  // from the write_group via CompleteFollower/ CompleteLeader. Writers in the
  // middle of the group are fully unlinked, but if that writer is the
  // last_writer, then we did not update the predecessor's link_older, i.e.,
  // this writer was still reachable via newest_writer_.
  //
  // But the problem was that CompleteFollower already wakes up the thread
  // owning that writer before the writer has been removed. This resulted in a
  // race - if the leader thread was fast enough, then everything was fine.
  // However, if the woken-up thread finished the current write operation and
  // then performed yet another write, then a new writer instance was added
  // to newest_writer_. It is possible that the new writer is located at the
  // same address on the stack, and if this happened, then we had a problem,
  // because the old code tried to find the last_writer in the list to unlink
  // it, which in this case produced a cycle in the list.
  // Whether two invocations of PipelinedWriteImpl() by the same thread
  // actually allocate the writer at the same address depends on the OS and/or
  // compiler, so it is rather hard to create a deterministic test for this.

  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.enable_pipelined_write = true;
  std::vector<port::Thread> threads;

  std::atomic<int> write_counter{0};
  std::atomic<int> active_writers{0};
  std::atomic<bool> second_write_starting{false};
  std::atomic<bool> second_write_in_progress{false};
  std::atomic<WriteThread::Writer*> leader{nullptr};
  std::atomic<bool> finished_WAL_write{false};

  DestroyAndReopen(options);

  auto write_one_doc = [&]() {
    int a = write_counter.fetch_add(1);
    std::string key = "foo" + std::to_string(a);
    WriteOptions wo;
    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
    --active_writers;
  };

  auto write_two_docs = [&]() {
    write_one_doc();
    second_write_starting = true;
    write_one_doc();
  };

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
        if (second_write_starting.load()) {
          second_write_in_progress = true;
          return;
        }
        auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
        if (w->state == WriteThread::STATE_GROUP_LEADER) {
          active_writers++;
          if (leader.load() == nullptr) {
            leader.store(w);
            while (active_writers.load() < 2) {
              // wait for another thread to join the write_group
            }
          }
        } else {
          // we disable the memtable for all followers so that they are
          // removed from the write_group before enqueuing it for the memtable
          // write
          w->disable_memtable = true;
          active_writers++;
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::ExitAsBatchGroupLeader:Start", [&](void* arg) {
        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
        if (wg->leader == leader && !finished_WAL_write) {
          finished_WAL_write = true;
          while (active_writers.load() < 3) {
            // wait for the new writer to be enqueued
          }
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
      [&](void* arg) {
        auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
        if (wg->leader == leader) {
          while (!second_write_in_progress.load()) {
            // wait for the old follower thread to start the next write
          }
        }
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // start leader + one follower
  threads.emplace_back(write_one_doc);
  while (leader.load() == nullptr) {
    // wait for leader
  }

  // we perform two writes in the follower, so that for the second write
  // the thread reinserts a Writer with the same address
  threads.emplace_back(write_two_docs);

  // wait for the leader to enter ExitAsBatchGroupLeader
  while (!finished_WAL_write.load()) {
    // wait for write_group to have finished the WAL writes
  }

  // start another writer thread to be enqueued before the leader can
  // complete the writers from its write_group
  threads.emplace_back(write_one_doc);

  for (auto& t : threads) {
    t.join();
  }
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

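// With manual_wal_flush enabled, Put() leaves data in the in-memory WAL buffer
// until FlushWAL() is called; without it the buffer is already empty after the
// write. Checked for both the initial WAL and a WAL created by SwitchWAL.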
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
  // try the 2nd wal created during SwitchWAL
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
  ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
}

TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
  // Repro race condition bug where unflushed WAL data extended the synced size
  // recorded to MANIFEST despite being unrecoverable.
  Options options = GetOptions();
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_env.get();
  options.manual_wal_flush = true;
  options.track_and_verify_wals_in_manifest = true;
  Reopen(options);

  ASSERT_OK(Put("key1", "val1"));

  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncWAL:Begin",
      [this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  // Ensure callback ran.
  ASSERT_EQ("val2", Get("key2"));

  Close();

  // Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
  // DB WAL.
  fault_env->DropUnsyncedFileData();

  Reopen(options);

  // Need to close before `fault_env` goes out of scope.
  Close();
}

TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
  // Repro bug where a WAL is appended and switched after
  // `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
  // synced inactive logs. Previously such a WAL would be wrongly untracked
  // so the final append would never be synced.
  Options options = GetOptions();
  std::unique_ptr<FaultInjectionTestEnv> fault_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_env.get();
  Reopen(options);

  ASSERT_OK(Put("key1", "val1"));

  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
        ASSERT_OK(Put("key2", "val2"));
        ASSERT_OK(dbfull()->TEST_SwitchMemtable());
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_OK(Put("key3", "val3"));

  ASSERT_OK(db_->FlushWAL(true /* sync */));

  Close();

  // Simulate full loss of unsynced data. This should drop nothing since we did
  // `FlushWAL(true /* sync */)` before `Close()`.
  fault_env->DropUnsyncedFileData();

  Reopen(options);

  ASSERT_EQ("val1", Get("key1"));
  ASSERT_EQ("val2", Get("key2"));
  ASSERT_EQ("val3", Get("key3"));

  // Need to close before `fault_env` goes out of scope.
  Close();
}

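// An injected IO error on the first WAL write should put the DB into read-only
// mode, so the following Put fails as well (unless manual_wal_flush defers the
// WAL write, in which case both Puts succeed).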
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  Reopen(options);
  for (int i = 0; i < 2; i++) {
    // Forcibly fail WAL write for the first Put only. Subsequent Puts should
    // fail due to read-only mode
    mock_env->SetFilesystemActive(i != 0);
    auto res = Put("key" + std::to_string(i), "value");
    // TSAN reports a false alarm for lock-order-inversion but Open and
    // FlushWAL are not run concurrently. Disabling this until TSAN is
    // fixed.
    /*
    if (options.manual_wal_flush && i == 0) {
      // even with manual_wal_flush the 2nd Put should return error because of
      // the read-only mode
      ASSERT_TRUE(res.ok());
      // we should see fs error when we do the flush
      res = dbfull()->FlushWAL(false);
    }
    */
    if (!options.manual_wal_flush) {
      ASSERT_NOK(res);
    } else {
      ASSERT_OK(res);
    }
  }
  // Close before mock_env destruct.
  Close();
}

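// With manual_wal_flush and a small write buffer, filling the memtable with
// ~1 KB values forces a memtable switch; the injected IO error hit on that
// switch should surface as a fatal background error.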
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
  Random rnd(301);
  std::unique_ptr<FaultInjectionTestEnv> mock_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetOptions();
  options.env = mock_env.get();
  options.writable_file_max_buffer_size = 4 * 1024 * 1024;
  options.write_buffer_size = 3 * 512 * 1024;
  options.wal_bytes_per_sync = 256 * 1024;
  options.manual_wal_flush = true;
  Reopen(options);
  mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
  Status s;
  for (int i = 0; i < 4 * 512; ++i) {
    s = Put(Key(i), rnd.RandomString(1024));
    if (!s.ok()) {
      break;
    }
  }
  ASSERT_EQ(s.severity(), Status::Severity::kFatalError);

  mock_env->SetFilesystemActive(true);
  // Close before mock_env destruct.
  Close();
}

// Test that db->LockWAL() flushes the WAL after locking.
TEST_P(DBWriteTest, LockWalInEffect) {
  Options options = GetOptions();
  Reopen(options);
  // try the 1st WAL created during open
  ASSERT_OK(Put("key" + std::to_string(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
  // try the 2nd wal created during SwitchWAL
  ASSERT_OK(dbfull()->TEST_SwitchWAL());
  ASSERT_OK(Put("key" + std::to_string(0), "value"));
  ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
  ASSERT_OK(dbfull()->LockWAL());
  ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
  ASSERT_OK(dbfull()->UnlockWAL());
}

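// Mixes WAL-disabled writes of large values with WAL-enabled writes of tiny
// values from 10 concurrent threads and verifies, via the WAL_FILE_BYTES
// ticker, that the disableWAL data never reaches the WAL.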
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
  Options options = GetOptions();
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  options.statistics->set_stats_level(StatsLevel::kAll);
  Reopen(options);
  std::string wal_key_prefix = "WAL_KEY_";
  std::string no_wal_key_prefix = "K_";
  // 100 KB value each for NO-WAL operation
  std::string no_wal_value(1024 * 100, 'X');
  // 1B value each for WAL operation
  std::string wal_value = "0";
  std::thread threads[10];
  for (int t = 0; t < 10; t++) {
    threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
                              no_wal_value, this] {
      for (int i = 0; i < 10; i++) {
        ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
        write_option_disable.disableWAL = true;
        ROCKSDB_NAMESPACE::WriteOptions write_option_default;
        std::string no_wal_key =
            no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
        std::string wal_key =
            wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
        ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
        ASSERT_OK(dbfull()->SyncWAL());
      }
      return;
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  uint64_t bytes_num = options.statistics->getTickerCount(
      ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
  // the written WAL size should be less than 100KB (even including HEADER &
  // FOOTER overhead)
  ASSERT_LE(bytes_num, 1024 * 100);
}

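// Run every DBWriteTest case under the default, concurrent-WAL-write, and
// pipelined-write configurations.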
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                        testing::Values(DBTestBase::kDefault,
                                        DBTestBase::kConcurrentWALWrites,
                                        DBTestBase::kPipelinedWrite));

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}