1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include <atomic>
12 #include "db/db_impl/db_impl.h"
13 #include "db/db_test_util.h"
14 #include "port/port.h"
15 #include "port/stack_trace.h"
16 #include "test_util/fault_injection_test_env.h"
17 #include "test_util/sync_point.h"
18 #include "util/cast_util.h"
19 #include "util/mutexlock.h"
21 namespace ROCKSDB_NAMESPACE {
23 class DBFlushTest : public DBTestBase {
24 public:
25 DBFlushTest() : DBTestBase("/db_flush_test") {}
26 };
28 class DBFlushDirectIOTest : public DBFlushTest,
29 public ::testing::WithParamInterface<bool> {
30 public:
31 DBFlushDirectIOTest() : DBFlushTest() {}
32 };
34 class DBAtomicFlushTest : public DBFlushTest,
35 public ::testing::WithParamInterface<bool> {
36 public:
37 DBAtomicFlushTest() : DBFlushTest() {}
38 };
40 // We had issue when two background threads trying to flush at the same time,
41 // only one of them get committed. The test verifies the issue is fixed.
42 TEST_F(DBFlushTest, FlushWhileWritingManifest) {
43 Options options;
44 options.disable_auto_compactions = true;
45 options.max_background_flushes = 2;
46 options.env = env_;
47 Reopen(options);
48 FlushOptions no_wait;
49 no_wait.wait = false;
50 no_wait.allow_write_stall=true;
52 SyncPoint::GetInstance()->LoadDependency(
53 {{"VersionSet::LogAndApply:WriteManifest",
54 "DBFlushTest::FlushWhileWritingManifest:1"},
55 {"MemTableList::TryInstallMemtableFlushResults:InProgress",
56 "VersionSet::LogAndApply:WriteManifestDone"}});
57 SyncPoint::GetInstance()->EnableProcessing();
59 ASSERT_OK(Put("foo", "v"));
60 ASSERT_OK(dbfull()->Flush(no_wait));
61 TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
62 ASSERT_OK(Put("bar", "v"));
63 ASSERT_OK(dbfull()->Flush(no_wait));
64 // If the issue is hit we will wait here forever.
65 dbfull()->TEST_WaitForFlushMemTable();
66 #ifndef ROCKSDB_LITE
67 ASSERT_EQ(2, TotalTableFiles());
68 #endif // ROCKSDB_LITE
69 }
71 // Disable this test temporarily on Travis as it fails intermittently.
72 // Github issue: #4151
73 TEST_F(DBFlushTest, SyncFail) {
74 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
75 new FaultInjectionTestEnv(env_));
76 Options options;
77 options.disable_auto_compactions = true;
78 options.env = fault_injection_env.get();
80 SyncPoint::GetInstance()->LoadDependency(
81 {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
82 "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
83 {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
84 "DBFlushTest::SyncFail:GetVersionRefCount:2"},
85 {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
86 {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
87 SyncPoint::GetInstance()->EnableProcessing();
89 CreateAndReopenWithCF({"pikachu"}, options);
90 Put("key", "value");
91 auto* cfd =
92 reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
93 ->cfd();
94 FlushOptions flush_options;
95 flush_options.wait = false;
96 ASSERT_OK(dbfull()->Flush(flush_options));
97 // Flush installs a new super-version. Get the ref count after that.
98 auto current_before = cfd->current();
99 int refs_before = cfd->current()->TEST_refs();
100 TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
101 TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
102 int refs_after_picking_memtables = cfd->current()->TEST_refs();
103 ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
104 fault_injection_env->SetFilesystemActive(false);
105 TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
106 TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
107 fault_injection_env->SetFilesystemActive(true);
108 // Now the background job will do the flush; wait for it.
109 dbfull()->TEST_WaitForFlushMemTable();
110 #ifndef ROCKSDB_LITE
111 ASSERT_EQ("", FilesPerLevel()); // flush failed.
112 #endif // ROCKSDB_LITE
113 // Backgroun flush job should release ref count to current version.
114 ASSERT_EQ(current_before, cfd->current());
115 ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
116 Destroy(options);
117 }
119 TEST_F(DBFlushTest, SyncSkip) {
120 Options options = CurrentOptions();
122 SyncPoint::GetInstance()->LoadDependency(
123 {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
124 {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
125 SyncPoint::GetInstance()->EnableProcessing();
127 Reopen(options);
128 Put("key", "value");
130 FlushOptions flush_options;
131 flush_options.wait = false;
132 ASSERT_OK(dbfull()->Flush(flush_options));
134 TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
135 TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
137 // Now the background job will do the flush; wait for it.
138 dbfull()->TEST_WaitForFlushMemTable();
140 Destroy(options);
141 }
143 TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
144 // Verify setting an empty high-pri (flush) thread pool causes flushes to be
145 // scheduled in the low-pri (compaction) thread pool.
146 Options options = CurrentOptions();
147 options.level0_file_num_compaction_trigger = 4;
148 options.memtable_factory.reset(new SpecialSkipListFactory(1));
149 Reopen(options);
150 env_->SetBackgroundThreads(0, Env::HIGH);
152 std::thread::id tid;
153 int num_flushes = 0, num_compactions = 0;
154 SyncPoint::GetInstance()->SetCallBack(
155 "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
156 if (tid == std::thread::id()) {
157 tid = std::this_thread::get_id();
158 } else {
159 ASSERT_EQ(tid, std::this_thread::get_id());
160 }
161 ++num_flushes;
162 });
163 SyncPoint::GetInstance()->SetCallBack(
164 "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
165 ASSERT_EQ(tid, std::this_thread::get_id());
166 ++num_compactions;
167 });
168 SyncPoint::GetInstance()->EnableProcessing();
170 ASSERT_OK(Put("key", "val"));
171 for (int i = 0; i < 4; ++i) {
172 ASSERT_OK(Put("key", "val"));
173 dbfull()->TEST_WaitForFlushMemTable();
174 }
175 dbfull()->TEST_WaitForCompact();
176 ASSERT_EQ(4, num_flushes);
177 ASSERT_EQ(1, num_compactions);
178 }
180 TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
181 Options options = CurrentOptions();
182 options.write_buffer_size = 100;
183 options.max_write_buffer_number = 4;
184 options.min_write_buffer_number_to_merge = 3;
185 Reopen(options);
187 SyncPoint::GetInstance()->LoadDependency(
188 {{"DBImpl::BGWorkFlush",
189 "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
190 {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
191 "FlushJob::WriteLevel0Table"}});
192 SyncPoint::GetInstance()->EnableProcessing();
194 ASSERT_OK(Put("key1", "value1"));
196 port::Thread t([&]() {
197 // The call wait for flush to finish, i.e. with flush_options.wait = true.
198 ASSERT_OK(Flush());
199 });
201 // Wait for flush start.
202 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
203 // Insert a second memtable before the manual flush finish.
204 // At the end of the manual flush job, it will check if further flush
205 // is needed, but it will not trigger flush of the second memtable because
206 // min_write_buffer_number_to_merge is not reached.
207 ASSERT_OK(Put("key2", "value2"));
208 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
209 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
211 // Manual flush should return, without waiting for flush indefinitely.
212 t.join();
213 }
215 TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
216 Options options = CurrentOptions();
217 Reopen(options);
218 SyncPoint::GetInstance()->DisableProcessing();
219 SyncPoint::GetInstance()->ClearAllCallBacks();
220 int called = 0;
221 SyncPoint::GetInstance()->SetCallBack(
222 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
223 ASSERT_NE(nullptr, arg);
224 auto unscheduled_flushes = *reinterpret_cast<int*>(arg);
225 ASSERT_EQ(0, unscheduled_flushes);
226 ++called;
227 });
228 SyncPoint::GetInstance()->EnableProcessing();
230 ASSERT_OK(Put("a", "foo"));
231 FlushOptions flush_opts;
232 ASSERT_OK(dbfull()->Flush(flush_opts));
233 ASSERT_EQ(1, called);
235 SyncPoint::GetInstance()->DisableProcessing();
236 SyncPoint::GetInstance()->ClearAllCallBacks();
237 }
239 TEST_P(DBFlushDirectIOTest, DirectIO) {
240 Options options;
241 options.create_if_missing = true;
242 options.disable_auto_compactions = true;
243 options.max_background_flushes = 2;
244 options.use_direct_io_for_flush_and_compaction = GetParam();
245 options.env = new MockEnv(Env::Default());
246 SyncPoint::GetInstance()->SetCallBack(
247 "BuildTable:create_file", [&](void* arg) {
248 bool* use_direct_writes = static_cast<bool*>(arg);
249 ASSERT_EQ(*use_direct_writes,
250 options.use_direct_io_for_flush_and_compaction);
251 });
253 SyncPoint::GetInstance()->EnableProcessing();
254 Reopen(options);
255 ASSERT_OK(Put("foo", "v"));
256 FlushOptions flush_options;
257 flush_options.wait = true;
258 ASSERT_OK(dbfull()->Flush(flush_options));
259 Destroy(options);
260 delete options.env;
261 }
263 TEST_F(DBFlushTest, FlushError) {
264 Options options;
265 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
266 new FaultInjectionTestEnv(env_));
267 options.write_buffer_size = 100;
268 options.max_write_buffer_number = 4;
269 options.min_write_buffer_number_to_merge = 3;
270 options.disable_auto_compactions = true;
271 options.env = fault_injection_env.get();
272 Reopen(options);
274 ASSERT_OK(Put("key1", "value1"));
275 ASSERT_OK(Put("key2", "value2"));
276 fault_injection_env->SetFilesystemActive(false);
277 Status s = dbfull()->TEST_SwitchMemtable();
278 fault_injection_env->SetFilesystemActive(true);
279 Destroy(options);
280 ASSERT_NE(s, Status::OK());
281 }
283 TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
284 // Regression test for bug where manual flush hangs forever when the DB
285 // is in read-only mode. Verify it now at least returns, despite failing.
286 Options options;
287 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
288 new FaultInjectionTestEnv(env_));
289 options.env = fault_injection_env.get();
290 options.max_write_buffer_number = 2;
291 Reopen(options);
293 // Trigger a first flush but don't let it run
294 ASSERT_OK(db_->PauseBackgroundWork());
295 ASSERT_OK(Put("key1", "value1"));
296 FlushOptions flush_opts;
297 flush_opts.wait = false;
298 ASSERT_OK(db_->Flush(flush_opts));
300 // Write a key to the second memtable so we have something to flush later
301 // after the DB is in read-only mode.
302 ASSERT_OK(Put("key2", "value2"));
304 // Let the first flush continue, hit an error, and put the DB in read-only
305 // mode.
306 fault_injection_env->SetFilesystemActive(false);
307 ASSERT_OK(db_->ContinueBackgroundWork());
308 dbfull()->TEST_WaitForFlushMemTable();
309 #ifndef ROCKSDB_LITE
310 uint64_t num_bg_errors;
311 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
312 &num_bg_errors));
313 ASSERT_GT(num_bg_errors, 0);
314 #endif // ROCKSDB_LITE
316 // In the bug scenario, triggering another flush would cause the second flush
317 // to hang forever. After the fix we expect it to return an error.
318 ASSERT_NOK(db_->Flush(FlushOptions()));
320 Close();
321 }
323 TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
324 Options options = CurrentOptions();
325 options.create_if_missing = true;
326 CreateAndReopenWithCF({"pikachu"}, options);
327 SyncPoint::GetInstance()->DisableProcessing();
328 SyncPoint::GetInstance()->LoadDependency(
329 {{"DBImpl::FlushMemTable:AfterScheduleFlush",
330 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
331 {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
332 "DBImpl::BackgroundCallFlush:start"},
333 {"DBImpl::BackgroundCallFlush:start",
334 "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
335 SyncPoint::GetInstance()->EnableProcessing();
336 ASSERT_EQ(2, handles_.size());
337 ASSERT_OK(Put(1, "key", "value"));
338 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
339 port::Thread drop_cf_thr([&]() {
341 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
342 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
343 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
344 handles_.resize(1);
346 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
347 });
348 FlushOptions flush_opts;
349 flush_opts.allow_write_stall = true;
350 ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
351 drop_cf_thr.join();
352 Close();
353 SyncPoint::GetInstance()->DisableProcessing();
354 }
356 #ifndef ROCKSDB_LITE
357 TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
358 class TestListener : public EventListener {
359 public:
360 void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
361 // There's only one key in each flush.
362 ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
363 ASSERT_NE(0, info.smallest_seqno);
364 if (info.smallest_seqno == seq1) {
365 // First flush completed
366 ASSERT_FALSE(completed1);
367 completed1 = true;
368 CheckFlushResultCommitted(db, seq1);
369 } else {
370 // Second flush completed
371 ASSERT_FALSE(completed2);
372 completed2 = true;
373 ASSERT_EQ(info.smallest_seqno, seq2);
374 CheckFlushResultCommitted(db, seq2);
375 }
376 }
378 void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
379 DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
380 InstrumentedMutex* mutex = db_impl->mutex();
381 mutex->Lock();
382 auto* cfd =
383 reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
384 ->cfd();
385 ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
386 mutex->Unlock();
387 }
389 std::atomic<SequenceNumber> seq1{0};
390 std::atomic<SequenceNumber> seq2{0};
391 std::atomic<bool> completed1{false};
392 std::atomic<bool> completed2{false};
393 };
394 std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
396 SyncPoint::GetInstance()->LoadDependency(
397 {{"DBImpl::BackgroundCallFlush:start",
398 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
399 {"DBImpl::FlushMemTableToOutputFile:Finish",
400 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
401 SyncPoint::GetInstance()->SetCallBack(
402 "FlushJob::WriteLevel0Table", [&listener](void* arg) {
403 // Wait for the second flush finished, out of mutex.
404 auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
405 if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
407 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
408 "WaitSecond");
409 }
410 });
412 Options options = CurrentOptions();
413 options.create_if_missing = true;
414 options.listeners.push_back(listener);
415 // Setting max_flush_jobs = max_background_jobs / 4 = 2.
416 options.max_background_jobs = 8;
417 // Allow 2 immutable memtables.
418 options.max_write_buffer_number = 3;
419 Reopen(options);
420 SyncPoint::GetInstance()->EnableProcessing();
421 ASSERT_OK(Put("foo", "v"));
422 listener->seq1 = db_->GetLatestSequenceNumber();
423 // t1 will wait for the second flush complete before committing flush result.
424 auto t1 = port::Thread([&]() {
425 // flush_opts.wait = true
426 ASSERT_OK(db_->Flush(FlushOptions()));
427 });
428 // Wait for first flush started.
430 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
431 // The second flush will exit early without commit its result. The work
432 // is delegated to the first flush.
433 ASSERT_OK(Put("bar", "v"));
434 listener->seq2 = db_->GetLatestSequenceNumber();
435 FlushOptions flush_opts;
436 flush_opts.wait = false;
437 ASSERT_OK(db_->Flush(flush_opts));
438 t1.join();
439 ASSERT_TRUE(listener->completed1);
440 ASSERT_TRUE(listener->completed2);
441 SyncPoint::GetInstance()->DisableProcessing();
442 SyncPoint::GetInstance()->ClearAllCallBacks();
443 }
444 #endif // !ROCKSDB_LITE
446 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
447 Options options = CurrentOptions();
448 options.create_if_missing = true;
449 options.atomic_flush = GetParam();
450 options.write_buffer_size = (static_cast<size_t>(64) << 20);
452 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
453 size_t num_cfs = handles_.size();
454 ASSERT_EQ(3, num_cfs);
455 WriteOptions wopts;
456 wopts.disableWAL = true;
457 for (size_t i = 0; i != num_cfs; ++i) {
458 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
459 }
460 std::vector<int> cf_ids;
461 for (size_t i = 0; i != num_cfs; ++i) {
462 cf_ids.emplace_back(static_cast<int>(i));
463 }
464 ASSERT_OK(Flush(cf_ids));
465 for (size_t i = 0; i != num_cfs; ++i) {
466 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
467 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
468 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
469 }
470 }
472 TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
473 Options options = CurrentOptions();
474 options.create_if_missing = true;
475 options.atomic_flush = GetParam();
476 // 4KB so that we can easily trigger auto flush.
477 options.write_buffer_size = 4096;
479 SyncPoint::GetInstance()->LoadDependency(
480 {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
481 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
482 SyncPoint::GetInstance()->EnableProcessing();
484 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
485 size_t num_cfs = handles_.size();
486 ASSERT_EQ(3, num_cfs);
487 WriteOptions wopts;
488 wopts.disableWAL = true;
489 for (size_t i = 0; i != num_cfs; ++i) {
490 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
491 }
492 // Keep writing to one of them column families to trigger auto flush.
493 for (int i = 0; i != 4000; ++i) {
494 ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,
495 "key" + std::to_string(i), "value" + std::to_string(i),
496 wopts));
497 }
500 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
501 if (options.atomic_flush) {
502 for (size_t i = 0; i != num_cfs - 1; ++i) {
503 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
504 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
505 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
506 }
507 } else {
508 for (size_t i = 0; i != num_cfs - 1; ++i) {
509 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
510 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
511 ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
512 }
513 }
514 SyncPoint::GetInstance()->DisableProcessing();
515 }
517 TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
518 bool atomic_flush = GetParam();
519 if (!atomic_flush) {
520 return;
521 }
522 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
523 new FaultInjectionTestEnv(env_));
524 Options options = CurrentOptions();
525 options.create_if_missing = true;
526 options.atomic_flush = atomic_flush;
527 options.env = fault_injection_env.get();
528 SyncPoint::GetInstance()->DisableProcessing();
529 SyncPoint::GetInstance()->LoadDependency(
530 {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
531 "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
532 {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
533 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
534 SyncPoint::GetInstance()->EnableProcessing();
536 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
537 size_t num_cfs = handles_.size();
538 ASSERT_EQ(3, num_cfs);
539 WriteOptions wopts;
540 wopts.disableWAL = true;
541 for (size_t i = 0; i != num_cfs; ++i) {
542 int cf_id = static_cast<int>(i);
543 ASSERT_OK(Put(cf_id, "key", "value", wopts));
544 }
545 FlushOptions flush_opts;
546 flush_opts.wait = false;
547 ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
548 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
549 fault_injection_env->SetFilesystemActive(false);
550 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
551 for (auto* cfh : handles_) {
552 dbfull()->TEST_WaitForFlushMemTable(cfh);
553 }
554 for (size_t i = 0; i != num_cfs; ++i) {
555 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
556 ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
557 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
558 }
559 fault_injection_env->SetFilesystemActive(true);
560 Destroy(options);
561 }
563 TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
564 bool atomic_flush = GetParam();
565 if (!atomic_flush) {
566 return;
567 }
568 Options options = CurrentOptions();
569 options.create_if_missing = true;
570 options.atomic_flush = atomic_flush;
571 SyncPoint::GetInstance()->DisableProcessing();
572 SyncPoint::GetInstance()->ClearAllCallBacks();
573 SyncPoint::GetInstance()->EnableProcessing();
575 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
576 size_t num_cfs = handles_.size();
577 ASSERT_EQ(3, num_cfs);
578 WriteOptions wopts;
579 wopts.disableWAL = true;
580 std::vector<int> cf_ids;
581 for (size_t i = 0; i != num_cfs; ++i) {
582 int cf_id = static_cast<int>(i);
583 ASSERT_OK(Put(cf_id, "key", "value", wopts));
584 cf_ids.push_back(cf_id);
585 }
586 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
587 ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
588 Destroy(options);
589 }
591 TEST_P(DBAtomicFlushTest,
592 FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
593 bool atomic_flush = GetParam();
594 if (!atomic_flush) {
595 return;
596 }
597 Options options = CurrentOptions();
598 options.create_if_missing = true;
599 options.atomic_flush = atomic_flush;
601 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
603 SyncPoint::GetInstance()->DisableProcessing();
604 SyncPoint::GetInstance()->ClearAllCallBacks();
605 SyncPoint::GetInstance()->LoadDependency(
606 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
607 "DBAtomicFlushTest::BeforeDropCF"},
608 {"DBAtomicFlushTest::AfterDropCF",
609 "DBImpl::BackgroundCallFlush:start"}});
610 SyncPoint::GetInstance()->EnableProcessing();
612 size_t num_cfs = handles_.size();
613 ASSERT_EQ(3, num_cfs);
614 WriteOptions wopts;
615 wopts.disableWAL = true;
616 for (size_t i = 0; i != num_cfs; ++i) {
617 int cf_id = static_cast<int>(i);
618 ASSERT_OK(Put(cf_id, "key", "value", wopts));
619 }
620 port::Thread user_thread([&]() {
621 TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
622 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
623 TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
624 });
625 FlushOptions flush_opts;
626 flush_opts.wait = true;
627 ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
628 user_thread.join();
629 for (size_t i = 0; i != num_cfs; ++i) {
630 int cf_id = static_cast<int>(i);
631 ASSERT_EQ("value", Get(cf_id, "key"));
632 }
634 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
635 num_cfs = handles_.size();
636 ASSERT_EQ(2, num_cfs);
637 for (size_t i = 0; i != num_cfs; ++i) {
638 int cf_id = static_cast<int>(i);
639 ASSERT_EQ("value", Get(cf_id, "key"));
640 }
641 Destroy(options);
642 }
644 TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
645 bool atomic_flush = GetParam();
646 if (!atomic_flush) {
647 return;
648 }
649 const int kNumKeysTriggerFlush = 4;
650 Options options = CurrentOptions();
651 options.create_if_missing = true;
652 options.atomic_flush = atomic_flush;
653 options.memtable_factory.reset(
654 new SpecialSkipListFactory(kNumKeysTriggerFlush));
655 CreateAndReopenWithCF({"pikachu"}, options);
657 for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
658 ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
659 }
660 SyncPoint::GetInstance()->DisableProcessing();
661 SyncPoint::GetInstance()->ClearAllCallBacks();
662 SyncPoint::GetInstance()->EnableProcessing();
663 ASSERT_OK(Put(0, "key", "value"));
664 Close();
666 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
667 ASSERT_EQ("value", Get(0, "key"));
668 }
670 TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
671 bool atomic_flush = GetParam();
672 Options options = CurrentOptions();
673 options.create_if_missing = true;
674 options.atomic_flush = atomic_flush;
675 options.max_write_buffer_number = 4;
676 // Set min_write_buffer_number_to_merge to be greater than 1, so that
677 // a column family with one memtable in the imm will not cause IsFlushPending
678 // to return true when flush_requested_ is false.
679 options.min_write_buffer_number_to_merge = 2;
680 CreateAndReopenWithCF({"pikachu"}, options);
681 ASSERT_EQ(2, handles_.size());
682 ASSERT_OK(dbfull()->PauseBackgroundWork());
683 ASSERT_OK(Put(0, "key00", "value00"));
684 ASSERT_OK(Put(1, "key10", "value10"));
685 FlushOptions flush_opts;
686 flush_opts.wait = false;
687 ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
688 ASSERT_OK(Put(0, "key01", "value01"));
689 // Since max_write_buffer_number is 4, the following flush won't cause write
690 // stall.
691 ASSERT_OK(dbfull()->Flush(flush_opts));
692 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
693 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
694 handles_[1] = nullptr;
695 ASSERT_OK(dbfull()->ContinueBackgroundWork());
696 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
697 delete handles_[0];
698 handles_.clear();
699 }
701 TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
702 bool atomic_flush = GetParam();
703 if (!atomic_flush) {
704 return;
705 }
706 Options options = CurrentOptions();
707 options.create_if_missing = true;
708 options.atomic_flush = atomic_flush;
709 CreateAndReopenWithCF({"pikachu"}, options);
710 SyncPoint::GetInstance()->DisableProcessing();
711 SyncPoint::GetInstance()->LoadDependency(
712 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
713 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
714 {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
715 "DBImpl::BackgroundCallFlush:start"},
716 {"DBImpl::BackgroundCallFlush:start",
717 "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
718 SyncPoint::GetInstance()->EnableProcessing();
719 ASSERT_EQ(2, handles_.size());
720 ASSERT_OK(Put(0, "key", "value"));
721 ASSERT_OK(Put(1, "key", "value"));
722 auto* cfd_default =
723 static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
724 ->cfd();
725 auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
726 port::Thread drop_cf_thr([&]() {
728 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
729 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
730 delete handles_[1];
731 handles_.resize(1);
733 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
734 });
735 FlushOptions flush_opts;
736 flush_opts.allow_write_stall = true;
737 ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
738 flush_opts));
739 drop_cf_thr.join();
740 Close();
741 SyncPoint::GetInstance()->DisableProcessing();
742 }
744 TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
745 bool atomic_flush = GetParam();
746 if (!atomic_flush) {
747 return;
748 }
749 auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
750 Options options = CurrentOptions();
751 options.env = fault_injection_env.get();
752 options.create_if_missing = true;
753 options.atomic_flush = atomic_flush;
754 CreateAndReopenWithCF({"pikachu"}, options);
755 ASSERT_EQ(2, handles_.size());
756 for (size_t cf = 0; cf < handles_.size(); ++cf) {
757 ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
758 }
759 SyncPoint::GetInstance()->DisableProcessing();
760 SyncPoint::GetInstance()->ClearAllCallBacks();
761 SyncPoint::GetInstance()->SetCallBack(
762 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
763 [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
764 SyncPoint::GetInstance()->EnableProcessing();
765 FlushOptions flush_opts;
766 Status s = db_->Flush(flush_opts, handles_);
767 ASSERT_NOK(s);
768 fault_injection_env->SetFilesystemActive(true);
769 Close();
770 SyncPoint::GetInstance()->ClearAllCallBacks();
771 }
773 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
774 testing::Bool());
776 INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());
778 } // namespace ROCKSDB_NAMESPACE
780 int main(int argc, char** argv) {
781 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
782 ::testing::InitGoogleTest(&argc, argv);
783 return RUN_ALL_TESTS();
784 }