1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
12 #include "db/db_impl/db_impl.h"
13 #include "db/db_test_util.h"
14 #include "file/filename.h"
15 #include "port/port.h"
16 #include "port/stack_trace.h"
17 #include "test_util/sync_point.h"
18 #include "util/cast_util.h"
19 #include "util/mutexlock.h"
20 #include "utilities/fault_injection_env.h"
22 namespace ROCKSDB_NAMESPACE
{
// Base fixture for the flush tests in this file. It constructs DBTestBase
// with the "/db_flush_test" path and env_do_fsync=true.
24 class DBFlushTest
: public DBTestBase
{
26 DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {}
// DBFlushTest parameterized by a bool (read via GetParam()). The DirectIO
// test uses the parameter as use_direct_io_for_flush_and_compaction.
29 class DBFlushDirectIOTest
: public DBFlushTest
,
30 public ::testing::WithParamInterface
<bool> {
32 DBFlushDirectIOTest() : DBFlushTest() {}
// DBFlushTest parameterized by a bool (read via GetParam()). Tests using this
// fixture set options.atomic_flush from the parameter; the suite is
// instantiated with testing::Bool(), so each test runs with and without
// atomic flush.
35 class DBAtomicFlushTest
: public DBFlushTest
,
36 public ::testing::WithParamInterface
<bool> {
38 DBAtomicFlushTest() : DBFlushTest() {}
41 // We had an issue where, when two background threads tried to flush at the
42 // same time, only one of them got committed. This test verifies the fix.
// It issues two flushes using `no_wait` (presumably FlushOptions with
// wait=false) and expects both to produce a table file.
43 TEST_F(DBFlushTest
, FlushWhileWritingManifest
) {
// NOTE(review): the declarations of `options` and `no_wait` are not visible
// in this chunk.
45 options
.disable_auto_compactions
= true;
46 options
.max_background_flushes
= 2;
51 no_wait
.allow_write_stall
=true;
// Hold the first flush at its manifest write until the test reaches
// FlushWhileWritingManifest:1, and keep that manifest write from completing
// until another flush is inside TryInstallMemtableFlushResults.
53 SyncPoint::GetInstance()->LoadDependency(
54 {{"VersionSet::LogAndApply:WriteManifest",
55 "DBFlushTest::FlushWhileWritingManifest:1"},
56 {"MemTableList::TryInstallMemtableFlushResults:InProgress",
57 "VersionSet::LogAndApply:WriteManifestDone"}});
58 SyncPoint::GetInstance()->EnableProcessing();
60 ASSERT_OK(Put("foo", "v"));
61 ASSERT_OK(dbfull()->Flush(no_wait
));
62 TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
63 ASSERT_OK(Put("bar", "v"));
64 ASSERT_OK(dbfull()->Flush(no_wait
));
65 // If the issue is hit we will wait here forever.
66 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// Both flushes committed: one table file per flush.
68 ASSERT_EQ(2, TotalTableFiles());
69 #endif // ROCKSDB_LITE
72 // Disable this test temporarily on Travis as it fails intermittently.
73 // Github issue: #4151
// Verifies that when SyncClosedLogs fails during a flush (filesystem
// deactivated through FaultInjectionTestEnv), the flush returns an error, no
// file is written, and the flush job releases the reference it took on the
// current Version.
74 TEST_F(DBFlushTest
, SyncFail
) {
75 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
76 new FaultInjectionTestEnv(env_
));
78 options
.disable_auto_compactions
= true;
79 options
.env
= fault_injection_env
.get();
// Step the background flush through memtable picking and into
// SyncClosedLogs, so the test can sample ref counts and toggle the
// filesystem at exact points.
81 SyncPoint::GetInstance()->LoadDependency(
82 {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
83 "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
84 {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
85 "DBFlushTest::SyncFail:GetVersionRefCount:2"},
86 {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
87 {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
88 SyncPoint::GetInstance()->EnableProcessing();
90 CreateAndReopenWithCF({"pikachu"}, options
);
91 ASSERT_OK(Put("key", "value"));
// NOTE(review): only part of the statement that initializes `cfd` is visible
// in this chunk.
93 static_cast_with_check
<ColumnFamilyHandleImpl
>(db_
->DefaultColumnFamily())
95 FlushOptions flush_options
;
96 flush_options
.wait
= false;
97 ASSERT_OK(dbfull()->Flush(flush_options
));
98 // Flush installs a new super-version. Get the ref count after that.
99 auto current_before
= cfd
->current();
100 int refs_before
= cfd
->current()->TEST_refs();
101 TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
102 TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
// The test expects picking memtables to take exactly one extra ref on the
// current Version.
103 int refs_after_picking_memtables
= cfd
->current()->TEST_refs();
104 ASSERT_EQ(refs_before
+ 1, refs_after_picking_memtables
);
// Deactivate the filesystem only while SyncClosedLogs runs, so the sync fails.
105 fault_injection_env
->SetFilesystemActive(false);
106 TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
107 TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
108 fault_injection_env
->SetFilesystemActive(true);
109 // Now the background job will do the flush; wait for it.
110 // Returns the IO error that happened during the flush.
111 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
113 ASSERT_EQ("", FilesPerLevel()); // flush failed.
114 #endif // ROCKSDB_LITE
115 // The background flush job should release its ref on the current version.
116 ASSERT_EQ(current_before
, cfd
->current());
117 ASSERT_EQ(refs_before
, cfd
->current()->TEST_refs());
// Verifies that a non-waiting flush whose DBImpl::SyncClosedLogs takes its
// "Skip" path still completes with an OK status. The dependencies force the
// skip point to run between SyncSkip:1 and SyncSkip:2.
121 TEST_F(DBFlushTest
, SyncSkip
) {
122 Options options
= CurrentOptions();
124 SyncPoint::GetInstance()->LoadDependency(
125 {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
126 {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
127 SyncPoint::GetInstance()->EnableProcessing();
130 ASSERT_OK(Put("key", "value"));
132 FlushOptions flush_options
;
133 flush_options
.wait
= false;
134 ASSERT_OK(dbfull()->Flush(flush_options
));
136 TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
137 TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");
139 // Now the background job will do the flush; wait for it.
140 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
145 TEST_F(DBFlushTest
, FlushInLowPriThreadPool
) {
146 // Verify setting an empty high-pri (flush) thread pool causes flushes to be
147 // scheduled in the low-pri (compaction) thread pool.
148 Options options
= CurrentOptions();
149 options
.level0_file_num_compaction_trigger
= 4;
150 options
.memtable_factory
.reset(new SpecialSkipListFactory(1));
// Leave the HIGH pool with no threads.
152 env_
->SetBackgroundThreads(0, Env::HIGH
);
// Record the thread that runs the first flush. Every flush and compaction is
// then asserted to run on that same thread.
// NOTE(review): the declaration of `tid` and the increments of
// num_flushes/num_compactions are not visible in this chunk.
155 int num_flushes
= 0, num_compactions
= 0;
156 SyncPoint::GetInstance()->SetCallBack(
157 "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
158 if (tid
== std::thread::id()) {
159 tid
= std::this_thread::get_id();
161 ASSERT_EQ(tid
, std::this_thread::get_id());
165 SyncPoint::GetInstance()->SetCallBack(
166 "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
167 ASSERT_EQ(tid
, std::this_thread::get_id());
170 SyncPoint::GetInstance()->EnableProcessing();
172 ASSERT_OK(Put("key", "val"));
173 for (int i
= 0; i
< 4; ++i
) {
174 ASSERT_OK(Put("key", "val"));
175 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
177 ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Four flushes reach level0_file_num_compaction_trigger (4), and exactly one
// compaction is expected.
178 ASSERT_EQ(4, num_flushes
);
179 ASSERT_EQ(1, num_compactions
);
// Regression test: a waiting manual flush must return even though a second
// memtable is switched in while the flush runs. That memtable is not flushed
// because min_write_buffer_number_to_merge (3) is not reached.
182 TEST_F(DBFlushTest
, ManualFlushWithMinWriteBufferNumberToMerge
) {
183 Options options
= CurrentOptions();
184 options
.write_buffer_size
= 100;
185 options
.max_write_buffer_number
= 4;
186 options
.min_write_buffer_number_to_merge
= 3;
// Let the test proceed once the background flush starts, and hold the flush
// before WriteLevel0Table until the test reaches point :2.
189 SyncPoint::GetInstance()->LoadDependency(
190 {{"DBImpl::BGWorkFlush",
191 "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
192 {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
193 "FlushJob::WriteLevel0Table"}});
194 SyncPoint::GetInstance()->EnableProcessing();
196 ASSERT_OK(Put("key1", "value1"));
// NOTE(review): the body and join of thread `t` are only partly visible in
// this chunk.
198 port::Thread
t([&]() {
199 // The call waits for the flush to finish, i.e. flush_options.wait = true.
203 // Wait for the flush to start.
204 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
205 // Insert a second memtable before the manual flush finishes.
206 // At the end of the manual flush job, it will check if further flush
207 // is needed, but it will not trigger flush of the second memtable because
208 // min_write_buffer_number_to_merge is not reached.
209 ASSERT_OK(Put("key2", "value2"));
210 ASSERT_OK(dbfull()->TEST_SwitchMemtable());
211 TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");
213 // Manual flush should return, without waiting for flush indefinitely.
// Verifies that a single flush request leaves no flushes unscheduled after
// MaybeScheduleFlushOrCompaction schedules work. The callback argument (an
// int*) is asserted to be 0, and the callback is expected to fire exactly once.
// NOTE(review): the declaration and increment of `called` are not visible in
// this chunk.
217 TEST_F(DBFlushTest
, ScheduleOnlyOneBgThread
) {
218 Options options
= CurrentOptions();
220 SyncPoint::GetInstance()->DisableProcessing();
221 SyncPoint::GetInstance()->ClearAllCallBacks();
223 SyncPoint::GetInstance()->SetCallBack(
224 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg
) {
225 ASSERT_NE(nullptr, arg
);
226 auto unscheduled_flushes
= *reinterpret_cast<int*>(arg
);
227 ASSERT_EQ(0, unscheduled_flushes
);
230 SyncPoint::GetInstance()->EnableProcessing();
232 ASSERT_OK(Put("a", "foo"));
233 FlushOptions flush_opts
;
234 ASSERT_OK(dbfull()->Flush(flush_opts
));
235 ASSERT_EQ(1, called
);
237 SyncPoint::GetInstance()->DisableProcessing();
238 SyncPoint::GetInstance()->ClearAllCallBacks();
// Verifies that the flush path honors use_direct_io_for_flush_and_compaction.
// The BuildTable:create_file callback receives a bool* (use_direct_writes) and
// asserts it matches the option set from the test parameter.
// NOTE(review): the MockEnv allocated with `new` below is not freed anywhere
// visible in this chunk; confirm it is deleted after the DB is closed.
241 TEST_P(DBFlushDirectIOTest
, DirectIO
) {
243 options
.create_if_missing
= true;
244 options
.disable_auto_compactions
= true;
245 options
.max_background_flushes
= 2;
246 options
.use_direct_io_for_flush_and_compaction
= GetParam();
247 options
.env
= new MockEnv(Env::Default());
248 SyncPoint::GetInstance()->SetCallBack(
249 "BuildTable:create_file", [&](void* arg
) {
250 bool* use_direct_writes
= static_cast<bool*>(arg
);
251 ASSERT_EQ(*use_direct_writes
,
252 options
.use_direct_io_for_flush_and_compaction
);
255 SyncPoint::GetInstance()->EnableProcessing();
257 ASSERT_OK(Put("foo", "v"));
258 FlushOptions flush_options
;
259 flush_options
.wait
= true;
260 ASSERT_OK(dbfull()->Flush(flush_options
));
// Verifies that switching the memtable fails while the fault-injection
// filesystem is deactivated.
265 TEST_F(DBFlushTest
, FlushError
) {
267 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
268 new FaultInjectionTestEnv(env_
));
269 options
.write_buffer_size
= 100;
270 options
.max_write_buffer_number
= 4;
271 options
.min_write_buffer_number_to_merge
= 3;
272 options
.disable_auto_compactions
= true;
273 options
.env
= fault_injection_env
.get();
276 ASSERT_OK(Put("key1", "value1"));
277 ASSERT_OK(Put("key2", "value2"));
// Only the memtable switch runs with the filesystem deactivated.
278 fault_injection_env
->SetFilesystemActive(false);
279 Status s
= dbfull()->TEST_SwitchMemtable();
280 fault_injection_env
->SetFilesystemActive(true);
282 ASSERT_NE(s
, Status::OK());
285 TEST_F(DBFlushTest
, ManualFlushFailsInReadOnlyMode
) {
286 // Regression test for bug where manual flush hangs forever when the DB
287 // is in read-only mode. Verify it now at least returns, despite failing.
289 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
290 new FaultInjectionTestEnv(env_
));
291 options
.env
= fault_injection_env
.get();
292 options
.max_write_buffer_number
= 2;
295 // Trigger a first flush but don't let it run
296 ASSERT_OK(db_
->PauseBackgroundWork());
297 ASSERT_OK(Put("key1", "value1"));
298 FlushOptions flush_opts
;
299 flush_opts
.wait
= false;
300 ASSERT_OK(db_
->Flush(flush_opts
));
302 // Write a key to the second memtable so we have something to flush later
303 // after the DB is in read-only mode.
304 ASSERT_OK(Put("key2", "value2"));
306 // Let the first flush continue, hit an error, and put the DB in read-only mode.
308 fault_injection_env
->SetFilesystemActive(false);
309 ASSERT_OK(db_
->ContinueBackgroundWork());
310 // We injected the error into env, so the returned status is not OK.
311 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
313 uint64_t num_bg_errors
;
314 ASSERT_TRUE(db_
->GetIntProperty(DB::Properties::kBackgroundErrors
,
316 ASSERT_GT(num_bg_errors
, 0);
317 #endif // ROCKSDB_LITE
319 // In the bug scenario, triggering another flush would cause the second flush
320 // to hang forever. After the fix we expect it to return an error.
321 ASSERT_NOK(db_
->Flush(FlushOptions()));
// Races a column family drop against a manual flush. The CF is dropped and its
// handle destroyed after FlushMemTable schedules the flush but before the
// background flush starts. The flush is then expected to return a non-OK
// status.
// NOTE(review): parts of drop_cf_thr's body and its join are not visible in
// this chunk.
326 TEST_F(DBFlushTest
, CFDropRaceWithWaitForFlushMemTables
) {
327 Options options
= CurrentOptions();
328 options
.create_if_missing
= true;
329 CreateAndReopenWithCF({"pikachu"}, options
);
330 SyncPoint::GetInstance()->DisableProcessing();
331 SyncPoint::GetInstance()->LoadDependency(
332 {{"DBImpl::FlushMemTable:AfterScheduleFlush",
333 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
334 {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
335 "DBImpl::BackgroundCallFlush:start"},
336 {"DBImpl::BackgroundCallFlush:start",
337 "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
338 SyncPoint::GetInstance()->EnableProcessing();
339 ASSERT_EQ(2, handles_
.size());
340 ASSERT_OK(Put(1, "key", "value"));
// Take the CF's cfd() pointer before the other thread destroys the handle.
341 auto* cfd
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1])->cfd();
342 port::Thread
drop_cf_thr([&]() {
344 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
345 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
346 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_
[1]));
349 "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
351 FlushOptions flush_opts
;
352 flush_opts
.allow_write_stall
= true;
353 ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd
, flush_opts
));
356 SyncPoint::GetInstance()->DisableProcessing();
// Verifies that OnFlushCompleted fires only after a flush's result has been
// committed. The first flush is held in WriteLevel0Table until a second flush
// finishes. The second flush's result is then committed by the first job.
// Each listener callback asserts that its sequence number is below the
// earliest sequence number still held by the immutable memtable list.
360 TEST_F(DBFlushTest
, FireOnFlushCompletedAfterCommittedResult
) {
361 class TestListener
: public EventListener
{
363 void OnFlushCompleted(DB
* db
, const FlushJobInfo
& info
) override
{
364 // There's only one key in each flush.
365 ASSERT_EQ(info
.smallest_seqno
, info
.largest_seqno
);
366 ASSERT_NE(0, info
.smallest_seqno
);
367 if (info
.smallest_seqno
== seq1
) {
368 // First flush completed
369 ASSERT_FALSE(completed1
);
371 CheckFlushResultCommitted(db
, seq1
);
373 // Second flush completed
374 ASSERT_FALSE(completed2
);
376 ASSERT_EQ(info
.smallest_seqno
, seq2
);
377 CheckFlushResultCommitted(db
, seq2
);
// Asserts that `seq` is below the earliest sequence number of the default
// CF's current immutable memtable list version.
// NOTE(review): the mutex lock/unlock lines and the end of the `cfd`
// initializer are not visible in this chunk.
381 void CheckFlushResultCommitted(DB
* db
, SequenceNumber seq
) {
382 DBImpl
* db_impl
= static_cast_with_check
<DBImpl
>(db
);
383 InstrumentedMutex
* mutex
= db_impl
->mutex();
385 auto* cfd
= static_cast_with_check
<ColumnFamilyHandleImpl
>(
386 db
->DefaultColumnFamily())
388 ASSERT_LT(seq
, cfd
->imm()->current()->GetEarliestSequenceNumber());
392 std::atomic
<SequenceNumber
> seq1
{0};
393 std::atomic
<SequenceNumber
> seq2
{0};
394 std::atomic
<bool> completed1
{false};
395 std::atomic
<bool> completed2
{false};
397 std::shared_ptr
<TestListener
> listener
= std::make_shared
<TestListener
>();
// BackgroundCallFlush:start unblocks WaitFirst (in the test).
// FlushMemTableToOutputFile:Finish unblocks WaitSecond (in the callback).
399 SyncPoint::GetInstance()->LoadDependency(
400 {{"DBImpl::BackgroundCallFlush:start",
401 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
402 {"DBImpl::FlushMemTableToOutputFile:Finish",
403 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
404 SyncPoint::GetInstance()->SetCallBack(
405 "FlushJob::WriteLevel0Table", [&listener
](void* arg
) {
406 // Wait for the second flush to finish, outside the mutex.
// NOTE(review): comparing against `seq1 - 1` presumably identifies the first
// flush's memtable by its earliest sequence number; confirm.
407 auto* mems
= reinterpret_cast<autovector
<MemTable
*>*>(arg
);
408 if (mems
->front()->GetEarliestSequenceNumber() == listener
->seq1
- 1) {
410 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
415 Options options
= CurrentOptions();
416 options
.create_if_missing
= true;
417 options
.listeners
.push_back(listener
);
418 // Setting max_flush_jobs = max_background_jobs / 4 = 2.
419 options
.max_background_jobs
= 8;
420 // Allow 2 immutable memtables.
421 options
.max_write_buffer_number
= 3;
423 SyncPoint::GetInstance()->EnableProcessing();
424 ASSERT_OK(Put("foo", "v"));
425 listener
->seq1
= db_
->GetLatestSequenceNumber();
426 // t1 will wait for the second flush to complete before committing its result.
427 auto t1
= port::Thread([&]() {
428 // flush_opts.wait = true
429 ASSERT_OK(db_
->Flush(FlushOptions()));
431 // Wait for the first flush to start.
433 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
434 // The second flush will exit early without committing its result. The work
435 // is delegated to the first flush.
436 ASSERT_OK(Put("bar", "v"));
437 listener
->seq2
= db_
->GetLatestSequenceNumber();
438 FlushOptions flush_opts
;
439 flush_opts
.wait
= false;
440 ASSERT_OK(db_
->Flush(flush_opts
));
442 ASSERT_TRUE(listener
->completed1
);
443 ASSERT_TRUE(listener
->completed2
);
444 SyncPoint::GetInstance()->DisableProcessing();
445 SyncPoint::GetInstance()->ClearAllCallBacks();
447 #endif // !ROCKSDB_LITE
// Flushes one value shorter than min_blob_size and one at least that long.
// Checks that the flush produced one L0 table file and one blob file with a
// single blob, and that the flush statistics account for both files.
// NOTE(review): the declaration of `options`, the reopen, and the flush call
// are not visible in this chunk.
449 TEST_F(DBFlushTest
, FlushWithBlob
) {
450 constexpr uint64_t min_blob_size
= 10;
453 options
.env
= CurrentOptions().env
;
454 options
.enable_blob_files
= true;
455 options
.min_blob_size
= min_blob_size
;
456 options
.disable_auto_compactions
= true;
461 constexpr char short_value
[] = "short";
462 static_assert(sizeof(short_value
) - 1 < min_blob_size
,
463 "short_value too long");
465 constexpr char long_value
[] = "long_value";
466 static_assert(sizeof(long_value
) - 1 >= min_blob_size
,
467 "long_value too short");
469 ASSERT_OK(Put("key1", short_value
));
470 ASSERT_OK(Put("key2", long_value
));
474 ASSERT_EQ(Get("key1"), short_value
);
475 ASSERT_EQ(Get("key2"), long_value
);
477 VersionSet
* const versions
= dbfull()->TEST_GetVersionSet();
480 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
483 Version
* const current
= cfd
->current();
486 const VersionStorageInfo
* const storage_info
= current
->storage_info();
487 assert(storage_info
);
489 const auto& l0_files
= storage_info
->LevelFiles(0);
490 ASSERT_EQ(l0_files
.size(), 1);
492 const FileMetaData
* const table_file
= l0_files
[0];
495 const auto& blob_files
= storage_info
->GetBlobFiles();
496 ASSERT_EQ(blob_files
.size(), 1);
498 const auto& blob_file
= blob_files
.begin()->second
;
// Both keys are covered by the table file. Its oldest_blob_file_number refers
// to the single blob file.
501 ASSERT_EQ(table_file
->smallest
.user_key(), "key1");
502 ASSERT_EQ(table_file
->largest
.user_key(), "key2");
503 ASSERT_EQ(table_file
->fd
.smallest_seqno
, 1);
504 ASSERT_EQ(table_file
->fd
.largest_seqno
, 2);
505 ASSERT_EQ(table_file
->oldest_blob_file_number
,
506 blob_file
->GetBlobFileNumber());
508 ASSERT_EQ(blob_file
->GetTotalBlobCount(), 1);
511 const InternalStats
* const internal_stats
= cfd
->internal_stats();
512 assert(internal_stats
);
// Bytes written by the flush = table file size + total blob bytes.
514 const uint64_t expected_bytes
=
515 table_file
->fd
.GetFileSize() + blob_file
->GetTotalBlobBytes();
517 const auto& compaction_stats
= internal_stats
->TEST_GetCompactionStats();
518 ASSERT_FALSE(compaction_stats
.empty());
519 ASSERT_EQ(compaction_stats
[0].bytes_written
, expected_bytes
);
520 ASSERT_EQ(compaction_stats
[0].num_output_files
, 2);
522 const uint64_t* const cf_stats_value
= internal_stats
->TEST_GetCFStatsValue();
523 ASSERT_EQ(cf_stats_value
[InternalStats::BYTES_FLUSHED
], expected_bytes
);
524 #endif // ROCKSDB_LITE
// Fixture parameterized by the name of the sync point at which a blob-write
// error is injected. It owns a FaultInjectionTestEnv that wraps env_ and
// closes the DB in its destructor.
527 class DBFlushTestBlobError
: public DBFlushTest
,
528 public testing::WithParamInterface
<std::string
> {
530 DBFlushTestBlobError()
531 : fault_injection_env_(env_
), sync_point_(GetParam()) {}
532 ~DBFlushTestBlobError() { Close(); }
534 FaultInjectionTestEnv fault_injection_env_
;
535 std::string sync_point_
;
// Inject the error once while adding a blob record and once while appending
// the blob file footer.
538 INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError
, DBFlushTestBlobError
,
539 ::testing::ValuesIn(std::vector
<std::string
>{
540 "BlobFileBuilder::WriteBlobToFile:AddRecord",
541 "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
// Verifies a flush that fails while writing a blob file. The filesystem is
// deactivated at the parameterized sync point (with an IOError carrying that
// sync point's name) and reactivated at BuildTable:BeforeDeleteFile.
// Afterwards:
//   - the version must contain no table or blob files;
//   - no such files may remain on disk;
//   - the flush statistics must match what was written before the failure.
// NOTE(review): the declaration of `options`, the reopen, the failing flush
// call, and the declaration of `number` are not visible in this chunk.
543 TEST_P(DBFlushTestBlobError
, FlushError
) {
545 options
.enable_blob_files
= true;
546 options
.disable_auto_compactions
= true;
547 options
.env
= &fault_injection_env_
;
551 ASSERT_OK(Put("key", "blob"));
553 SyncPoint::GetInstance()->SetCallBack(sync_point_
, [this](void* /* arg */) {
554 fault_injection_env_
.SetFilesystemActive(false,
555 Status::IOError(sync_point_
));
557 SyncPoint::GetInstance()->SetCallBack(
558 "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
559 fault_injection_env_
.SetFilesystemActive(true);
561 SyncPoint::GetInstance()->EnableProcessing();
565 SyncPoint::GetInstance()->DisableProcessing();
566 SyncPoint::GetInstance()->ClearAllCallBacks();
568 VersionSet
* const versions
= dbfull()->TEST_GetVersionSet();
571 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
574 Version
* const current
= cfd
->current();
577 const VersionStorageInfo
* const storage_info
= current
->storage_info();
578 assert(storage_info
);
580 const auto& l0_files
= storage_info
->LevelFiles(0);
581 ASSERT_TRUE(l0_files
.empty());
583 const auto& blob_files
= storage_info
->GetBlobFiles();
584 ASSERT_TRUE(blob_files
.empty());
586 // Make sure the files generated by the failed job have been deleted
587 std::vector
<std::string
> files
;
588 ASSERT_OK(env_
->GetChildren(dbname_
, &files
));
589 for (const auto& file
: files
) {
591 FileType type
= kTableFile
;
593 if (!ParseFileName(file
, &number
, &type
)) {
597 ASSERT_NE(type
, kTableFile
);
598 ASSERT_NE(type
, kBlobFile
);
602 const InternalStats
* const internal_stats
= cfd
->internal_stats();
603 assert(internal_stats
);
605 const auto& compaction_stats
= internal_stats
->TEST_GetCompactionStats();
606 ASSERT_FALSE(compaction_stats
.empty());
// A failure at AddRecord is expected to record no bytes and no output files.
// Otherwise one output file with a nonzero byte count is expected.
608 if (sync_point_
== "BlobFileBuilder::WriteBlobToFile:AddRecord") {
609 ASSERT_EQ(compaction_stats
[0].bytes_written
, 0);
610 ASSERT_EQ(compaction_stats
[0].num_output_files
, 0);
612 // SST file writing succeeded; blob file writing failed (during Finish)
613 ASSERT_GT(compaction_stats
[0].bytes_written
, 0);
614 ASSERT_EQ(compaction_stats
[0].num_output_files
, 1);
617 const uint64_t* const cf_stats_value
= internal_stats
->TEST_GetCFStatsValue();
618 ASSERT_EQ(cf_stats_value
[InternalStats::BYTES_FLUSHED
],
619 compaction_stats
[0].bytes_written
);
620 #endif // ROCKSDB_LITE
// Writes one key (WAL disabled) to each of three column families, flushes all
// of them manually, and checks every CF has no unflushed immutable memtables
// and an empty active memtable.
// NOTE(review): the declaration of `wopts` is not visible in this chunk.
623 TEST_P(DBAtomicFlushTest
, ManualAtomicFlush
) {
624 Options options
= CurrentOptions();
625 options
.create_if_missing
= true;
626 options
.atomic_flush
= GetParam();
// 64MB write buffer; presumably large enough that no automatic flush fires.
627 options
.write_buffer_size
= (static_cast<size_t>(64) << 20);
629 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
630 size_t num_cfs
= handles_
.size();
631 ASSERT_EQ(3, num_cfs
);
633 wopts
.disableWAL
= true;
634 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
635 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
637 std::vector
<int> cf_ids
;
638 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
639 cf_ids
.emplace_back(static_cast<int>(i
));
641 ASSERT_OK(Flush(cf_ids
));
642 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
643 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
644 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
645 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
// Keeps writing to the last column family until its memtable triggers an
// automatic flush. Then checks the other column families:
//   - with atomic_flush, they must have been flushed too (empty memtables);
//   - without it, they must still hold their data.
649 TEST_P(DBAtomicFlushTest
, AtomicFlushTriggeredByMemTableFull
) {
650 Options options
= CurrentOptions();
651 options
.create_if_missing
= true;
652 options
.atomic_flush
= GetParam();
653 // 4KB so that we can easily trigger auto flush.
654 options
.write_buffer_size
= 4096;
// Don't inspect the column families until a background flush has finished.
656 SyncPoint::GetInstance()->LoadDependency(
657 {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
658 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
659 SyncPoint::GetInstance()->EnableProcessing();
661 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
662 size_t num_cfs
= handles_
.size();
663 ASSERT_EQ(3, num_cfs
);
665 wopts
.disableWAL
= true;
666 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
667 ASSERT_OK(Put(static_cast<int>(i
) /*cf*/, "key", "value", wopts
));
669 // Keep writing to one of the column families to trigger auto flush.
670 for (int i
= 0; i
!= 4000; ++i
) {
671 ASSERT_OK(Put(static_cast<int>(num_cfs
) - 1 /*cf*/,
672 "key" + std::to_string(i
), "value" + std::to_string(i
),
677 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
678 if (options
.atomic_flush
) {
679 for (size_t i
= 0; i
+ 1 != num_cfs
; ++i
) {
680 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
681 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
682 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
685 for (size_t i
= 0; i
+ 1 != num_cfs
; ++i
) {
686 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
687 ASSERT_EQ(0, cfh
->cfd()->imm()->NumNotFlushed());
688 ASSERT_FALSE(cfh
->cfd()->mem()->IsEmpty());
691 SyncPoint::GetInstance()->DisableProcessing();
// Deactivates the filesystem after some flush jobs of a multi-CF flush have
// completed. Every CF's flush must then report an error, and each CF must be
// left with exactly one unflushed immutable memtable and an empty active
// memtable.
694 TEST_P(DBAtomicFlushTest
, AtomicFlushRollbackSomeJobs
) {
695 bool atomic_flush
= GetParam();
699 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env(
700 new FaultInjectionTestEnv(env_
));
701 Options options
= CurrentOptions();
702 options
.create_if_missing
= true;
703 options
.atomic_flush
= atomic_flush
;
704 options
.env
= fault_injection_env
.get();
705 SyncPoint::GetInstance()->DisableProcessing();
// Pause AtomicFlushMemTablesToOutputFiles after some jobs complete, so the
// test can deactivate the filesystem before it continues.
706 SyncPoint::GetInstance()->LoadDependency(
707 {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
708 "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
709 {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
710 "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
711 SyncPoint::GetInstance()->EnableProcessing();
713 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
714 size_t num_cfs
= handles_
.size();
715 ASSERT_EQ(3, num_cfs
);
717 wopts
.disableWAL
= true;
718 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
719 int cf_id
= static_cast<int>(i
);
720 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
722 FlushOptions flush_opts
;
723 flush_opts
.wait
= false;
724 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
725 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
726 fault_injection_env
->SetFilesystemActive(false);
727 TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
728 for (auto* cfh
: handles_
) {
729 // Returns the IO error that happened during the flush.
730 ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh
));
// Each CF still holds its (now immutable) memtable, unflushed.
732 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
733 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[i
]);
734 ASSERT_EQ(1, cfh
->cfd()->imm()->NumNotFlushed());
735 ASSERT_TRUE(cfh
->cfd()->mem()->IsEmpty());
737 fault_injection_env
->SetFilesystemActive(true);
// Drops one column family, then requests a flush of all three. The flush must
// report that a column family was dropped.
741 TEST_P(DBAtomicFlushTest
, FlushMultipleCFs_DropSomeBeforeRequestFlush
) {
742 bool atomic_flush
= GetParam();
746 Options options
= CurrentOptions();
747 options
.create_if_missing
= true;
748 options
.atomic_flush
= atomic_flush
;
749 SyncPoint::GetInstance()->DisableProcessing();
750 SyncPoint::GetInstance()->ClearAllCallBacks();
751 SyncPoint::GetInstance()->EnableProcessing();
753 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
754 size_t num_cfs
= handles_
.size();
755 ASSERT_EQ(3, num_cfs
);
757 wopts
.disableWAL
= true;
758 std::vector
<int> cf_ids
;
759 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
760 int cf_id
= static_cast<int>(i
);
761 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
762 cf_ids
.push_back(cf_id
);
764 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
765 ASSERT_TRUE(Flush(cf_ids
).IsColumnFamilyDropped());
// Drops a column family after the flush has been scheduled but before the
// background flush job starts. Expected outcome:
//   - the waiting flush still succeeds and all values stay readable;
//   - after reopening with only the surviving CFs, their data persists.
769 TEST_P(DBAtomicFlushTest
,
770 FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun
) {
771 bool atomic_flush
= GetParam();
775 Options options
= CurrentOptions();
776 options
.create_if_missing
= true;
777 options
.atomic_flush
= atomic_flush
;
779 CreateAndReopenWithCF({"pikachu", "eevee"}, options
);
781 SyncPoint::GetInstance()->DisableProcessing();
782 SyncPoint::GetInstance()->ClearAllCallBacks();
783 SyncPoint::GetInstance()->LoadDependency(
784 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
785 "DBAtomicFlushTest::BeforeDropCF"},
786 {"DBAtomicFlushTest::AfterDropCF",
787 "DBImpl::BackgroundCallFlush:start"}});
788 SyncPoint::GetInstance()->EnableProcessing();
790 size_t num_cfs
= handles_
.size();
791 ASSERT_EQ(3, num_cfs
);
793 wopts
.disableWAL
= true;
794 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
795 int cf_id
= static_cast<int>(i
);
796 ASSERT_OK(Put(cf_id
, "key", "value", wopts
));
// user_thread drops CF 1 between the flush being scheduled and the background
// flush job starting.
// NOTE(review): the end of user_thread's body and its join are not visible in
// this chunk.
798 port::Thread
user_thread([&]() {
799 TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
800 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
801 TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
803 FlushOptions flush_opts
;
804 flush_opts
.wait
= true;
805 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
807 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
808 int cf_id
= static_cast<int>(i
);
809 ASSERT_EQ("value", Get(cf_id
, "key"));
812 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "eevee"}, options
);
813 num_cfs
= handles_
.size();
814 ASSERT_EQ(2, num_cfs
);
815 for (size_t i
= 0; i
!= num_cfs
; ++i
) {
816 int cf_id
= static_cast<int>(i
);
817 ASSERT_EQ("value", Get(cf_id
, "key"));
// Writes kNumKeysTriggerFlush keys to the default CF, whose memtable factory
// is SpecialSkipListFactory(kNumKeysTriggerFlush), then writes one more key.
// After reopening the DB, that last key must still be readable.
822 TEST_P(DBAtomicFlushTest
, TriggerFlushAndClose
) {
823 bool atomic_flush
= GetParam();
827 const int kNumKeysTriggerFlush
= 4;
828 Options options
= CurrentOptions();
829 options
.create_if_missing
= true;
830 options
.atomic_flush
= atomic_flush
;
831 options
.memtable_factory
.reset(
832 new SpecialSkipListFactory(kNumKeysTriggerFlush
));
833 CreateAndReopenWithCF({"pikachu"}, options
);
835 for (int i
= 0; i
!= kNumKeysTriggerFlush
; ++i
) {
836 ASSERT_OK(Put(0, "key" + std::to_string(i
), "value" + std::to_string(i
)));
838 SyncPoint::GetInstance()->DisableProcessing();
839 SyncPoint::GetInstance()->ClearAllCallBacks();
840 SyncPoint::GetInstance()->EnableProcessing();
841 ASSERT_OK(Put(0, "key", "value"));
844 ReopenWithColumnFamilies({kDefaultColumnFamilyName
, "pikachu"}, options
);
845 ASSERT_EQ("value", Get(0, "key"));
// Sequence:
//   1. Pause background work.
//   2. Request a flush of both CFs, then a second flush of the default CF.
//   3. Drop and destroy CF 1, then resume background work.
// Waiting on the default CF's flush must then succeed.
848 TEST_P(DBAtomicFlushTest
, PickMemtablesRaceWithBackgroundFlush
) {
849 bool atomic_flush
= GetParam();
850 Options options
= CurrentOptions();
851 options
.create_if_missing
= true;
852 options
.atomic_flush
= atomic_flush
;
853 options
.max_write_buffer_number
= 4;
854 // Set min_write_buffer_number_to_merge to be greater than 1, so that
855 // a column family with one memtable in the imm will not cause IsFlushPending
856 // to return true when flush_requested_ is false.
857 options
.min_write_buffer_number_to_merge
= 2;
858 CreateAndReopenWithCF({"pikachu"}, options
);
859 ASSERT_EQ(2, handles_
.size());
860 ASSERT_OK(dbfull()->PauseBackgroundWork());
861 ASSERT_OK(Put(0, "key00", "value00"));
862 ASSERT_OK(Put(1, "key10", "value10"));
863 FlushOptions flush_opts
;
864 flush_opts
.wait
= false;
865 ASSERT_OK(dbfull()->Flush(flush_opts
, handles_
));
866 ASSERT_OK(Put(0, "key01", "value01"));
867 // Since max_write_buffer_number is 4, the following flush won't cause a write stall.
869 ASSERT_OK(dbfull()->Flush(flush_opts
));
870 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
871 ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_
[1]));
// Clear the slot, presumably so later cleanup does not touch the destroyed
// handle.
872 handles_
[1] = nullptr;
873 ASSERT_OK(dbfull()->ContinueBackgroundWork());
874 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_
[0]));
// Atomic-flush variant of the CF-drop race. CF 1 is dropped after
// AtomicFlushMemTables schedules the flush but before the background flush
// starts. TEST_AtomicFlushMemTables over both CFs must still return OK.
// NOTE(review): the initializer of `cfd_default`, the rest of drop_cf_thr's
// body, its join, and the end of the TEST_AtomicFlushMemTables call are not
// visible in this chunk.
879 TEST_P(DBAtomicFlushTest
, CFDropRaceWithWaitForFlushMemTables
) {
880 bool atomic_flush
= GetParam();
884 Options options
= CurrentOptions();
885 options
.create_if_missing
= true;
886 options
.atomic_flush
= atomic_flush
;
887 CreateAndReopenWithCF({"pikachu"}, options
);
888 SyncPoint::GetInstance()->DisableProcessing();
889 SyncPoint::GetInstance()->LoadDependency(
890 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
891 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
892 {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
893 "DBImpl::BackgroundCallFlush:start"},
894 {"DBImpl::BackgroundCallFlush:start",
895 "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
896 SyncPoint::GetInstance()->EnableProcessing();
897 ASSERT_EQ(2, handles_
.size());
898 ASSERT_OK(Put(0, "key", "value"));
899 ASSERT_OK(Put(1, "key", "value"));
901 static_cast<ColumnFamilyHandleImpl
*>(dbfull()->DefaultColumnFamily())
903 auto* cfd_pikachu
= static_cast<ColumnFamilyHandleImpl
*>(handles_
[1])->cfd();
904 port::Thread
drop_cf_thr([&]() {
906 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
907 ASSERT_OK(dbfull()->DropColumnFamily(handles_
[1]));
911 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
913 FlushOptions flush_opts
;
914 flush_opts
.allow_write_stall
= true;
915 ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default
, cfd_pikachu
},
919 SyncPoint::GetInstance()->DisableProcessing();
// Makes a multi-CF flush fail to install its results. The filesystem is
// deactivated at
// VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0, so the
// manifest write fails. The filesystem is reactivated once Flush returns.
// NOTE(review): the checks on `s` and any later assertions are not visible in
// this chunk.
922 TEST_P(DBAtomicFlushTest
, RollbackAfterFailToInstallResults
) {
923 bool atomic_flush
= GetParam();
927 auto fault_injection_env
= std::make_shared
<FaultInjectionTestEnv
>(env_
);
928 Options options
= CurrentOptions();
929 options
.env
= fault_injection_env
.get();
930 options
.create_if_missing
= true;
931 options
.atomic_flush
= atomic_flush
;
932 CreateAndReopenWithCF({"pikachu"}, options
);
933 ASSERT_EQ(2, handles_
.size());
934 for (size_t cf
= 0; cf
< handles_
.size(); ++cf
) {
935 ASSERT_OK(Put(static_cast<int>(cf
), "a", "value"));
937 SyncPoint::GetInstance()->DisableProcessing();
938 SyncPoint::GetInstance()->ClearAllCallBacks();
939 SyncPoint::GetInstance()->SetCallBack(
940 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
941 [&](void* /*arg*/) { fault_injection_env
->SetFilesystemActive(false); });
942 SyncPoint::GetInstance()->EnableProcessing();
943 FlushOptions flush_opts
;
944 Status s
= db_
->Flush(flush_opts
, handles_
);
946 fault_injection_env
->SetFilesystemActive(true);
948 SyncPoint::GetInstance()->ClearAllCallBacks();
// Instantiate the bool-parameterized suites. DBAtomicFlushTest runs with both
// false and true (testing::Bool()).
// NOTE(review): the value generator for DBFlushDirectIOTest is not visible in
// this chunk.
951 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest
, DBFlushDirectIOTest
,
954 INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest
, DBAtomicFlushTest
, testing::Bool());
956 } // namespace ROCKSDB_NAMESPACE
// Test entry point: installs the stack-trace handler, initializes GoogleTest
// with the command-line arguments, and runs all registered tests.
958 int main(int argc
, char** argv
) {
959 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
960 ::testing::InitGoogleTest(&argc
, argv
);
961 return RUN_ALL_TESTS();