1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
12 #include "db/db_impl/db_impl.h"
13 #include "db/db_test_util.h"
14 #include "port/port.h"
15 #include "port/stack_trace.h"
16 #include "test_util/fault_injection_test_env.h"
17 #include "test_util/sync_point.h"
18 #include "util/cast_util.h"
19 #include "util/mutexlock.h"
21 namespace ROCKSDB_NAMESPACE
{
// Base fixture for the flush tests in this file. It constructs DBTestBase
// with the path "/db_flush_test".
23 class DBFlushTest
: public DBTestBase
{
25 DBFlushTest() : DBTestBase("/db_flush_test") {}
// Parameterized flush fixture. The DirectIO test uses its bool parameter as
// options.use_direct_io_for_flush_and_compaction.
28 class DBFlushDirectIOTest
: public DBFlushTest
,
29 public ::testing::WithParamInterface
<bool> {
31 DBFlushDirectIOTest() : DBFlushTest() {}
// Parameterized flush fixture. Each DBAtomicFlushTest case uses its bool
// parameter as options.atomic_flush.
34 class DBAtomicFlushTest
: public DBFlushTest
,
35 public ::testing::WithParamInterface
<bool> {
37 DBAtomicFlushTest() : DBFlushTest() {}
40 // We had an issue where, when two background threads tried to flush at the same time,
41 // only one of them got committed. This test verifies the issue is fixed.
42 TEST_F(DBFlushTest
, FlushWhileWritingManifest
) {
// Allow two background flushes so that the two flushes below can overlap.
44 options
.disable_auto_compactions
= true;
45 options
.max_background_flushes
= 2;
// NOTE(review): allow_write_stall presumably lets these non-waiting flushes
// start without waiting for write-stall conditions to clear; confirm against
// the FlushOptions documentation.
50 no_wait
.allow_write_stall
=true;
// Each pair is {predecessor, successor}: the successor waits until the
// predecessor is reached. The test thread waits until the first flush is
// writing the MANIFEST. That MANIFEST write is held until the second flush
// reaches TryInstallMemtableFlushResults:InProgress.
52 SyncPoint::GetInstance()->LoadDependency(
53 {{"VersionSet::LogAndApply:WriteManifest",
54 "DBFlushTest::FlushWhileWritingManifest:1"},
55 {"MemTableList::TryInstallMemtableFlushResults:InProgress",
56 "VersionSet::LogAndApply:WriteManifestDone"}});
57 SyncPoint::GetInstance()->EnableProcessing();
59 ASSERT_OK(Put("foo", "v"));
60 ASSERT_OK(dbfull()->Flush(no_wait
));
61 TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
62 ASSERT_OK(Put("bar", "v"));
63 ASSERT_OK(dbfull()->Flush(no_wait
));
64 // If the issue is hit we will wait here forever.
65 dbfull()->TEST_WaitForFlushMemTable();
// Both flushes must have produced a table file.
67 ASSERT_EQ(2, TotalTableFiles());
68 #endif // ROCKSDB_LITE
71 // Disable this test temporarily on Travis as it fails intermittently.
72 // GitHub issue: #4151
// Makes syncing closed WAL files fail during a background flush. It then
// checks that the flush wrote no files and that the flush job released the
// extra reference it took on the current version.
73 TEST_F(DBFlushTest
, SyncFail
) {
74 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
75 new FaultInjectionTestEnv(env_
));
77 options
.disable_auto_compactions
= true;
78 options
.env
= fault_injection_env
.get();
// Each pair is {predecessor, successor}: the successor waits until the
// predecessor is reached. The test thread samples version ref counts around
// the flush's memtable picking. It then lets SyncClosedLogs start and waits
// until that sync has failed.
80 SyncPoint::GetInstance()->LoadDependency(
81 {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
82 "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
83 {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
84 "DBFlushTest::SyncFail:GetVersionRefCount:2"},
85 {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
86 {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
87 SyncPoint::GetInstance()->EnableProcessing();
89 CreateAndReopenWithCF({"pikachu"}, options
);
92 reinterpret_cast<ColumnFamilyHandleImpl
*>(db_
->DefaultColumnFamily())
94 FlushOptions flush_options
;
95 flush_options
.wait
= false;
96 ASSERT_OK(dbfull()->Flush(flush_options
));
97 // Flush installs a new super-version. Get the ref count after that.
98 auto current_before
= cfd
->current();
99 int refs_before
= cfd
->current()->TEST_refs();
// Let the flush pick its memtables. Picking must add exactly one reference
// to the current version.
100 TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
101 TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
102 int refs_after_picking_memtables
= cfd
->current()->TEST_refs();
103 ASSERT_EQ(refs_before
+ 1, refs_after_picking_memtables
);
// Deactivate the filesystem while SyncClosedLogs runs so that it fails. The
// test waits for DBImpl::SyncClosedLogs:Failed before re-enabling it.
104 fault_injection_env
->SetFilesystemActive(false);
105 TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
106 TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
107 fault_injection_env
->SetFilesystemActive(true);
108 // Now the background job will do the flush; wait for it.
109 dbfull()->TEST_WaitForFlushMemTable();
111 ASSERT_EQ("", FilesPerLevel()); // flush failed, so no level has any files.
112 #endif // ROCKSDB_LITE
113 // The background flush job should release its ref count on the current version.
114 ASSERT_EQ(current_before
, cfd
->current());
115 ASSERT_EQ(refs_before
, cfd
->current()->TEST_refs());
// Runs a non-waiting flush and requires that the background flush goes
// through the DBImpl::SyncClosedLogs:Skip path. The test thread releases the
// flush into that point and then waits until it has been reached.
119 TEST_F(DBFlushTest
, SyncSkip
) {
120 Options options
= CurrentOptions();
// {predecessor, successor}: SyncClosedLogs:Skip waits for SyncSkip:1, and
// SyncSkip:2 waits for SyncClosedLogs:Skip.
122 SyncPoint::GetInstance()->LoadDependency(
123 {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
124 {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
125 SyncPoint::GetInstance()->EnableProcessing();
130 FlushOptions flush_options
;
131 flush_options
.wait
= false;
132 ASSERT_OK(dbfull()->Flush(flush_options
));
134 TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
135 TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
137 // Now the background job will do the flush; wait for it.
138 dbfull()->TEST_WaitForFlushMemTable();
143 TEST_F(DBFlushTest
, FlushInLowPriThreadPool
) {
144 // Verify setting an empty high-pri (flush) thread pool causes flushes to be
145 // scheduled in the low-pri (compaction) thread pool.
146 Options options
= CurrentOptions();
// Four L0 files trigger a compaction. The test expects exactly one
// compaction after four flushes.
147 options
.level0_file_num_compaction_trigger
= 4;
// NOTE(review): SpecialSkipListFactory(1) presumably makes each memtable
// flush-ready after a single entry; confirm.
148 options
.memtable_factory
.reset(new SpecialSkipListFactory(1));
// No HIGH-priority background threads, so flushes cannot run in that pool.
150 env_
->SetBackgroundThreads(0, Env::HIGH
);
153 int num_flushes
= 0, num_compactions
= 0;
// The first flush records its thread id. Every later flush and the
// compaction must then run on that same thread.
154 SyncPoint::GetInstance()->SetCallBack(
155 "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
156 if (tid
== std::thread::id()) {
157 tid
= std::this_thread::get_id();
159 ASSERT_EQ(tid
, std::this_thread::get_id());
163 SyncPoint::GetInstance()->SetCallBack(
164 "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
165 ASSERT_EQ(tid
, std::this_thread::get_id());
168 SyncPoint::GetInstance()->EnableProcessing();
170 ASSERT_OK(Put("key", "val"));
171 for (int i
= 0; i
< 4; ++i
) {
172 ASSERT_OK(Put("key", "val"));
173 dbfull()->TEST_WaitForFlushMemTable();
175 dbfull()->TEST_WaitForCompact();
176 ASSERT_EQ(4, num_flushes
);
177 ASSERT_EQ(1, num_compactions
);
// Regression test: a waiting manual flush must return even if, when it
// finishes, the remaining immutable memtables are fewer than
// min_write_buffer_number_to_merge.
180 TEST_F(DBFlushTest
, ManualFlushWithMinWriteBufferNumberToMerge
) {
181 Options options
= CurrentOptions();
182 options
.write_buffer_size
= 100;
183 options
.max_write_buffer_number
= 4;
184 options
.min_write_buffer_number_to_merge
= 3;
// {predecessor, successor}: the test thread waits until the background flush
// starts. WriteLevel0Table is held until the test thread has switched in a
// second memtable.
187 SyncPoint::GetInstance()->LoadDependency(
188 {{"DBImpl::BGWorkFlush",
189 "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
190 {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
191 "FlushJob::WriteLevel0Table"}});
192 SyncPoint::GetInstance()->EnableProcessing();
194 ASSERT_OK(Put("key1", "value1"));
196 port::Thread
t([&]() {
197 // The call waits for the flush to finish, i.e. flush_options.wait = true.
201 // Wait for the flush to start.
202 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
203 // Insert a second memtable before the manual flush finishes.
204 // At the end of the manual flush job, it will check if further flush
205 // is needed, but it will not trigger flush of the second memtable because
206 // min_write_buffer_number_to_merge is not reached.
207 ASSERT_OK(Put("key2", "value2"));
208 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
209 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
211 // Manual flush should return, without waiting for flush indefinitely.
// After MaybeScheduleFlushOrCompaction schedules work for a single manual
// flush, no flushes may remain unscheduled. The test then checks that the
// callback ran exactly once. NOTE(review): `called` is presumably a counter
// incremented by the callback; its declaration is not visible here.
215 TEST_F(DBFlushTest
, ScheduleOnlyOneBgThread
) {
216 Options options
= CurrentOptions();
218 SyncPoint::GetInstance()->DisableProcessing();
219 SyncPoint::GetInstance()->ClearAllCallBacks();
// The callback's argument points to the number of unscheduled flushes.
221 SyncPoint::GetInstance()->SetCallBack(
222 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg
) {
223 ASSERT_NE(nullptr, arg
);
224 auto unscheduled_flushes
= *reinterpret_cast<int*>(arg
);
225 ASSERT_EQ(0, unscheduled_flushes
);
228 SyncPoint::GetInstance()->EnableProcessing();
230 ASSERT_OK(Put("a", "foo"));
231 FlushOptions flush_opts
;
232 ASSERT_OK(dbfull()->Flush(flush_opts
));
233 ASSERT_EQ(1, called
);
235 SyncPoint::GetInstance()->DisableProcessing();
236 SyncPoint::GetInstance()->ClearAllCallBacks();
// Verifies that the table file created by a flush honors
// use_direct_io_for_flush_and_compaction. The BuildTable:create_file callback
// compares the flag it receives with the option set here.
239 TEST_P(DBFlushDirectIOTest
, DirectIO
) {
241 options
.create_if_missing
= true;
242 options
.disable_auto_compactions
= true;
243 options
.max_background_flushes
= 2;
244 options
.use_direct_io_for_flush_and_compaction
= GetParam();
// NOTE(review): raw-owned MockEnv; confirm it is deleted after the DB is
// closed (the cleanup is not visible in this excerpt).
245 options
.env
= new MockEnv(Env::Default());
246 SyncPoint::GetInstance()->SetCallBack(
247 "BuildTable:create_file", [&](void* arg
) {
248 bool* use_direct_writes
= static_cast<bool*>(arg
);
249 ASSERT_EQ(*use_direct_writes
,
250 options
.use_direct_io_for_flush_and_compaction
);
253 SyncPoint::GetInstance()->EnableProcessing();
255 ASSERT_OK(Put("foo", "v"));
256 FlushOptions flush_options
;
257 flush_options
.wait
= true;
258 ASSERT_OK(dbfull()->Flush(flush_options
));
// Switching the memtable while the filesystem is inactive must return a
// non-OK status.
263 TEST_F(DBFlushTest
, FlushError
) {
265 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
266 new FaultInjectionTestEnv(env_
));
267 options
.write_buffer_size
= 100;
268 options
.max_write_buffer_number
= 4;
269 options
.min_write_buffer_number_to_merge
= 3;
270 options
.disable_auto_compactions
= true;
271 options
.env
= fault_injection_env
.get();
274 ASSERT_OK(Put("key1", "value1"));
275 ASSERT_OK(Put("key2", "value2"));
// Fail the filesystem only around the memtable switch, then restore it.
276 fault_injection_env
->SetFilesystemActive(false);
277 Status s
= dbfull()->TEST_SwitchMemtable();
278 fault_injection_env
->SetFilesystemActive(true);
280 ASSERT_NE(s
, Status::OK());
283 TEST_F(DBFlushTest
, ManualFlushFailsInReadOnlyMode
) {
284 // Regression test for a bug where a manual flush hung forever when the DB
285 // was in read-only mode. Verify it now at least returns, despite failing.
287 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
288 new FaultInjectionTestEnv(env_
));
289 options
.env
= fault_injection_env
.get();
290 options
.max_write_buffer_number
= 2;
293 // Trigger a first flush but don't let it run
294 ASSERT_OK(db_
->PauseBackgroundWork());
295 ASSERT_OK(Put("key1", "value1"));
296 FlushOptions flush_opts
;
297 flush_opts
.wait
= false;
298 ASSERT_OK(db_
->Flush(flush_opts
));
300 // Write a key to the second memtable so we have something to flush later
301 // after the DB is in read-only mode.
302 ASSERT_OK(Put("key2", "value2"));
304 // Let the first flush continue, hit an error, and put the DB in read-only mode.
306 fault_injection_env
->SetFilesystemActive(false);
307 ASSERT_OK(db_
->ContinueBackgroundWork());
308 dbfull()->TEST_WaitForFlushMemTable();
// The failed flush must have recorded at least one background error.
310 uint64_t num_bg_errors
;
311 ASSERT_TRUE(db_
->GetIntProperty(DB::Properties::kBackgroundErrors
,
313 ASSERT_GT(num_bg_errors
, 0);
314 #endif // ROCKSDB_LITE
316 // In the bug scenario, triggering another flush would cause the second flush
317 // to hang forever. After the fix we expect it to return an error.
318 ASSERT_NOK(db_
->Flush(FlushOptions()));
// Races the drop and destruction of column family 1 (on drop_cf_thr) with a
// flush of that column family that has already been scheduled. The flush call
// is expected to return a non-OK status.
323 TEST_F(DBFlushTest
, CFDropRaceWithWaitForFlushMemTables
) {
324 Options options
= CurrentOptions();
325 options
.create_if_missing
= true;
326 CreateAndReopenWithCF({"pikachu"}, options
);
327 SyncPoint::GetInstance()->DisableProcessing();
// {predecessor, successor}: the CF drop starts after the flush is scheduled.
// The background flush starts only after the handle has been freed, and the
// flush caller waits for the background flush only after it has started.
328 SyncPoint::GetInstance()->LoadDependency(
329 {{"DBImpl::FlushMemTable:AfterScheduleFlush",
330 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
331 {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
332 "DBImpl::BackgroundCallFlush:start"},
333 {"DBImpl::BackgroundCallFlush:start",
334 "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
335 SyncPoint::GetInstance()->EnableProcessing();
336 ASSERT_EQ(2, handles_
.size());
337 ASSERT_OK(Put(1, "key", "value"));
// Grab the CF data pointer now, before the handle is destroyed.
338 auto* cfd
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1])->cfd();
339 port::Thread
drop_cf_thr([&]() {
341 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
342 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
343 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_
[1]));
346 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
348 FlushOptions flush_opts
;
349 flush_opts
.allow_write_stall
= true;
350 ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd
, flush_opts
));
353 SyncPoint::GetInstance()->DisableProcessing();
// Verifies that OnFlushCompleted fires only after a flush's result has been
// committed. The listener checks that the flushed sequence number is below the
// earliest sequence number still in the immutable memtable list. A second
// flush (seq2) runs while the first (seq1) is held in WriteLevel0Table. The
// second flush exits early, and its result is committed by the first.
357 TEST_F(DBFlushTest
, FireOnFlushCompletedAfterCommittedResult
) {
// Listener that checks on every flush completion that the flushed data has
// already left the default CF's immutable memtable list.
358 class TestListener
: public EventListener
{
360 void OnFlushCompleted(DB
* db
, const FlushJobInfo
& info
) override
{
361 // There's only one key in each flush.
362 ASSERT_EQ(info
.smallest_seqno
, info
.largest_seqno
);
363 ASSERT_NE(0, info
.smallest_seqno
);
364 if (info
.smallest_seqno
== seq1
) {
365 // First flush completed
366 ASSERT_FALSE(completed1
);
368 CheckFlushResultCommitted(db
, seq1
);
370 // Second flush completed
371 ASSERT_FALSE(completed2
);
373 ASSERT_EQ(info
.smallest_seqno
, seq2
);
374 CheckFlushResultCommitted(db
, seq2
);
// Asserts that `seq` is below the earliest sequence number still held in
// the default CF's immutable memtable list.
378 void CheckFlushResultCommitted(DB
* db
, SequenceNumber seq
) {
379 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
);
380 InstrumentedMutex
* mutex
= db_impl
->mutex();
383 reinterpret_cast<ColumnFamilyHandleImpl
*>(db
->DefaultColumnFamily())
385 ASSERT_LT(seq
, cfd
->imm()->current()->GetEarliestSequenceNumber());
// Sequence numbers of the two writes (assigned by the test body). The
// completion flags are presumably set in OnFlushCompleted.
389 std::atomic
<SequenceNumber
> seq1
{0};
390 std::atomic
<SequenceNumber
> seq2
{0};
391 std::atomic
<bool> completed1
{false};
392 std::atomic
<bool> completed2
{false};
394 std::shared_ptr
<TestListener
> listener
= std::make_shared
<TestListener
>();
// {predecessor, successor}: the test thread waits until the first background
// flush starts. WaitSecond waits until FlushMemTableToOutputFile:Finish.
396 SyncPoint::GetInstance()->LoadDependency(
397 {{"DBImpl::BackgroundCallFlush:start",
398 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
399 {"DBImpl::FlushMemTableToOutputFile:Finish",
400 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
401 SyncPoint::GetInstance()->SetCallBack(
402 "FlushJob::WriteLevel0Table", [&listener
](void* arg
) {
403 // Wait for the second flush to finish, outside of the DB mutex.
404 auto* mems
= reinterpret_cast<autovector
<MemTable
*>*>(arg
);
405 if (mems
->front()->GetEarliestSequenceNumber() == listener
->seq1
- 1) {
407 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
412 Options options
= CurrentOptions();
413 options
.create_if_missing
= true;
414 options
.listeners
.push_back(listener
);
415 // Setting max_flush_jobs = max_background_jobs / 4 = 2.
416 options
.max_background_jobs
= 8;
417 // Allow 2 immutable memtables.
418 options
.max_write_buffer_number
= 3;
420 SyncPoint::GetInstance()->EnableProcessing();
421 ASSERT_OK(Put("foo", "v"));
422 listener
->seq1
= db_
->GetLatestSequenceNumber();
423 // t1 will wait for the second flush to complete before committing its flush result.
424 auto t1
= port::Thread([&]() {
425 // flush_opts.wait = true
426 ASSERT_OK(db_
->Flush(FlushOptions()));
428 // Wait for the first flush to start.
430 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
431 // The second flush will exit early without committing its result. The work
432 // is delegated to the first flush.
433 ASSERT_OK(Put("bar", "v"));
434 listener
->seq2
= db_
->GetLatestSequenceNumber();
435 FlushOptions flush_opts
;
436 flush_opts
.wait
= false;
437 ASSERT_OK(db_
->Flush(flush_opts
));
439 ASSERT_TRUE(listener
->completed1
);
440 ASSERT_TRUE(listener
->completed2
);
441 SyncPoint::GetInstance()->DisableProcessing();
442 SyncPoint::GetInstance()->ClearAllCallBacks();
444 #endif // !ROCKSDB_LITE
// A manual flush of all three column families must leave each with no
// unflushed immutable memtables and an empty active memtable. This must hold
// with and without atomic_flush.
446 TEST_P(DBAtomicFlushTest
, ManualAtomicFlush
) {
447 Options options
= CurrentOptions();
448 options
.create_if_missing
= true;
449 options
.atomic_flush
= GetParam();
// 64 MiB write buffer, presumably so that only the manual flush below
// flushes data.
450 options
.write_buffer_size
= (static_cast<size_t>(64) << 20);
452 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
453 size_t num_cfs
= handles_
.size();
454 ASSERT_EQ(3, num_cfs
);
456 wopts
.disableWAL
= true;
457 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
458 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
460 std::vector
<int> cf_ids
;
461 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
462 cf_ids
.emplace_back(static_cast<int>(i
));
464 ASSERT_OK(Flush(cf_ids
));
465 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
466 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
467 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
468 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
// Fills only the last column family's memtable to trigger an automatic flush.
// It then checks whether the other column families were flushed along with it.
472 TEST_P(DBAtomicFlushTest
, AtomicFlushTriggeredByMemTableFull
) {
473 Options options
= CurrentOptions();
474 options
.create_if_missing
= true;
475 options
.atomic_flush
= GetParam();
476 // 4KB so that we can easily trigger auto flush.
477 options
.write_buffer_size
= 4096;
// {predecessor, successor}: the checks below wait until a background flush
// has finished.
479 SyncPoint::GetInstance()->LoadDependency(
480 {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
481 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
482 SyncPoint::GetInstance()->EnableProcessing();
484 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
485 size_t num_cfs
= handles_
.size();
486 ASSERT_EQ(3, num_cfs
);
488 wopts
.disableWAL
= true;
489 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
490 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
492 // Keep writing to one of the column families to trigger auto flush.
493 for (int i
= 0; i
!= 4000; ++i
) {
494 ASSERT_OK(Put(static_cast<int>(num_cfs
) - 1 /*cf*/,
495 "key" + std::to_string(i
), "value" + std::to_string(i
),
500 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
// With atomic_flush the other column families are flushed too, leaving their
// memtables empty. Without it their memtables still hold data.
501 if (options
.atomic_flush
) {
502 for (size_t i
= 0; i
!= num_cfs
- 1; ++i
) {
503 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
504 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
505 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
508 for (size_t i
= 0; i
!= num_cfs
- 1; ++i
) {
509 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
510 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
511 ASSERT_FALSE(cfh
->cfd()->mem()->IsEmpty());
514 SyncPoint::GetInstance()->DisableProcessing();
// Deactivates the filesystem after some of the atomic flush jobs have
// completed. Afterwards every column family must still have its single
// immutable memtable unflushed, i.e. the completed jobs were rolled back.
517 TEST_P(DBAtomicFlushTest
, AtomicFlushRollbackSomeJobs
) {
518 bool atomic_flush
= GetParam();
522 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
523 new FaultInjectionTestEnv(env_
));
524 Options options
= CurrentOptions();
525 options
.create_if_missing
= true;
526 options
.atomic_flush
= atomic_flush
;
527 options
.env
= fault_injection_env
.get();
528 SyncPoint::GetInstance()->DisableProcessing();
// {predecessor, successor}: the test thread waits until some flush jobs have
// completed. The flush then waits until the test has deactivated the
// filesystem.
529 SyncPoint::GetInstance()->LoadDependency(
530 {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
531 "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
532 {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
533 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
534 SyncPoint::GetInstance()->EnableProcessing();
536 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
537 size_t num_cfs
= handles_
.size();
538 ASSERT_EQ(3, num_cfs
);
540 wopts
.disableWAL
= true;
541 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
542 int cf_id
= static_cast<int>(i
);
543 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
545 FlushOptions flush_opts
;
546 flush_opts
.wait
= false;
547 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
548 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
549 fault_injection_env
->SetFilesystemActive(false);
550 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
551 for (auto* cfh
: handles_
) {
552 dbfull()->TEST_WaitForFlushMemTable(cfh
);
554 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
555 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
556 ASSERT_EQ(1, cfh
->cfd()->imm()->NumNotFlushed());
557 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
559 fault_injection_env
->SetFilesystemActive(true);
// A multi-CF flush request that includes an already-dropped column family
// must return a ColumnFamilyDropped status.
563 TEST_P(DBAtomicFlushTest
, FlushMultipleCFs_DropSomeBeforeRequestFlush
) {
564 bool atomic_flush
= GetParam();
568 Options options
= CurrentOptions();
569 options
.create_if_missing
= true;
570 options
.atomic_flush
= atomic_flush
;
571 SyncPoint::GetInstance()->DisableProcessing();
572 SyncPoint::GetInstance()->ClearAllCallBacks();
573 SyncPoint::GetInstance()->EnableProcessing();
575 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
576 size_t num_cfs
= handles_
.size();
577 ASSERT_EQ(3, num_cfs
);
579 wopts
.disableWAL
= true;
580 std::vector
<int> cf_ids
;
581 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
582 int cf_id
= static_cast<int>(i
);
583 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
584 cf_ids
.push_back(cf_id
);
// Drop "pikachu" (index 1) before requesting the flush.
586 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
587 ASSERT_TRUE(Flush(cf_ids
).IsColumnFamilyDropped());
// Drops a column family after the flush has been scheduled but before the
// background flush job starts. The flush must still succeed and all data must
// stay readable. The surviving column families must keep their data across a
// reopen.
591 TEST_P(DBAtomicFlushTest
,
592 FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun
) {
593 bool atomic_flush
= GetParam();
597 Options options
= CurrentOptions();
598 options
.create_if_missing
= true;
599 options
.atomic_flush
= atomic_flush
;
601 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
603 SyncPoint::GetInstance()->DisableProcessing();
604 SyncPoint::GetInstance()->ClearAllCallBacks();
// {predecessor, successor}: the drop happens after the flush is scheduled,
// and the background flush starts only after the drop.
605 SyncPoint::GetInstance()->LoadDependency(
606 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
607 "DBAtomicFlushTest::BeforeDropCF"},
608 {"DBAtomicFlushTest::AfterDropCF",
609 "DBImpl::BackgroundCallFlush:start"}});
610 SyncPoint::GetInstance()->EnableProcessing();
612 size_t num_cfs
= handles_
.size();
613 ASSERT_EQ(3, num_cfs
);
615 wopts
.disableWAL
= true;
616 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
617 int cf_id
= static_cast<int>(i
);
618 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
620 port::Thread
user_thread([&]() {
621 TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
622 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
623 TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
625 FlushOptions flush_opts
;
626 flush_opts
.wait
= true;
627 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
629 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
630 int cf_id
= static_cast<int>(i
);
631 ASSERT_EQ("value", Get(cf_id
, "key"));
// Reopen without the dropped "pikachu" CF; the remaining CFs keep their data.
634 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "eevee"}, options
);
635 num_cfs
= handles_
.size();
636 ASSERT_EQ(2, num_cfs
);
637 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
638 int cf_id
= static_cast<int>(i
);
639 ASSERT_EQ("value", Get(cf_id
, "key"));
// Writes enough keys to the default column family to trigger a flush, writes
// one more key, and reopens the DB. That last key must survive the reopen.
644 TEST_P(DBAtomicFlushTest
, TriggerFlushAndClose
) {
645 bool atomic_flush
= GetParam();
649 const int kNumKeysTriggerFlush
= 4;
650 Options options
= CurrentOptions();
651 options
.create_if_missing
= true;
652 options
.atomic_flush
= atomic_flush
;
// NOTE(review): SpecialSkipListFactory(n) presumably marks a memtable for
// flush once it holds n entries; confirm.
653 options
.memtable_factory
.reset(
654 new SpecialSkipListFactory(kNumKeysTriggerFlush
));
655 CreateAndReopenWithCF({"pikachu"}, options
);
657 for (int i
= 0; i
!= kNumKeysTriggerFlush
; ++i
) {
658 ASSERT_OK(Put(0, "key" + std::to_string(i
), "value" + std::to_string(i
)));
660 SyncPoint::GetInstance()->DisableProcessing();
661 SyncPoint::GetInstance()->ClearAllCallBacks();
662 SyncPoint::GetInstance()->EnableProcessing();
663 ASSERT_OK(Put(0, "key", "value"));
666 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "pikachu"}, options
);
667 ASSERT_EQ("value", Get(0, "key"));
// With background work paused, queues a flush of both column families and a
// second flush of the default column family. It then drops and destroys
// CF 1 and resumes background work. The flush of the default CF must still
// complete successfully.
670 TEST_P(DBAtomicFlushTest
, PickMemtablesRaceWithBackgroundFlush
) {
671 bool atomic_flush
= GetParam();
672 Options options
= CurrentOptions();
673 options
.create_if_missing
= true;
674 options
.atomic_flush
= atomic_flush
;
675 options
.max_write_buffer_number
= 4;
676 // Set min_write_buffer_number_to_merge to be greater than 1, so that
677 // a column family with one memtable in the imm will not cause IsFlushPending
678 // to return true when flush_requested_ is false.
679 options
.min_write_buffer_number_to_merge
= 2;
680 CreateAndReopenWithCF({"pikachu"}, options
);
681 ASSERT_EQ(2, handles_
.size());
682 ASSERT_OK(dbfull()->PauseBackgroundWork());
683 ASSERT_OK(Put(0, "key00", "value00"));
684 ASSERT_OK(Put(1, "key10", "value10"));
685 FlushOptions flush_opts
;
686 flush_opts
.wait
= false;
687 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
688 ASSERT_OK(Put(0, "key01", "value01"));
689 // Since max_write_buffer_number is 4, the following flush won't cause a write stall.
691 ASSERT_OK(dbfull()->Flush(flush_opts
));
692 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
693 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_
[1]));
// Clear the pointer to the destroyed handle.
694 handles_
[1] = nullptr;
695 ASSERT_OK(dbfull()->ContinueBackgroundWork());
696 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_
[0]));
// Atomic-flush variant of the CF-drop race. Column family 1 is dropped after
// the atomic flush is scheduled but before the background flush starts.
// TEST_AtomicFlushMemTables is expected to return OK.
701 TEST_P(DBAtomicFlushTest
, CFDropRaceWithWaitForFlushMemTables
) {
702 bool atomic_flush
= GetParam();
706 Options options
= CurrentOptions();
707 options
.create_if_missing
= true;
708 options
.atomic_flush
= atomic_flush
;
709 CreateAndReopenWithCF({"pikachu"}, options
);
710 SyncPoint::GetInstance()->DisableProcessing();
// {predecessor, successor}: the drop starts after the flush is scheduled. The
// background flush starts only after the drop thread reaches AfterFree, and
// the caller waits for the background flush only after it has started.
711 SyncPoint::GetInstance()->LoadDependency(
712 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
713 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
714 {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
715 "DBImpl::BackgroundCallFlush:start"},
716 {"DBImpl::BackgroundCallFlush:start",
717 "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
718 SyncPoint::GetInstance()->EnableProcessing();
719 ASSERT_EQ(2, handles_
.size());
720 ASSERT_OK(Put(0, "key", "value"));
721 ASSERT_OK(Put(1, "key", "value"));
723 static_cast<ColumnFamilyHandleImpl
*>(dbfull()->DefaultColumnFamily())
725 auto* cfd_pikachu
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1])->cfd();
726 port::Thread
drop_cf_thr([&]() {
728 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
729 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
733 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
735 FlushOptions flush_opts
;
736 flush_opts
.allow_write_stall
= true;
737 ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default
, cfd_pikachu
},
741 SyncPoint::GetInstance()->DisableProcessing();
// Deactivates the filesystem just before the flush writes its last version
// edit to the MANIFEST, so installing the flush results fails. The filesystem
// is re-enabled afterwards.
744 TEST_P(DBAtomicFlushTest
, RollbackAfterFailToInstallResults
) {
745 bool atomic_flush
= GetParam();
749 auto fault_injection_env
= std::make_shared
<FaultInjectionTestEnv
>(env_
);
750 Options options
= CurrentOptions();
751 options
.env
= fault_injection_env
.get();
752 options
.create_if_missing
= true;
753 options
.atomic_flush
= atomic_flush
;
754 CreateAndReopenWithCF({"pikachu"}, options
);
755 ASSERT_EQ(2, handles_
.size());
756 for (size_t cf
= 0; cf
< handles_
.size(); ++cf
) {
757 ASSERT_OK(Put(static_cast<int>(cf
), "a", "value"));
759 SyncPoint::GetInstance()->DisableProcessing();
760 SyncPoint::GetInstance()->ClearAllCallBacks();
// Fail the filesystem right before the last version edit is written.
761 SyncPoint::GetInstance()->SetCallBack(
762 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
763 [&](void* /*arg*/) { fault_injection_env
->SetFilesystemActive(false); });
764 SyncPoint::GetInstance()->EnableProcessing();
765 FlushOptions flush_opts
;
766 Status s
= db_
->Flush(flush_opts
, handles_
);
768 fault_injection_env
->SetFilesystemActive(true);
770 SyncPoint::GetInstance()->ClearAllCallBacks();
// Instantiate the parameterized fixtures. DBAtomicFlushTest runs with
// testing::Bool(), i.e. with both false and true as its atomic_flush
// parameter.
773 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest
, DBFlushDirectIOTest
,
776 INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest
, DBAtomicFlushTest
, testing::Bool());
778 } // namespace ROCKSDB_NAMESPACE
// Test entry point. It installs RocksDB's stack-trace handler, initializes
// Google Test with the command-line arguments, and runs all registered tests.
780 int main(int argc
, char** argv
) {
781 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
782 ::testing::InitGoogleTest(&argc
, argv
);
783 return RUN_ALL_TESTS();