]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_flush_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_flush_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include <atomic>
11
12 #include "db/db_impl/db_impl.h"
13 #include "db/db_test_util.h"
14 #include "file/filename.h"
15 #include "port/port.h"
16 #include "port/stack_trace.h"
17 #include "test_util/sync_point.h"
18 #include "util/cast_util.h"
19 #include "util/mutexlock.h"
20 #include "utilities/fault_injection_env.h"
21
22 namespace ROCKSDB_NAMESPACE {
23
24 class DBFlushTest : public DBTestBase {
25 public:
26 DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {}
27 };
28
29 class DBFlushDirectIOTest : public DBFlushTest,
30 public ::testing::WithParamInterface<bool> {
31 public:
32 DBFlushDirectIOTest() : DBFlushTest() {}
33 };
34
35 class DBAtomicFlushTest : public DBFlushTest,
36 public ::testing::WithParamInterface<bool> {
37 public:
38 DBAtomicFlushTest() : DBFlushTest() {}
39 };
40
// Regression test: when two background threads tried to flush at the same
// time, only one of them got committed. This verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
  Options options;
  options.disable_auto_compactions = true;
  // Allow two flushes to be in flight concurrently.
  options.max_background_flushes = 2;
  options.env = env_;
  Reopen(options);
  // Non-blocking flush request, issued twice below.
  FlushOptions no_wait;
  no_wait.wait = false;
  no_wait.allow_write_stall = true;

  // Each pair {A, B} makes sync point B wait until A has been reached:
  //  - the test thread issues the second write/flush only once a
  //    LogAndApply call is writing the MANIFEST;
  //  - that MANIFEST write does not complete until another flush reaches
  //    TryInstallMemtableFlushResults and finds an install in progress.
  SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifest",
        "DBFlushTest::FlushWhileWritingManifest:1"},
       {"MemTableList::TryInstallMemtableFlushResults:InProgress",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("foo", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  TEST_SYNC_POINT("DBFlushTest::FlushWhileWritingManifest:1");
  ASSERT_OK(Put("bar", "v"));
  ASSERT_OK(dbfull()->Flush(no_wait));
  // If the issue is hit we will wait here forever.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  // Both flushes must have produced a committed table file.
  ASSERT_EQ(2, TotalTableFiles());
#endif  // ROCKSDB_LITE
}
71
// Verifies that when syncing closed WAL files fails during a background
// flush, the flush returns the IO error, produces no output files, and
// releases the reference it took on the current Version.
//
// NOTE(review): an older comment here said this test is temporarily disabled
// on Travis because it fails intermittently (GitHub issue #4151). Nothing in
// this file disables it; confirm whether that note still applies.
TEST_F(DBFlushTest, SyncFail) {
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options;
  options.disable_auto_compactions = true;
  options.env = fault_injection_env.get();

  // Each pair {A, B} makes sync point B wait until A has been reached:
  //  - the background flush picks memtables only after the test has read the
  //    initial Version ref count, and the test reads it again only after the
  //    memtables have been picked;
  //  - SyncClosedLogs starts only after the test has deactivated the
  //    filesystem, and the test resumes only after that sync has failed.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
        "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
       {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
        "DBFlushTest::SyncFail:GetVersionRefCount:2"},
       {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
       {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_OK(Put("key", "value"));
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
          ->cfd();
  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));
  // Flush installs a new super-version. Get the ref count after that.
  auto current_before = cfd->current();
  int refs_before = cfd->current()->TEST_refs();
  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
  // Picking memtables is expected to add exactly one reference to the
  // current Version.
  int refs_after_picking_memtables = cfd->current()->TEST_refs();
  ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
  // Deactivate the filesystem so that SyncClosedLogs fails.
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
  TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
  fault_injection_env->SetFilesystemActive(true);
  // Now the background job will do the flush; wait for it.
  // Returns the IO error that happened during the flush.
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  ASSERT_EQ("", FilesPerLevel());  // flush failed.
#endif  // ROCKSDB_LITE
  // The background flush job should release its ref count on the current
  // version.
  ASSERT_EQ(current_before, cfd->current());
  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
  Destroy(options);
}
120
// Checks that a background flush goes through the "skip" path of
// DBImpl::SyncClosedLogs and still completes successfully.
TEST_F(DBFlushTest, SyncSkip) {
  Options options = CurrentOptions();

  // SyncClosedLogs:Skip waits for the test's ":1" point, and the test's ":2"
  // point waits for SyncClosedLogs:Skip. If the skip path is never taken, the
  // test blocks at ":2".
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
       {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(options);
  ASSERT_OK(Put("key", "value"));

  FlushOptions flush_options;
  flush_options.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_options));

  TEST_SYNC_POINT("DBFlushTest::SyncSkip:1");
  TEST_SYNC_POINT("DBFlushTest::SyncSkip:2");

  // Now the background job will do the flush; wait for it.
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  Destroy(options);
}
144
TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
  // Verify setting an empty high-pri (flush) thread pool causes flushes to be
  // scheduled in the low-pri (compaction) thread pool.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 4;
  // NOTE(review): presumably marks a memtable full after one entry, so the
  // following write schedules its flush. Together with the extra Put before
  // the loop, that gives one flush per loop iteration (4 in total). Confirm
  // against SpecialSkipListFactory.
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  Reopen(options);
  // NOTE(review): the HIGH pool size is changed here and never restored;
  // confirm this cannot leak into other tests sharing the same Env.
  env_->SetBackgroundThreads(0, Env::HIGH);

  // Id of the thread that ran the first flush. Every later flush and the
  // compaction must run on that same thread.
  std::thread::id tid;
  int num_flushes = 0, num_compactions = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkFlush", [&](void* /*arg*/) {
        if (tid == std::thread::id()) {
          tid = std::this_thread::get_id();
        } else {
          ASSERT_EQ(tid, std::this_thread::get_id());
        }
        ++num_flushes;
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BGWorkCompaction", [&](void* /*arg*/) {
        ASSERT_EQ(tid, std::this_thread::get_id());
        ++num_compactions;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key", "val"));
  for (int i = 0; i < 4; ++i) {
    ASSERT_OK(Put("key", "val"));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Four L0 files reach level0_file_num_compaction_trigger, giving one
  // compaction.
  ASSERT_EQ(4, num_flushes);
  ASSERT_EQ(1, num_compactions);
}
181
// Checks that a manual flush returns even if, while it runs, another memtable
// becomes immutable but min_write_buffer_number_to_merge is not reached, so
// no further flush is scheduled for that memtable.
TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
  Options options = CurrentOptions();
  options.write_buffer_size = 100;
  options.max_write_buffer_number = 4;
  options.min_write_buffer_number_to_merge = 3;
  Reopen(options);

  // The test thread passes ":1" only once a background flush has started.
  // That flush job writes its L0 table only after the test reaches ":2".
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush",
        "DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1"},
       {"DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2",
        "FlushJob::WriteLevel0Table"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("key1", "value1"));

  port::Thread t([&]() {
    // The call waits for the flush to finish, i.e. flush_options.wait = true.
    ASSERT_OK(Flush());
  });

  // Wait for flush start.
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:1");
  // Insert a second memtable before the manual flush finishes.
  // At the end of the manual flush job, it will check if further flush
  // is needed, but it will not trigger flush of the second memtable because
  // min_write_buffer_number_to_merge is not reached.
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  TEST_SYNC_POINT("DBFlushTest::ManualFlushWithMinWriteBufferNumberToMerge:2");

  // Manual flush should return, without waiting for flush indefinitely.
  t.join();
}
216
// A single manual flush must reach the
// "MaybeScheduleFlushOrCompaction:AfterSchedule:0" point exactly once, and at
// that point the reported number of unscheduled flushes must be zero.
TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
  Options options = CurrentOptions();
  Reopen(options);
  // Start from a clean sync point state.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  int called = 0;
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0", [&](void* arg) {
        // arg points to the count of flushes still left unscheduled.
        ASSERT_NE(nullptr, arg);
        auto unscheduled_flushes = *reinterpret_cast<int*>(arg);
        ASSERT_EQ(0, unscheduled_flushes);
        ++called;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(Put("a", "foo"));
  FlushOptions flush_opts;
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_EQ(1, called);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
240
// Flushes once with use_direct_io_for_flush_and_compaction set to the test
// parameter. When BuildTable creates its output file, it checks that the
// direct-write flag passed there matches the option.
TEST_P(DBFlushDirectIOTest, DirectIO) {
  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.max_background_flushes = 2;
  options.use_direct_io_for_flush_and_compaction = GetParam();
  // NOTE(review): owned through a raw pointer and deleted only at the end of
  // the success path; a failing ASSERT_* before that leaks it.
  options.env = new MockEnv(Env::Default());
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:create_file", [&](void* arg) {
        // arg is the bool BuildTable uses to decide on direct writes.
        bool* use_direct_writes = static_cast<bool*>(arg);
        ASSERT_EQ(*use_direct_writes,
                  options.use_direct_io_for_flush_and_compaction);
      });

  SyncPoint::GetInstance()->EnableProcessing();
  Reopen(options);
  ASSERT_OK(Put("foo", "v"));
  FlushOptions flush_options;
  flush_options.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_options));
  Destroy(options);
  delete options.env;
}
264
265 TEST_F(DBFlushTest, FlushError) {
266 Options options;
267 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
268 new FaultInjectionTestEnv(env_));
269 options.write_buffer_size = 100;
270 options.max_write_buffer_number = 4;
271 options.min_write_buffer_number_to_merge = 3;
272 options.disable_auto_compactions = true;
273 options.env = fault_injection_env.get();
274 Reopen(options);
275
276 ASSERT_OK(Put("key1", "value1"));
277 ASSERT_OK(Put("key2", "value2"));
278 fault_injection_env->SetFilesystemActive(false);
279 Status s = dbfull()->TEST_SwitchMemtable();
280 fault_injection_env->SetFilesystemActive(true);
281 Destroy(options);
282 ASSERT_NE(s, Status::OK());
283 }
284
TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
  // Regression test for bug where manual flush hangs forever when the DB
  // is in read-only mode. Verify it now at least returns, despite failing.
  Options options;
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  options.env = fault_injection_env.get();
  options.max_write_buffer_number = 2;
  Reopen(options);

  // Trigger a first flush but don't let it run
  ASSERT_OK(db_->PauseBackgroundWork());
  ASSERT_OK(Put("key1", "value1"));
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(db_->Flush(flush_opts));

  // Write a key to the second memtable so we have something to flush later
  // after the DB is in read-only mode.
  ASSERT_OK(Put("key2", "value2"));

  // Let the first flush continue, hit an error, and put the DB in read-only
  // mode.
  fault_injection_env->SetFilesystemActive(false);
  ASSERT_OK(db_->ContinueBackgroundWork());
  // We injected the error into the env, so the returned status is not OK.
  ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
#ifndef ROCKSDB_LITE
  // The failed flush must have been recorded as a background error.
  uint64_t num_bg_errors;
  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
                                  &num_bg_errors));
  ASSERT_GT(num_bg_errors, 0);
#endif  // ROCKSDB_LITE

  // In the bug scenario, triggering another flush would cause the second flush
  // to hang forever. After the fix we expect it to return an error.
  ASSERT_NOK(db_->Flush(FlushOptions()));

  // NOTE(review): the filesystem is still deactivated at this point; Close()
  // runs with it inactive.
  Close();
}
325
// Drops a column family and destroys its handle after its flush has been
// scheduled but before the background flush starts. The foreground waits for
// that flush only after it has started. The flush request is expected to
// return a non-OK status.
TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->DisableProcessing();
  // Each pair {A, B} makes sync point B wait until A has been reached:
  //  - the drop happens only after FlushMemTable has scheduled the flush;
  //  - the background flush starts only after the handle has been freed;
  //  - FlushMemTable waits for the flush only after it has started.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::FlushMemTable:AfterScheduleFlush",
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
       {"DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
        "DBImpl::BackgroundCallFlush:start"},
       {"DBImpl::BackgroundCallFlush:start",
        "DBImpl::FlushMemTable:BeforeWaitForBgFlush"}});
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(Put(1, "key", "value"));
  // Taken before the drop; the flush below targets this column family.
  auto* cfd = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
  port::Thread drop_cf_thr([&]() {
    TEST_SYNC_POINT(
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
    handles_.resize(1);
    TEST_SYNC_POINT(
        "DBFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
  });
  FlushOptions flush_opts;
  flush_opts.allow_write_stall = true;
  ASSERT_NOK(dbfull()->TEST_FlushMemTable(cfd, flush_opts));
  drop_cf_thr.join();
  Close();
  SyncPoint::GetInstance()->DisableProcessing();
}
358
359 #ifndef ROCKSDB_LITE
360 TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
361 class TestListener : public EventListener {
362 public:
363 void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
364 // There's only one key in each flush.
365 ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
366 ASSERT_NE(0, info.smallest_seqno);
367 if (info.smallest_seqno == seq1) {
368 // First flush completed
369 ASSERT_FALSE(completed1);
370 completed1 = true;
371 CheckFlushResultCommitted(db, seq1);
372 } else {
373 // Second flush completed
374 ASSERT_FALSE(completed2);
375 completed2 = true;
376 ASSERT_EQ(info.smallest_seqno, seq2);
377 CheckFlushResultCommitted(db, seq2);
378 }
379 }
380
381 void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
382 DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
383 InstrumentedMutex* mutex = db_impl->mutex();
384 mutex->Lock();
385 auto* cfd = static_cast_with_check<ColumnFamilyHandleImpl>(
386 db->DefaultColumnFamily())
387 ->cfd();
388 ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
389 mutex->Unlock();
390 }
391
392 std::atomic<SequenceNumber> seq1{0};
393 std::atomic<SequenceNumber> seq2{0};
394 std::atomic<bool> completed1{false};
395 std::atomic<bool> completed2{false};
396 };
397 std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
398
399 SyncPoint::GetInstance()->LoadDependency(
400 {{"DBImpl::BackgroundCallFlush:start",
401 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
402 {"DBImpl::FlushMemTableToOutputFile:Finish",
403 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
404 SyncPoint::GetInstance()->SetCallBack(
405 "FlushJob::WriteLevel0Table", [&listener](void* arg) {
406 // Wait for the second flush finished, out of mutex.
407 auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
408 if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
409 TEST_SYNC_POINT(
410 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
411 "WaitSecond");
412 }
413 });
414
415 Options options = CurrentOptions();
416 options.create_if_missing = true;
417 options.listeners.push_back(listener);
418 // Setting max_flush_jobs = max_background_jobs / 4 = 2.
419 options.max_background_jobs = 8;
420 // Allow 2 immutable memtables.
421 options.max_write_buffer_number = 3;
422 Reopen(options);
423 SyncPoint::GetInstance()->EnableProcessing();
424 ASSERT_OK(Put("foo", "v"));
425 listener->seq1 = db_->GetLatestSequenceNumber();
426 // t1 will wait for the second flush complete before committing flush result.
427 auto t1 = port::Thread([&]() {
428 // flush_opts.wait = true
429 ASSERT_OK(db_->Flush(FlushOptions()));
430 });
431 // Wait for first flush started.
432 TEST_SYNC_POINT(
433 "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
434 // The second flush will exit early without commit its result. The work
435 // is delegated to the first flush.
436 ASSERT_OK(Put("bar", "v"));
437 listener->seq2 = db_->GetLatestSequenceNumber();
438 FlushOptions flush_opts;
439 flush_opts.wait = false;
440 ASSERT_OK(db_->Flush(flush_opts));
441 t1.join();
442 ASSERT_TRUE(listener->completed1);
443 ASSERT_TRUE(listener->completed2);
444 SyncPoint::GetInstance()->DisableProcessing();
445 SyncPoint::GetInstance()->ClearAllCallBacks();
446 }
447 #endif // !ROCKSDB_LITE
448
449 TEST_F(DBFlushTest, FlushWithBlob) {
450 constexpr uint64_t min_blob_size = 10;
451
452 Options options;
453 options.env = CurrentOptions().env;
454 options.enable_blob_files = true;
455 options.min_blob_size = min_blob_size;
456 options.disable_auto_compactions = true;
457 options.env = env_;
458
459 Reopen(options);
460
461 constexpr char short_value[] = "short";
462 static_assert(sizeof(short_value) - 1 < min_blob_size,
463 "short_value too long");
464
465 constexpr char long_value[] = "long_value";
466 static_assert(sizeof(long_value) - 1 >= min_blob_size,
467 "long_value too short");
468
469 ASSERT_OK(Put("key1", short_value));
470 ASSERT_OK(Put("key2", long_value));
471
472 ASSERT_OK(Flush());
473
474 ASSERT_EQ(Get("key1"), short_value);
475 ASSERT_EQ(Get("key2"), long_value);
476
477 VersionSet* const versions = dbfull()->TEST_GetVersionSet();
478 assert(versions);
479
480 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
481 assert(cfd);
482
483 Version* const current = cfd->current();
484 assert(current);
485
486 const VersionStorageInfo* const storage_info = current->storage_info();
487 assert(storage_info);
488
489 const auto& l0_files = storage_info->LevelFiles(0);
490 ASSERT_EQ(l0_files.size(), 1);
491
492 const FileMetaData* const table_file = l0_files[0];
493 assert(table_file);
494
495 const auto& blob_files = storage_info->GetBlobFiles();
496 ASSERT_EQ(blob_files.size(), 1);
497
498 const auto& blob_file = blob_files.begin()->second;
499 assert(blob_file);
500
501 ASSERT_EQ(table_file->smallest.user_key(), "key1");
502 ASSERT_EQ(table_file->largest.user_key(), "key2");
503 ASSERT_EQ(table_file->fd.smallest_seqno, 1);
504 ASSERT_EQ(table_file->fd.largest_seqno, 2);
505 ASSERT_EQ(table_file->oldest_blob_file_number,
506 blob_file->GetBlobFileNumber());
507
508 ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
509
510 #ifndef ROCKSDB_LITE
511 const InternalStats* const internal_stats = cfd->internal_stats();
512 assert(internal_stats);
513
514 const uint64_t expected_bytes =
515 table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
516
517 const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
518 ASSERT_FALSE(compaction_stats.empty());
519 ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
520 ASSERT_EQ(compaction_stats[0].num_output_files, 2);
521
522 const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
523 ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
524 #endif // ROCKSDB_LITE
525 }
526
527 class DBFlushTestBlobError : public DBFlushTest,
528 public testing::WithParamInterface<std::string> {
529 public:
530 DBFlushTestBlobError()
531 : fault_injection_env_(env_), sync_point_(GetParam()) {}
532 ~DBFlushTestBlobError() { Close(); }
533
534 FaultInjectionTestEnv fault_injection_env_;
535 std::string sync_point_;
536 };
537
538 INSTANTIATE_TEST_CASE_P(DBFlushTestBlobError, DBFlushTestBlobError,
539 ::testing::ValuesIn(std::vector<std::string>{
540 "BlobFileBuilder::WriteBlobToFile:AddRecord",
541 "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
542
// Makes the blob write fail at the sync point given by the test parameter.
// Checks that the failed flush leaves no table or blob files behind, neither
// in the current Version nor on disk, and that the flush statistics match
// what was actually written.
TEST_P(DBFlushTestBlobError, FlushError) {
  Options options;
  options.enable_blob_files = true;
  options.disable_auto_compactions = true;
  options.env = &fault_injection_env_;

  Reopen(options);

  ASSERT_OK(Put("key", "blob"));

  // Deactivate the filesystem, injecting an IOError that carries the sync
  // point name, once the parameterized sync point is hit.
  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
    fault_injection_env_.SetFilesystemActive(false,
                                             Status::IOError(sync_point_));
  });
  // Reactivate it right before BuildTable deletes its output file,
  // presumably so that cleanup can succeed.
  SyncPoint::GetInstance()->SetCallBack(
      "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
        fault_injection_env_.SetFilesystemActive(true);
      });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_NOK(Flush());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // The failed flush must not have added any file to the current Version.
  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
  assert(versions);

  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  assert(cfd);

  Version* const current = cfd->current();
  assert(current);

  const VersionStorageInfo* const storage_info = current->storage_info();
  assert(storage_info);

  const auto& l0_files = storage_info->LevelFiles(0);
  ASSERT_TRUE(l0_files.empty());

  const auto& blob_files = storage_info->GetBlobFiles();
  ASSERT_TRUE(blob_files.empty());

  // Make sure the files generated by the failed job have been deleted
  std::vector<std::string> files;
  ASSERT_OK(env_->GetChildren(dbname_, &files));
  for (const auto& file : files) {
    uint64_t number = 0;
    FileType type = kTableFile;

    // Skip directory entries that are not RocksDB file names.
    if (!ParseFileName(file, &number, &type)) {
      continue;
    }

    ASSERT_NE(type, kTableFile);
    ASSERT_NE(type, kBlobFile);
  }

#ifndef ROCKSDB_LITE
  const InternalStats* const internal_stats = cfd->internal_stats();
  assert(internal_stats);

  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
  ASSERT_FALSE(compaction_stats.empty());

  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
    // Failed while adding the blob: nothing was written.
    ASSERT_EQ(compaction_stats[0].bytes_written, 0);
    ASSERT_EQ(compaction_stats[0].num_output_files, 0);
  } else {
    // SST file writing succeeded; blob file writing failed (during Finish)
    ASSERT_GT(compaction_stats[0].bytes_written, 0);
    ASSERT_EQ(compaction_stats[0].num_output_files, 1);
  }

  const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
            compaction_stats[0].bytes_written);
#endif  // ROCKSDB_LITE
}
622
623 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
624 Options options = CurrentOptions();
625 options.create_if_missing = true;
626 options.atomic_flush = GetParam();
627 options.write_buffer_size = (static_cast<size_t>(64) << 20);
628
629 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
630 size_t num_cfs = handles_.size();
631 ASSERT_EQ(3, num_cfs);
632 WriteOptions wopts;
633 wopts.disableWAL = true;
634 for (size_t i = 0; i != num_cfs; ++i) {
635 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
636 }
637 std::vector<int> cf_ids;
638 for (size_t i = 0; i != num_cfs; ++i) {
639 cf_ids.emplace_back(static_cast<int>(i));
640 }
641 ASSERT_OK(Flush(cf_ids));
642 for (size_t i = 0; i != num_cfs; ++i) {
643 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
644 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
645 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
646 }
647 }
648
649 TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
650 Options options = CurrentOptions();
651 options.create_if_missing = true;
652 options.atomic_flush = GetParam();
653 // 4KB so that we can easily trigger auto flush.
654 options.write_buffer_size = 4096;
655
656 SyncPoint::GetInstance()->LoadDependency(
657 {{"DBImpl::BackgroundCallFlush:FlushFinish:0",
658 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
659 SyncPoint::GetInstance()->EnableProcessing();
660
661 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
662 size_t num_cfs = handles_.size();
663 ASSERT_EQ(3, num_cfs);
664 WriteOptions wopts;
665 wopts.disableWAL = true;
666 for (size_t i = 0; i != num_cfs; ++i) {
667 ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
668 }
669 // Keep writing to one of them column families to trigger auto flush.
670 for (int i = 0; i != 4000; ++i) {
671 ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,
672 "key" + std::to_string(i), "value" + std::to_string(i),
673 wopts));
674 }
675
676 TEST_SYNC_POINT(
677 "DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
678 if (options.atomic_flush) {
679 for (size_t i = 0; i + 1 != num_cfs; ++i) {
680 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
681 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
682 ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
683 }
684 } else {
685 for (size_t i = 0; i + 1 != num_cfs; ++i) {
686 auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
687 ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
688 ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
689 }
690 }
691 SyncPoint::GetInstance()->DisableProcessing();
692 }
693
// With atomic flush, lets some flush jobs complete, then deactivates the
// filesystem so the rest fail. Every column family's flush must then report
// the error. Every column family is expected to still hold one unflushed
// immutable memtable, i.e. the completed jobs were rolled back too.
TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.env = fault_injection_env.get();
  SyncPoint::GetInstance()->DisableProcessing();
  // The test continues past ":1" only after some flush jobs have completed.
  // The background flush continues past SomeFlushJobsComplete:2 only after
  // the test has deactivated the filesystem.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1",
        "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"},
       {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2",
        "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}});
  SyncPoint::GetInstance()->EnableProcessing();

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
  }
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1");
  fault_injection_env->SetFilesystemActive(false);
  TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2");
  for (auto* cfh : handles_) {
    // Returns the IO error that happened during the flush.
    ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable(cfh));
  }
  for (size_t i = 0; i != num_cfs; ++i) {
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
    ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed());
    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
  }
  fault_injection_env->SetFilesystemActive(true);
  Destroy(options);
}
740
741 TEST_P(DBAtomicFlushTest, FlushMultipleCFs_DropSomeBeforeRequestFlush) {
742 bool atomic_flush = GetParam();
743 if (!atomic_flush) {
744 return;
745 }
746 Options options = CurrentOptions();
747 options.create_if_missing = true;
748 options.atomic_flush = atomic_flush;
749 SyncPoint::GetInstance()->DisableProcessing();
750 SyncPoint::GetInstance()->ClearAllCallBacks();
751 SyncPoint::GetInstance()->EnableProcessing();
752
753 CreateAndReopenWithCF({"pikachu", "eevee"}, options);
754 size_t num_cfs = handles_.size();
755 ASSERT_EQ(3, num_cfs);
756 WriteOptions wopts;
757 wopts.disableWAL = true;
758 std::vector<int> cf_ids;
759 for (size_t i = 0; i != num_cfs; ++i) {
760 int cf_id = static_cast<int>(i);
761 ASSERT_OK(Put(cf_id, "key", "value", wopts));
762 cf_ids.push_back(cf_id);
763 }
764 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
765 ASSERT_TRUE(Flush(cf_ids).IsColumnFamilyDropped());
766 Destroy(options);
767 }
768
// With atomic flush, drops one column family after the flush of all three has
// been scheduled but before the background flush job runs. The flush must
// still succeed, and the data must stay readable through every handle. After
// reopening without the dropped family, the remaining ones keep their data.
TEST_P(DBAtomicFlushTest,
       FlushMultipleCFs_DropSomeAfterScheduleFlushBeforeFlushJobRun) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;

  CreateAndReopenWithCF({"pikachu", "eevee"}, options);

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // The user thread drops the column family only after the atomic flush has
  // been scheduled. The background flush starts only after the drop.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
        "DBAtomicFlushTest::BeforeDropCF"},
       {"DBAtomicFlushTest::AfterDropCF",
        "DBImpl::BackgroundCallFlush:start"}});
  SyncPoint::GetInstance()->EnableProcessing();

  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions wopts;
  wopts.disableWAL = true;
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_OK(Put(cf_id, "key", "value", wopts));
  }
  port::Thread user_thread([&]() {
    TEST_SYNC_POINT("DBAtomicFlushTest::BeforeDropCF");
    ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
    TEST_SYNC_POINT("DBAtomicFlushTest::AfterDropCF");
  });
  FlushOptions flush_opts;
  flush_opts.wait = true;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  user_thread.join();
  // The dropped column family's handle is not destroyed, so all three
  // handles are still used to read back the data.
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_EQ("value", Get(cf_id, "key"));
  }

  // Reopen without the dropped "pikachu" column family.
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "eevee"}, options);
  num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  for (size_t i = 0; i != num_cfs; ++i) {
    int cf_id = static_cast<int>(i);
    ASSERT_EQ("value", Get(cf_id, "key"));
  }
  Destroy(options);
}
821
822 TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
823 bool atomic_flush = GetParam();
824 if (!atomic_flush) {
825 return;
826 }
827 const int kNumKeysTriggerFlush = 4;
828 Options options = CurrentOptions();
829 options.create_if_missing = true;
830 options.atomic_flush = atomic_flush;
831 options.memtable_factory.reset(
832 new SpecialSkipListFactory(kNumKeysTriggerFlush));
833 CreateAndReopenWithCF({"pikachu"}, options);
834
835 for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
836 ASSERT_OK(Put(0, "key" + std::to_string(i), "value" + std::to_string(i)));
837 }
838 SyncPoint::GetInstance()->DisableProcessing();
839 SyncPoint::GetInstance()->ClearAllCallBacks();
840 SyncPoint::GetInstance()->EnableProcessing();
841 ASSERT_OK(Put(0, "key", "value"));
842 Close();
843
844 ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
845 ASSERT_EQ("value", Get(0, "key"));
846 }
847
// With background work paused, queues a flush of both column families and a
// second flush of the default one. It then drops "pikachu" and destroys its
// handle before resuming background work. Waiting for the default column
// family's flush must still succeed.
TEST_P(DBAtomicFlushTest, PickMemtablesRaceWithBackgroundFlush) {
  bool atomic_flush = GetParam();
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  options.max_write_buffer_number = 4;
  // Set min_write_buffer_number_to_merge to be greater than 1, so that
  // a column family with one memtable in the imm will not cause IsFlushPending
  // to return true when flush_requested_ is false.
  options.min_write_buffer_number_to_merge = 2;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  ASSERT_OK(dbfull()->PauseBackgroundWork());
  ASSERT_OK(Put(0, "key00", "value00"));
  ASSERT_OK(Put(1, "key10", "value10"));
  FlushOptions flush_opts;
  flush_opts.wait = false;
  ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
  ASSERT_OK(Put(0, "key01", "value01"));
  // Since max_write_buffer_number is 4, the following flush won't cause write
  // stall.
  ASSERT_OK(dbfull()->Flush(flush_opts));
  ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
  ASSERT_OK(dbfull()->DestroyColumnFamilyHandle(handles_[1]));
  handles_[1] = nullptr;
  ASSERT_OK(dbfull()->ContinueBackgroundWork());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
  // Release the remaining handle and empty handles_ here, presumably so the
  // fixture's own cleanup does not touch these handles again.
  delete handles_[0];
  handles_.clear();
}
878
879 TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) {
880 bool atomic_flush = GetParam();
881 if (!atomic_flush) {
882 return;
883 }
884 Options options = CurrentOptions();
885 options.create_if_missing = true;
886 options.atomic_flush = atomic_flush;
887 CreateAndReopenWithCF({"pikachu"}, options);
888 SyncPoint::GetInstance()->DisableProcessing();
889 SyncPoint::GetInstance()->LoadDependency(
890 {{"DBImpl::AtomicFlushMemTables:AfterScheduleFlush",
891 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop"},
892 {"DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree",
893 "DBImpl::BackgroundCallFlush:start"},
894 {"DBImpl::BackgroundCallFlush:start",
895 "DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush"}});
896 SyncPoint::GetInstance()->EnableProcessing();
897 ASSERT_EQ(2, handles_.size());
898 ASSERT_OK(Put(0, "key", "value"));
899 ASSERT_OK(Put(1, "key", "value"));
900 auto* cfd_default =
901 static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily())
902 ->cfd();
903 auto* cfd_pikachu = static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
904 port::Thread drop_cf_thr([&]() {
905 TEST_SYNC_POINT(
906 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:BeforeDrop");
907 ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
908 delete handles_[1];
909 handles_.resize(1);
910 TEST_SYNC_POINT(
911 "DBAtomicFlushTest::CFDropRaceWithWaitForFlushMemTables:AfterFree");
912 });
913 FlushOptions flush_opts;
914 flush_opts.allow_write_stall = true;
915 ASSERT_OK(dbfull()->TEST_AtomicFlushMemTables({cfd_default, cfd_pikachu},
916 flush_opts));
917 drop_cf_thr.join();
918 Close();
919 SyncPoint::GetInstance()->DisableProcessing();
920 }
921
// With atomic flush, makes the MANIFEST write that installs the flush results
// fail by deactivating the filesystem just before the last version edit is
// written. The flush of both column families must return an error.
TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
  bool atomic_flush = GetParam();
  if (!atomic_flush) {
    return;
  }
  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
  Options options = CurrentOptions();
  options.env = fault_injection_env.get();
  options.create_if_missing = true;
  options.atomic_flush = atomic_flush;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  for (size_t cf = 0; cf < handles_.size(); ++cf) {
    ASSERT_OK(Put(static_cast<int>(cf), "a", "value"));
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  // Deactivate the filesystem right before the last version edit is written
  // to the MANIFEST.
  SyncPoint::GetInstance()->SetCallBack(
      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
      [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); });
  SyncPoint::GetInstance()->EnableProcessing();
  FlushOptions flush_opts;
  Status s = db_->Flush(flush_opts, handles_);
  ASSERT_NOK(s);
  fault_injection_env->SetFilesystemActive(true);
  Close();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
950
951 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
952 testing::Bool());
953
954 INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());
955
956 } // namespace ROCKSDB_NAMESPACE
957
958 int main(int argc, char** argv) {
959 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
960 ::testing::InitGoogleTest(&argc, argv);
961 return RUN_ALL_TESTS();
962 }