1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "env/composite_env_wrapper.h"
12 #include "options/options_helper.h"
13 #include "port/port.h"
14 #include "port/stack_trace.h"
15 #include "test_util/sync_point.h"
16 #include "utilities/fault_injection_env.h"
17
18 namespace ROCKSDB_NAMESPACE {
19 class DBWALTestBase : public DBTestBase {
20 protected:
21 explicit DBWALTestBase(const std::string& dir_name)
22 : DBTestBase(dir_name, /*env_do_fsync=*/true) {}
23
24 #if defined(ROCKSDB_PLATFORM_POSIX)
25 public:
26 uint64_t GetAllocatedFileSize(std::string file_name) {
27 struct stat sbuf;
28 int err = stat(file_name.c_str(), &sbuf);
29 assert(err == 0);
30 return sbuf.st_blocks * 512;
31 }
32 #endif
33 };
34
35 class DBWALTest : public DBWALTestBase {
36 public:
37 DBWALTest() : DBWALTestBase("/db_wal_test") {}
38 };
39
40 // A SpecialEnv enriched to give more insight about deleted files
41 class EnrichedSpecialEnv : public SpecialEnv {
42 public:
43 explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
44 Status NewSequentialFile(const std::string& f,
45 std::unique_ptr<SequentialFile>* r,
46 const EnvOptions& soptions) override {
47 InstrumentedMutexLock l(&env_mutex_);
48 if (f == skipped_wal) {
49 deleted_wal_reopened = true;
50 if (IsWAL(f) && largest_deleted_wal.size() != 0 &&
51 f.compare(largest_deleted_wal) <= 0) {
52 gap_in_wals = true;
53 }
54 }
55 return SpecialEnv::NewSequentialFile(f, r, soptions);
56 }
57 Status DeleteFile(const std::string& fname) override {
58 if (IsWAL(fname)) {
59 deleted_wal_cnt++;
60 InstrumentedMutexLock l(&env_mutex_);
61 // If this is the first WAL, remember its name and skip deleting it. We
62 // remember its name partly because the application might attempt to
63 // delete the file again.
64 if (skipped_wal.size() != 0 && skipped_wal != fname) {
65 if (largest_deleted_wal.size() == 0 ||
66 largest_deleted_wal.compare(fname) < 0) {
67 largest_deleted_wal = fname;
68 }
69 } else {
70 skipped_wal = fname;
71 return Status::OK();
72 }
73 }
74 return SpecialEnv::DeleteFile(fname);
75 }
76 bool IsWAL(const std::string& fname) {
77 // printf("iswal %s\n", fname.c_str());
78 return fname.compare(fname.size() - 3, 3, "log") == 0;
79 }
80
81 InstrumentedMutex env_mutex_;
82 // the wal whose actual delete was skipped by the env
83 std::string skipped_wal = "";
84 // the largest WAL that was requested to be deleted
85 std::string largest_deleted_wal = "";
86 // number of WALs that were successfully deleted
87 std::atomic<size_t> deleted_wal_cnt = {0};
88 // the WAL whose delete from fs was skipped is reopened during recovery
89 std::atomic<bool> deleted_wal_reopened = {false};
90 // whether a gap in the WALs was detected during recovery
91 std::atomic<bool> gap_in_wals = {false};
92 };
93
94 class DBWALTestWithEnrichedEnv : public DBTestBase {
95 public:
96 DBWALTestWithEnrichedEnv()
97 : DBTestBase("/db_wal_test", /*env_do_fsync=*/true) {
98 enriched_env_ = new EnrichedSpecialEnv(env_->target());
99 auto options = CurrentOptions();
100 options.env = enriched_env_;
101 options.allow_2pc = true;
102 Reopen(options);
103 delete env_;
104 // to be deleted by the parent class
105 env_ = enriched_env_;
106 }
107
108 protected:
109 EnrichedSpecialEnv* enriched_env_;
110 };
111
112 // Test that recovery successfully avoids gaps between the logs. One known
113 // scenario that can cause such a gap is the application issuing the WAL
114 // deletions out of order. For the sake of simplicity in the test, here we
115 // create the gap by manipulating the env to skip deletion of the first WAL but
116 // not the ones after it.
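// An illustrative sequence (the WAL numbers here are hypothetical): if deletion
// of 000003.log is skipped while 000004.log and 000005.log are deleted, a later
// recovery that reopens 000003.log would be reading a WAL no newer than ones
// already deleted, i.e. a gap. EnrichedSpecialEnv records that situation via
// deleted_wal_reopened and gap_in_wals so the test can assert it never happens.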
117 TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
118 auto options = last_options_;
119 // To cause frequent WAL deletion
120 options.write_buffer_size = 128;
121 Reopen(options);
122
123 WriteOptions writeOpt = WriteOptions();
124 for (int i = 0; i < 128 * 5; i++) {
125 ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
126 }
127 FlushOptions fo;
128 fo.wait = true;
129 ASSERT_OK(db_->Flush(fo));
130
131 // some wals are deleted
132 ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
133 // but not the first one
134 ASSERT_NE(0, enriched_env_->skipped_wal.size());
135
136 // Test that the WAL that was not deleted will be skipped during recovery
137 options = last_options_;
138 Reopen(options);
139 ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
140 ASSERT_FALSE(enriched_env_->gap_in_wals);
141 }
142
143 TEST_F(DBWALTest, WAL) {
144 do {
145 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
146 WriteOptions writeOpt = WriteOptions();
147 writeOpt.disableWAL = true;
148 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
149 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
150
151 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
152 ASSERT_EQ("v1", Get(1, "foo"));
153 ASSERT_EQ("v1", Get(1, "bar"));
154
155 writeOpt.disableWAL = false;
156 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
157 writeOpt.disableWAL = true;
158 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
159
160 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
161     // Both values should be present.
162 ASSERT_EQ("v2", Get(1, "bar"));
163 ASSERT_EQ("v2", Get(1, "foo"));
164
165 writeOpt.disableWAL = true;
166 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
167 writeOpt.disableWAL = false;
168 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
169
170 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
171     // Again, both values should be present.
172 ASSERT_EQ("v3", Get(1, "foo"));
173 ASSERT_EQ("v3", Get(1, "bar"));
174 } while (ChangeWalOptions());
175 }
176
177 TEST_F(DBWALTest, RollLog) {
178 do {
179 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
180 ASSERT_OK(Put(1, "foo", "v1"));
181 ASSERT_OK(Put(1, "baz", "v5"));
182
183 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
184 for (int i = 0; i < 10; i++) {
185 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
186 }
187 ASSERT_OK(Put(1, "foo", "v4"));
188 for (int i = 0; i < 10; i++) {
189 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
190 }
191 } while (ChangeWalOptions());
192 }
193
194 TEST_F(DBWALTest, SyncWALNotBlockWrite) {
195 Options options = CurrentOptions();
196 options.max_write_buffer_number = 4;
197 DestroyAndReopen(options);
198
199 ASSERT_OK(Put("foo1", "bar1"));
200 ASSERT_OK(Put("foo5", "bar5"));
201
202 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
203 {"WritableFileWriter::SyncWithoutFlush:1",
204 "DBWALTest::SyncWALNotBlockWrite:1"},
205 {"DBWALTest::SyncWALNotBlockWrite:2",
206 "WritableFileWriter::SyncWithoutFlush:2"},
207 });
208 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
209
210 ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
211
212 TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
213 ASSERT_OK(Put("foo2", "bar2"));
214 ASSERT_OK(Put("foo3", "bar3"));
215 FlushOptions fo;
216 fo.wait = false;
217 ASSERT_OK(db_->Flush(fo));
218 ASSERT_OK(Put("foo4", "bar4"));
219
220 TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
221
222 thread.join();
223
224 ASSERT_EQ(Get("foo1"), "bar1");
225 ASSERT_EQ(Get("foo2"), "bar2");
226 ASSERT_EQ(Get("foo3"), "bar3");
227 ASSERT_EQ(Get("foo4"), "bar4");
228 ASSERT_EQ(Get("foo5"), "bar5");
229 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
230 }
231
232 TEST_F(DBWALTest, SyncWALNotWaitWrite) {
233 ASSERT_OK(Put("foo1", "bar1"));
234 ASSERT_OK(Put("foo3", "bar3"));
235
236 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
237 {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
238 {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
239 });
240 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
241
242 ROCKSDB_NAMESPACE::port::Thread thread(
243 [&]() { ASSERT_OK(Put("foo2", "bar2")); });
244 // Moving this to SyncWAL before the actual fsync
245 // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
246 ASSERT_OK(db_->SyncWAL());
247 // Moving this to SyncWAL after actual fsync
248 // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
249
250 thread.join();
251
252 ASSERT_EQ(Get("foo1"), "bar1");
253 ASSERT_EQ(Get("foo2"), "bar2");
254 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
255 }
256
257 TEST_F(DBWALTest, Recover) {
258 do {
259 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
260 ASSERT_OK(Put(1, "foo", "v1"));
261 ASSERT_OK(Put(1, "baz", "v5"));
262
263 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
264 ASSERT_EQ("v1", Get(1, "foo"));
265
266 ASSERT_EQ("v1", Get(1, "foo"));
267 ASSERT_EQ("v5", Get(1, "baz"));
268 ASSERT_OK(Put(1, "bar", "v2"));
269 ASSERT_OK(Put(1, "foo", "v3"));
270
271 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
272 ASSERT_EQ("v3", Get(1, "foo"));
273 ASSERT_OK(Put(1, "foo", "v4"));
274 ASSERT_EQ("v4", Get(1, "foo"));
275 ASSERT_EQ("v2", Get(1, "bar"));
276 ASSERT_EQ("v5", Get(1, "baz"));
277 } while (ChangeWalOptions());
278 }
279
280 TEST_F(DBWALTest, RecoverWithTableHandle) {
281 do {
282 Options options = CurrentOptions();
283 options.create_if_missing = true;
284 options.disable_auto_compactions = true;
285 options.avoid_flush_during_recovery = false;
286 DestroyAndReopen(options);
287 CreateAndReopenWithCF({"pikachu"}, options);
288
289 ASSERT_OK(Put(1, "foo", "v1"));
290 ASSERT_OK(Put(1, "bar", "v2"));
291 ASSERT_OK(Flush(1));
292 ASSERT_OK(Put(1, "foo", "v3"));
293 ASSERT_OK(Put(1, "bar", "v4"));
294 ASSERT_OK(Flush(1));
295 ASSERT_OK(Put(1, "big", std::string(100, 'a')));
296
297 options = CurrentOptions();
298 const int kSmallMaxOpenFiles = 13;
299 if (option_config_ == kDBLogDir) {
300       // Use this option to check that table files are not preloaded.
301       // Set max_open_files to be small enough so that no preload will
302       // happen.
303 options.max_open_files = kSmallMaxOpenFiles;
304       // RocksDB sanitizes max_open_files to at least 20. Modify it back.
305 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
306 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
307 int* max_open_files = static_cast<int*>(arg);
308 *max_open_files = kSmallMaxOpenFiles;
309 });
310
311 } else if (option_config_ == kWalDirAndMmapReads) {
312       // Use this option to check that all files are always loaded.
313 options.max_open_files = 100;
314 } else {
315 options.max_open_files = -1;
316 }
317 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
318 ReopenWithColumnFamilies({"default", "pikachu"}, options);
319 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
320 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
321
322 std::vector<std::vector<FileMetaData>> files;
323 dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
324 size_t total_files = 0;
325 for (const auto& level : files) {
326 total_files += level.size();
327 }
328 ASSERT_EQ(total_files, 3);
329 for (const auto& level : files) {
330 for (const auto& file : level) {
331 if (options.max_open_files == kSmallMaxOpenFiles) {
332 ASSERT_TRUE(file.table_reader_handle == nullptr);
333 } else {
334 ASSERT_TRUE(file.table_reader_handle != nullptr);
335 }
336 }
337 }
338 } while (ChangeWalOptions());
339 }
340
341 TEST_F(DBWALTest, RecoverWithBlob) {
342 // Write a value that's below the prospective size limit for blobs and another
343 // one that's above. Note that blob files are not actually enabled at this
344 // point.
345 constexpr uint64_t min_blob_size = 10;
346
347 constexpr char short_value[] = "short";
348 static_assert(sizeof(short_value) - 1 < min_blob_size,
349 "short_value too long");
350
351 constexpr char long_value[] = "long_value";
352 static_assert(sizeof(long_value) - 1 >= min_blob_size,
353 "long_value too short");
354
355 ASSERT_OK(Put("key1", short_value));
356 ASSERT_OK(Put("key2", long_value));
357
358 // There should be no files just yet since we haven't flushed.
359 {
360 VersionSet* const versions = dbfull()->TEST_GetVersionSet();
361 assert(versions);
362
363 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
364 assert(cfd);
365
366 Version* const current = cfd->current();
367 assert(current);
368
369 const VersionStorageInfo* const storage_info = current->storage_info();
370 assert(storage_info);
371
372 ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
373 ASSERT_TRUE(storage_info->GetBlobFiles().empty());
374 }
375
376 // Reopen the database with blob files enabled. A new table file/blob file
377 // pair should be written during recovery.
378 Options options;
379 options.enable_blob_files = true;
380 options.min_blob_size = min_blob_size;
381 options.avoid_flush_during_recovery = false;
382 options.disable_auto_compactions = true;
383 options.env = env_;
384
385 Reopen(options);
386
387 ASSERT_EQ(Get("key1"), short_value);
388 ASSERT_EQ(Get("key2"), long_value);
389
390 VersionSet* const versions = dbfull()->TEST_GetVersionSet();
391 assert(versions);
392
393 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
394 assert(cfd);
395
396 Version* const current = cfd->current();
397 assert(current);
398
399 const VersionStorageInfo* const storage_info = current->storage_info();
400 assert(storage_info);
401
402 const auto& l0_files = storage_info->LevelFiles(0);
403 ASSERT_EQ(l0_files.size(), 1);
404
405 const FileMetaData* const table_file = l0_files[0];
406 assert(table_file);
407
408 const auto& blob_files = storage_info->GetBlobFiles();
409 ASSERT_EQ(blob_files.size(), 1);
410
411 const auto& blob_file = blob_files.begin()->second;
412 assert(blob_file);
413
414 ASSERT_EQ(table_file->smallest.user_key(), "key1");
415 ASSERT_EQ(table_file->largest.user_key(), "key2");
416 ASSERT_EQ(table_file->fd.smallest_seqno, 1);
417 ASSERT_EQ(table_file->fd.largest_seqno, 2);
418 ASSERT_EQ(table_file->oldest_blob_file_number,
419 blob_file->GetBlobFileNumber());
420
421 ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
422
423 #ifndef ROCKSDB_LITE
424 const InternalStats* const internal_stats = cfd->internal_stats();
425 assert(internal_stats);
426
427 const uint64_t expected_bytes =
428 table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
429
430 const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
431 ASSERT_FALSE(compaction_stats.empty());
432 ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
433 ASSERT_EQ(compaction_stats[0].num_output_files, 2);
434
435 const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
436 ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
437 #endif // ROCKSDB_LITE
438 }
439
440 class DBRecoveryTestBlobError
441 : public DBWALTest,
442 public testing::WithParamInterface<std::string> {
443 public:
444 DBRecoveryTestBlobError()
445 : fault_injection_env_(env_), sync_point_(GetParam()) {}
446 ~DBRecoveryTestBlobError() { Close(); }
447
448 FaultInjectionTestEnv fault_injection_env_;
449 std::string sync_point_;
450 };
451
452 INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
453 ::testing::ValuesIn(std::vector<std::string>{
454 "BlobFileBuilder::WriteBlobToFile:AddRecord",
455 "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
456
457 TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
458 // Write a value. Note that blob files are not actually enabled at this point.
459 ASSERT_OK(Put("key", "blob"));
460
461 // Reopen with blob files enabled but make blob file writing fail during
462 // recovery.
463 SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
464 fault_injection_env_.SetFilesystemActive(false,
465 Status::IOError(sync_point_));
466 });
467 SyncPoint::GetInstance()->SetCallBack(
468 "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
469 fault_injection_env_.SetFilesystemActive(true);
470 });
471 SyncPoint::GetInstance()->EnableProcessing();
472
473 Options options;
474 options.enable_blob_files = true;
475 options.avoid_flush_during_recovery = false;
476 options.disable_auto_compactions = true;
477 options.env = &fault_injection_env_;
478
479 ASSERT_NOK(TryReopen(options));
480
481 SyncPoint::GetInstance()->DisableProcessing();
482 SyncPoint::GetInstance()->ClearAllCallBacks();
483
484 // Make sure the files generated by the failed recovery have been deleted.
485 std::vector<std::string> files;
486 ASSERT_OK(env_->GetChildren(dbname_, &files));
487 for (const auto& file : files) {
488 uint64_t number = 0;
489 FileType type = kTableFile;
490
491 if (!ParseFileName(file, &number, &type)) {
492 continue;
493 }
494
495 ASSERT_NE(type, kTableFile);
496 ASSERT_NE(type, kBlobFile);
497 }
498 }
499
500 TEST_F(DBWALTest, IgnoreRecoveredLog) {
501 std::string backup_logs = dbname_ + "/backup_logs";
502
503 do {
504 // delete old files in backup_logs directory
505 env_->CreateDirIfMissing(backup_logs);
506 std::vector<std::string> old_files;
507 env_->GetChildren(backup_logs, &old_files);
508 for (auto& file : old_files) {
509 if (file != "." && file != "..") {
510 env_->DeleteFile(backup_logs + "/" + file);
511 }
512 }
513 Options options = CurrentOptions();
514 options.create_if_missing = true;
515 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
516 options.wal_dir = dbname_ + "/logs";
517 DestroyAndReopen(options);
518
519 // fill up the DB
520 std::string one, two;
521 PutFixed64(&one, 1);
522 PutFixed64(&two, 2);
523 ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
524 ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
525 ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
526
527 // copy the logs to backup
528 std::vector<std::string> logs;
529 env_->GetChildren(options.wal_dir, &logs);
530 for (auto& log : logs) {
531 if (log != ".." && log != ".") {
532 CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
533 }
534 }
535
536 // recover the DB
537 Reopen(options);
538 ASSERT_EQ(two, Get("foo"));
539 ASSERT_EQ(one, Get("bar"));
540 Close();
541
542 // copy the logs from backup back to wal dir
543 for (auto& log : logs) {
544 if (log != ".." && log != ".") {
545 CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
546 }
547 }
548     // This should ignore the log files; recovery should not happen again.
549     // If the recovery happened, the same merge operator would be called twice,
550     // leading to incorrect results.
551 Reopen(options);
552 ASSERT_EQ(two, Get("foo"));
553 ASSERT_EQ(one, Get("bar"));
554 Close();
555 Destroy(options);
556 Reopen(options);
557 Close();
558
559 // copy the logs from backup back to wal dir
560 env_->CreateDirIfMissing(options.wal_dir);
561 for (auto& log : logs) {
562 if (log != ".." && log != ".") {
563 CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
564 }
565 }
566 // assert that we successfully recovered only from logs, even though we
567 // destroyed the DB
568 Reopen(options);
569 ASSERT_EQ(two, Get("foo"));
570 ASSERT_EQ(one, Get("bar"));
571
572 // Recovery will fail if DB directory doesn't exist.
573 Destroy(options);
574 // copy the logs from backup back to wal dir
575 env_->CreateDirIfMissing(options.wal_dir);
576 for (auto& log : logs) {
577 if (log != ".." && log != ".") {
578 CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
579         // we won't be needing this file anymore
580 env_->DeleteFile(backup_logs + "/" + log);
581 }
582 }
583 Status s = TryReopen(options);
584 ASSERT_TRUE(!s.ok());
585 Destroy(options);
586 } while (ChangeWalOptions());
587 }
588
589 TEST_F(DBWALTest, RecoveryWithEmptyLog) {
590 do {
591 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
592 ASSERT_OK(Put(1, "foo", "v1"));
593 ASSERT_OK(Put(1, "foo", "v2"));
594 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
595 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
596 ASSERT_OK(Put(1, "foo", "v3"));
597 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
598 ASSERT_EQ("v3", Get(1, "foo"));
599 } while (ChangeWalOptions());
600 }
601
602 #if !(defined NDEBUG) || !defined(OS_WIN)
603 TEST_F(DBWALTest, PreallocateBlock) {
604 Options options = CurrentOptions();
605 options.write_buffer_size = 10 * 1000 * 1000;
606 options.max_total_wal_size = 0;
607
608 size_t expected_preallocation_size = static_cast<size_t>(
609 options.write_buffer_size + options.write_buffer_size / 10);
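  // A minimal sketch of the arithmetic above (not part of the test): with
  // write_buffer_size = 10,000,000, the expected WAL preallocation is
  // 10,000,000 + 10,000,000 / 10 = 11,000,000 bytes.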
610
611 DestroyAndReopen(options);
612
613 std::atomic<int> called(0);
614 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
615 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
616 ASSERT_TRUE(arg != nullptr);
617 size_t preallocation_size = *(static_cast<size_t*>(arg));
618 ASSERT_EQ(expected_preallocation_size, preallocation_size);
619 called.fetch_add(1);
620 });
621 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
622 Put("", "");
623 Flush();
624 Put("", "");
625 Close();
626 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
627 ASSERT_EQ(2, called.load());
628
629 options.max_total_wal_size = 1000 * 1000;
630 expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
631 Reopen(options);
632 called.store(0);
633 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
634 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
635 ASSERT_TRUE(arg != nullptr);
636 size_t preallocation_size = *(static_cast<size_t*>(arg));
637 ASSERT_EQ(expected_preallocation_size, preallocation_size);
638 called.fetch_add(1);
639 });
640 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
641 Put("", "");
642 Flush();
643 Put("", "");
644 Close();
645 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
646 ASSERT_EQ(2, called.load());
647
648 options.db_write_buffer_size = 800 * 1000;
649 expected_preallocation_size =
650 static_cast<size_t>(options.db_write_buffer_size);
651 Reopen(options);
652 called.store(0);
653 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
654 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
655 ASSERT_TRUE(arg != nullptr);
656 size_t preallocation_size = *(static_cast<size_t*>(arg));
657 ASSERT_EQ(expected_preallocation_size, preallocation_size);
658 called.fetch_add(1);
659 });
660 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
661 Put("", "");
662 Flush();
663 Put("", "");
664 Close();
665 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
666 ASSERT_EQ(2, called.load());
667
668 expected_preallocation_size = 700 * 1000;
669 std::shared_ptr<WriteBufferManager> write_buffer_manager =
670 std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
671 options.write_buffer_manager = write_buffer_manager;
672 Reopen(options);
673 called.store(0);
674 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
675 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
676 ASSERT_TRUE(arg != nullptr);
677 size_t preallocation_size = *(static_cast<size_t*>(arg));
678 ASSERT_EQ(expected_preallocation_size, preallocation_size);
679 called.fetch_add(1);
680 });
681 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
682 Put("", "");
683 Flush();
684 Put("", "");
685 Close();
686 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
687 ASSERT_EQ(2, called.load());
688 }
689 #endif // !(defined NDEBUG) || !defined(OS_WIN)
690
691 #ifndef ROCKSDB_LITE
692 TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
693 // TODO(ajkr): Disabled until WAL recycling is fixed for
694 // `kPointInTimeRecovery`.
695
696 // For github issue #1303
697 for (int i = 0; i < 2; ++i) {
698 Options options = CurrentOptions();
699 options.create_if_missing = true;
700 options.recycle_log_file_num = 2;
701 if (i != 0) {
702 options.wal_dir = alternative_wal_dir_;
703 }
704
705 DestroyAndReopen(options);
706 ASSERT_OK(Put("foo", "v1"));
707 VectorLogPtr log_files;
708 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
709 ASSERT_GT(log_files.size(), 0);
710 ASSERT_OK(Flush());
711
712 // Now the original WAL is in log_files[0] and should be marked for
713 // recycling.
714 // Verify full purge cannot remove this file.
715 JobContext job_context(0);
716 dbfull()->TEST_LockMutex();
717 dbfull()->FindObsoleteFiles(&job_context, true /* force */);
718 dbfull()->TEST_UnlockMutex();
719 dbfull()->PurgeObsoleteFiles(job_context);
720
721 if (i == 0) {
722 ASSERT_OK(
723 env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
724 } else {
725 ASSERT_OK(env_->FileExists(
726 LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
727 }
728 }
729 }
730
731 TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) {
732 // TODO(ajkr): Disabled until WAL recycling is fixed for
733 // `kPointInTimeRecovery`.
734
735 // Ensures full purge cannot delete a WAL while it's in the process of being
736 // recycled. In particular, we force the full purge after a file has been
737 // chosen for reuse, but before it has been renamed.
738 for (int i = 0; i < 2; ++i) {
739 Options options = CurrentOptions();
740 options.recycle_log_file_num = 1;
741 if (i != 0) {
742 options.wal_dir = alternative_wal_dir_;
743 }
744 DestroyAndReopen(options);
745
746 // The first flush creates a second log so writes can continue before the
747 // flush finishes.
748 ASSERT_OK(Put("foo", "bar"));
749 ASSERT_OK(Flush());
750
751 // The second flush can recycle the first log. Sync points enforce the
752 // full purge happens after choosing the log to recycle and before it is
753 // renamed.
754 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
755 {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
756 "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
757 {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
758 "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
759 });
760 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
761 ROCKSDB_NAMESPACE::port::Thread thread([&]() {
762 TEST_SYNC_POINT(
763 "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
764 ASSERT_OK(db_->EnableFileDeletions(true));
765 TEST_SYNC_POINT(
766 "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
767 });
768 ASSERT_OK(Put("foo", "bar"));
769 ASSERT_OK(Flush());
770 thread.join();
771 }
772 }
773
774 TEST_F(DBWALTest, GetSortedWalFiles) {
775 do {
776 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
777 VectorLogPtr log_files;
778 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
779 ASSERT_EQ(0, log_files.size());
780
781 ASSERT_OK(Put(1, "foo", "v1"));
782 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
783 ASSERT_EQ(1, log_files.size());
784 } while (ChangeWalOptions());
785 }
786
787 TEST_F(DBWALTest, GetCurrentWalFile) {
788 do {
789 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
790
791 std::unique_ptr<LogFile>* bad_log_file = nullptr;
792 ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
793
794 std::unique_ptr<LogFile> log_file;
795 ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
796
797 // nothing has been written to the log yet
798 ASSERT_EQ(log_file->StartSequence(), 0);
799 ASSERT_EQ(log_file->SizeFileBytes(), 0);
800 ASSERT_EQ(log_file->Type(), kAliveLogFile);
801 ASSERT_GT(log_file->LogNumber(), 0);
802
803     // add some data and verify that the file size actually moves forward
804 ASSERT_OK(Put(0, "foo", "v1"));
805 ASSERT_OK(Put(0, "foo2", "v2"));
806 ASSERT_OK(Put(0, "foo3", "v3"));
807
808 ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
809
810 ASSERT_EQ(log_file->StartSequence(), 0);
811 ASSERT_GT(log_file->SizeFileBytes(), 0);
812 ASSERT_EQ(log_file->Type(), kAliveLogFile);
813 ASSERT_GT(log_file->LogNumber(), 0);
814
815 // force log files to cycle and add some more data, then check if
816 // log number moves forward
817
818 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
819 for (int i = 0; i < 10; i++) {
820 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
821 }
822
823 ASSERT_OK(Put(0, "foo4", "v4"));
824 ASSERT_OK(Put(0, "foo5", "v5"));
825 ASSERT_OK(Put(0, "foo6", "v6"));
826
827 ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
828
829 ASSERT_EQ(log_file->StartSequence(), 0);
830 ASSERT_GT(log_file->SizeFileBytes(), 0);
831 ASSERT_EQ(log_file->Type(), kAliveLogFile);
832 ASSERT_GT(log_file->LogNumber(), 0);
833
834 } while (ChangeWalOptions());
835 }
836
837 TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
838 // Test for regression of WAL cleanup missing files that don't contain data
839 // for every column family.
840 do {
841 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
842 ASSERT_OK(Put(1, "foo", "v1"));
843 ASSERT_OK(Put(1, "foo", "v2"));
844 uint64_t earliest_log_nums[2];
845 for (int i = 0; i < 2; ++i) {
846 if (i > 0) {
847 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
848 }
849 VectorLogPtr log_files;
850 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
851 if (log_files.size() > 0) {
852 earliest_log_nums[i] = log_files[0]->LogNumber();
853 } else {
854 earliest_log_nums[i] = port::kMaxUint64;
855 }
856 }
857     // Check that at least the first WAL was cleaned up during the recovery.
858 ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
859 } while (ChangeWalOptions());
860 }
861
862 TEST_F(DBWALTest, RecoverWithLargeLog) {
863 do {
864 {
865 Options options = CurrentOptions();
866 CreateAndReopenWithCF({"pikachu"}, options);
867 ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
868 ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
869 ASSERT_OK(Put(1, "small3", std::string(10, '3')));
870 ASSERT_OK(Put(1, "small4", std::string(10, '4')));
871 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
872 }
873
874     // Make sure that if we re-open with a small write buffer size,
875     // we flush table files in the middle of a large log file.
876 Options options;
877 options.write_buffer_size = 100000;
878 options = CurrentOptions(options);
879 ReopenWithColumnFamilies({"default", "pikachu"}, options);
880 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
881 ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
882 ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
883 ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
884 ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
885 ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
886 } while (ChangeWalOptions());
887 }
888
889 // In https://reviews.facebook.net/D20661 we changed the
890 // recovery behavior: previously, for each log file, each column family
891 // memtable was flushed, even if it was empty. Now it's changed:
892 // we try to create the smallest number of table files by merging
893 // updates from multiple logs.
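// A rough sketch of the expected outcome below: the DB is reopened with a tiny
// write_buffer_size (4096), so during recovery each of the four 1 MB 'pikachu'
// values overflows the buffer and lands in its own SST, while the two small
// 'pikachu' keys share one, which is why the test expects 5 SSTs for that
// column family.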
894 TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
895 Options options = CurrentOptions();
896 options.write_buffer_size = 5000000;
897 CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
898
899   // Since we will reopen the DB with a smaller write_buffer_size,
900   // each key will go to a new SST file
901 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
902 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
903 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
904 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
905
906 ASSERT_OK(Put(3, Key(10), DummyString(1)));
907   // Make 'dobrynia' be flushed and a new WAL file be created
908 ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
909 ASSERT_OK(Put(2, Key(1), DummyString(1)));
910 dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
911 {
912 auto tables = ListTableFiles(env_, dbname_);
913 ASSERT_EQ(tables.size(), static_cast<size_t>(1));
914     // Make sure 'dobrynia' was flushed: check the number of SST files
915 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
916 static_cast<uint64_t>(1));
917 }
918 // New WAL file
919 ASSERT_OK(Put(1, Key(1), DummyString(1)));
920 ASSERT_OK(Put(1, Key(1), DummyString(1)));
921 ASSERT_OK(Put(3, Key(10), DummyString(1)));
922 ASSERT_OK(Put(3, Key(10), DummyString(1)));
923 ASSERT_OK(Put(3, Key(10), DummyString(1)));
924
925 options.write_buffer_size = 4096;
926 options.arena_block_size = 4096;
927 ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
928 options);
929 {
930 // No inserts => default is empty
931 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
932 static_cast<uint64_t>(0));
933     // The first 4 keys go to separate SSTs + 1 more SST for the 2 smaller keys
934 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
935 static_cast<uint64_t>(5));
936 // 1 SST for big key + 1 SST for small one
937 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
938 static_cast<uint64_t>(2));
939 // 1 SST for all keys
940 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
941 static_cast<uint64_t>(1));
942 }
943 }
944
945 // In https://reviews.facebook.net/D20661 we changed the
946 // recovery behavior: previously, for each log file, each column family
947 // memtable was flushed, even if it was empty. Now it's changed:
948 // we try to create the smallest number of table files by merging
949 // updates from multiple logs.
950 TEST_F(DBWALTest, RecoverCheckFileAmount) {
951 Options options = CurrentOptions();
952 options.write_buffer_size = 100000;
953 options.arena_block_size = 4 * 1024;
954 options.avoid_flush_during_recovery = false;
955 CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
956
957 ASSERT_OK(Put(0, Key(1), DummyString(1)));
958 ASSERT_OK(Put(1, Key(1), DummyString(1)));
959 ASSERT_OK(Put(2, Key(1), DummyString(1)));
960
961   // Make the 'nikitich' memtable be flushed
962 ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
963 ASSERT_OK(Put(3, Key(1), DummyString(1)));
964 dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
965   // 4 memtables are not flushed, 1 SST file
966 {
967 auto tables = ListTableFiles(env_, dbname_);
968 ASSERT_EQ(tables.size(), static_cast<size_t>(1));
969 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
970 static_cast<uint64_t>(1));
971 }
972   // The memtable for 'nikitich' has been flushed and a new WAL file has been
973   // opened; 4 memtables are still not flushed
974
975 // Write to new WAL file
976 ASSERT_OK(Put(0, Key(1), DummyString(1)));
977 ASSERT_OK(Put(1, Key(1), DummyString(1)));
978 ASSERT_OK(Put(2, Key(1), DummyString(1)));
979
980 // Fill up 'nikitich' one more time
981 ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
982 // make it flush
983 ASSERT_OK(Put(3, Key(1), DummyString(1)));
984 dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
985   // There are still 4 memtables not flushed, and 2 SST files
986 ASSERT_OK(Put(0, Key(1), DummyString(1)));
987 ASSERT_OK(Put(1, Key(1), DummyString(1)));
988 ASSERT_OK(Put(2, Key(1), DummyString(1)));
989
990 {
991 auto tables = ListTableFiles(env_, dbname_);
992 ASSERT_EQ(tables.size(), static_cast<size_t>(2));
993 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
994 static_cast<uint64_t>(2));
995 }
996
997 ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
998 options);
999 {
1000 std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
1001     // Check that records for 'default', 'dobrynia' and 'pikachu' from the
1002     // first, second and third WALs went to the same SST.
1003     // So there are 6 SSTs: three for 'nikitich', one for 'default', one for
1004     // 'dobrynia', one for 'pikachu'
1005 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
1006 static_cast<uint64_t>(1));
1007 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
1008 static_cast<uint64_t>(3));
1009 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
1010 static_cast<uint64_t>(1));
1011 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
1012 static_cast<uint64_t>(1));
1013 }
1014 }
1015
1016 TEST_F(DBWALTest, SyncMultipleLogs) {
1017 const uint64_t kNumBatches = 2;
1018 const int kBatchSize = 1000;
1019
1020 Options options = CurrentOptions();
1021 options.create_if_missing = true;
1022 options.write_buffer_size = 4096;
1023 Reopen(options);
1024
1025 WriteBatch batch;
1026 WriteOptions wo;
1027 wo.sync = true;
1028
1029 for (uint64_t b = 0; b < kNumBatches; b++) {
1030 batch.Clear();
1031 for (int i = 0; i < kBatchSize; i++) {
1032 batch.Put(Key(i), DummyString(128));
1033 }
1034
1035 dbfull()->Write(wo, &batch);
1036 }
1037
1038 ASSERT_OK(dbfull()->SyncWAL());
1039 }
1040
1041 // GitHub issue 1339. Prior to the fix, we read the sequence id from the first
1042 // log into a local variable, then kept increasing the variable as we replayed
1043 // logs, ignoring the actual sequence ids of the records. This is incorrect if
1044 // some writes come with the WAL disabled.
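// Illustrative timeline for the writes below: the first wal_on write is logged
// with seq 1; the two wal_off writes consume seqs 2 and 3 but leave no WAL
// records; the next wal_on write is logged with seq 4. A replay that simply
// counted records would assign that record seq 2 instead of 4, which is the
// bug this test guards against.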
1045 TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
1046 std::unique_ptr<FaultInjectionTestEnv> fault_env(
1047 new FaultInjectionTestEnv(env_));
1048 Options options = CurrentOptions();
1049 options.env = fault_env.get();
1050 options.disable_auto_compactions = true;
1051 WriteOptions wal_on, wal_off;
1052 wal_on.sync = true;
1053 wal_on.disableWAL = false;
1054 wal_off.disableWAL = true;
1055 CreateAndReopenWithCF({"dummy"}, options);
1056 ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
1057 ASSERT_OK(Put(1, "dummy", "d2", wal_off));
1058 ASSERT_OK(Put(1, "dummy", "d3", wal_off));
1059 ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
1060 ASSERT_OK(Flush(0));
1061 ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
1062 ASSERT_EQ("v5", Get(0, "key"));
1063 dbfull()->FlushWAL(false);
1064 // Simulate a crash.
1065 fault_env->SetFilesystemActive(false);
1066 Close();
1067 fault_env->ResetState();
1068 ReopenWithColumnFamilies({"default", "dummy"}, options);
1069 // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
1070 ASSERT_EQ("v5", Get(0, "key"));
1071   // Destroy the DB before destructing fault_env.
1072 Destroy(options);
1073 }
1074
1075 //
1076 // Test WAL recovery for the various modes available
1077 //
1078 class RecoveryTestHelper {
1079 public:
1080 // Number of WAL files to generate
1081 static constexpr int kWALFilesCount = 10;
1082 // Starting number for the WAL file name like 00010.log
1083 static constexpr int kWALFileOffset = 10;
1084 // Keys to be written per WAL file
1085 static constexpr int kKeysPerWALFile = 133;
1086 // Size of the value
1087 static constexpr int kValueSize = 96;
1088
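  // Taken together, the constants above mean the helper writes
  // kWALFilesCount * kKeysPerWALFile = 10 * 133 = 1330 keys in total, which is
  // the key space that GetData() probes below.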
1089 // Create WAL files with values filled in
1090 static void FillData(DBWALTestBase* test, const Options& options,
1091 const size_t wal_count, size_t* count) {
1092 // Calling internal functions requires sanitized options.
1093 Options sanitized_options = SanitizeOptions(test->dbname_, options);
1094 const ImmutableDBOptions db_options(sanitized_options);
1095
1096 *count = 0;
1097
1098 std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
1099 EnvOptions env_options;
1100 WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
1101
1102 std::unique_ptr<VersionSet> versions;
1103 std::unique_ptr<WalManager> wal_manager;
1104 WriteController write_controller;
1105
1106 versions.reset(new VersionSet(
1107 test->dbname_, &db_options, env_options, table_cache.get(),
1108 &write_buffer_manager, &write_controller,
1109 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
1110
1111 wal_manager.reset(
1112 new WalManager(db_options, env_options, /*io_tracer=*/nullptr));
1113
1114 std::unique_ptr<log::Writer> current_log_writer;
1115
1116 for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
1117 uint64_t current_log_number = j;
1118 std::string fname = LogFileName(test->dbname_, current_log_number);
1119 std::unique_ptr<WritableFile> file;
1120 ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
1121 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
1122 NewLegacyWritableFileWrapper(std::move(file)), fname, env_options));
1123 current_log_writer.reset(
1124 new log::Writer(std::move(file_writer), current_log_number,
1125 db_options.recycle_log_file_num > 0));
1126
1127 WriteBatch batch;
1128 for (int i = 0; i < kKeysPerWALFile; i++) {
1129 std::string key = "key" + ToString((*count)++);
1130 std::string value = test->DummyString(kValueSize);
1131 assert(current_log_writer.get() != nullptr);
1132 uint64_t seq = versions->LastSequence() + 1;
1133 batch.Clear();
1134 batch.Put(key, value);
1135 WriteBatchInternal::SetSequence(&batch, seq);
1136 current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
1137 versions->SetLastAllocatedSequence(seq);
1138 versions->SetLastPublishedSequence(seq);
1139 versions->SetLastSequence(seq);
1140 }
1141 }
1142 }
1143
1144 // Recreate and fill the store with some data
1145 static size_t FillData(DBWALTestBase* test, Options* options) {
1146 options->create_if_missing = true;
1147 test->DestroyAndReopen(*options);
1148 test->Close();
1149
1150 size_t count = 0;
1151 FillData(test, *options, kWALFilesCount, &count);
1152 return count;
1153 }
1154
1155 // Read back all the keys we wrote and return the number of keys found
1156 static size_t GetData(DBWALTestBase* test) {
1157 size_t count = 0;
1158 for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
1159 if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
1160 ++count;
1161 }
1162 }
1163 return count;
1164 }
1165
1166   // Manually corrupt the specified WAL
1167 static void CorruptWAL(DBWALTestBase* test, const Options& options,
1168 const double off, const double len,
1169 const int wal_file_id, const bool trunc = false) {
1170 Env* env = options.env;
1171 std::string fname = LogFileName(test->dbname_, wal_file_id);
1172 uint64_t size;
1173 ASSERT_OK(env->GetFileSize(fname, &size));
1174 ASSERT_GT(size, 0);
1175 #ifdef OS_WIN
1176     // The Windows disk cache behaves differently. When we truncate,
1177     // the original content is still in the cache because the original
1178     // handle is still open. Generally, on Windows, one prohibits
1179     // shared access to files; it is not needed for the WAL, but we allow
1180     // it so that various tests can induce corruption.
1181 test->Close();
1182 #endif
1183 if (trunc) {
1184 ASSERT_OK(
1185 test::TruncateFile(env, fname, static_cast<uint64_t>(size * off)));
1186 } else {
1187 ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(size * off + 8),
1188 static_cast<int>(size * len), false));
1189 }
1190 }
1191 };
1192
1193 class DBWALTestWithParams
1194 : public DBWALTestBase,
1195 public ::testing::WithParamInterface<std::tuple<bool, int, int>> {
1196 public:
1197 DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
1198 };
1199
1200 INSTANTIATE_TEST_CASE_P(
1201 Wal, DBWALTestWithParams,
1202 ::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1),
1203 ::testing::Range(RecoveryTestHelper::kWALFileOffset,
1204 RecoveryTestHelper::kWALFileOffset +
1205 RecoveryTestHelper::kWALFilesCount,
1206 1)));
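// The parameter tuple is consumed by the tests as: <0> whether the corruption
// is done by truncation, <1> the corruption offset position (0..3, later scaled
// to a fraction of the file size), and <2> the WAL file id to corrupt (10..19).
// The variant below adds a WALRecoveryMode as a fourth element.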
1207
1208 class DBWALTestWithParamsVaryingRecoveryMode
1209 : public DBWALTestBase,
1210 public ::testing::WithParamInterface<
1211 std::tuple<bool, int, int, WALRecoveryMode>> {
1212 public:
1213 DBWALTestWithParamsVaryingRecoveryMode()
1214 : DBWALTestBase("/db_wal_test_with_params_mode") {}
1215 };
1216
1217 INSTANTIATE_TEST_CASE_P(
1218 Wal, DBWALTestWithParamsVaryingRecoveryMode,
1219 ::testing::Combine(
1220 ::testing::Bool(), ::testing::Range(0, 4, 1),
1221 ::testing::Range(RecoveryTestHelper::kWALFileOffset,
1222 RecoveryTestHelper::kWALFileOffset +
1223 RecoveryTestHelper::kWALFilesCount,
1224 1),
1225 ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
1226 WALRecoveryMode::kAbsoluteConsistency,
1227 WALRecoveryMode::kPointInTimeRecovery,
1228 WALRecoveryMode::kSkipAnyCorruptedRecords)));
1229
1230 // Test scope:
1231 // - We expect to open the data store when there are incomplete trailing writes
1232 //   at the end of any of the logs
1233 // - We do not expect to open the data store for other kinds of corruption
1234 TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) {
1235 bool trunc = std::get<0>(GetParam()); // Corruption style
1236 // Corruption offset position
1237 int corrupt_offset = std::get<1>(GetParam());
1238 int wal_file_id = std::get<2>(GetParam()); // WAL file
1239
1240 // Fill data for testing
1241 Options options = CurrentOptions();
1242 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1243   // test checksum failure or parsing failure
1244 RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
1245 /*len%=*/.1, wal_file_id, trunc);
1246
1247 options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
1248 if (trunc) {
1249 options.create_if_missing = false;
1250 ASSERT_OK(TryReopen(options));
1251 const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
1252 ASSERT_TRUE(corrupt_offset == 0 || recovered_row_count > 0);
1253 ASSERT_LT(recovered_row_count, row_count);
1254 } else {
1255 ASSERT_NOK(TryReopen(options));
1256 }
1257 }
1258
1259 // Test scope:
1260 // We don't expect the data store to be opened if there is any corruption
1261 // (leading, middle or trailing -- incomplete writes or corruption)
1262 TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
1263 // Verify clean slate behavior
1264 Options options = CurrentOptions();
1265 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1266 options.create_if_missing = false;
1267 ASSERT_OK(TryReopen(options));
1268 ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
1269
1270 bool trunc = std::get<0>(GetParam()); // Corruption style
1271 // Corruption offset position
1272 int corrupt_offset = std::get<1>(GetParam());
1273 int wal_file_id = std::get<2>(GetParam()); // WAL file
1274
1275 if (trunc && corrupt_offset == 0) {
1276 return;
1277 }
1278
1279   // fill with new data
1280 RecoveryTestHelper::FillData(this, &options);
1281 // corrupt the wal
1282 RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
1283 /*len%=*/.1, wal_file_id, trunc);
1284 // verify
1285 options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
1286 options.create_if_missing = false;
1287 ASSERT_NOK(TryReopen(options));
1288 }
1289
1290 // Test scope:
1291 // We don't expect the data store to be opened if there is any inconsistency
1292 // between WAL and SST files
1293 TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
1294 Options options = CurrentOptions();
1295 options.avoid_flush_during_recovery = true;
1296
1297 // Create DB with multiple column families.
1298 CreateAndReopenWithCF({"one", "two"}, options);
1299 ASSERT_OK(Put(1, "key1", "val1"));
1300 ASSERT_OK(Put(2, "key2", "val2"));
1301
1302 // Record the offset at this point
1303 Env* env = options.env;
1304 uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
1305 std::string fname = LogFileName(dbname_, wal_file_id);
1306 uint64_t offset_to_corrupt;
1307 ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
1308 ASSERT_GT(offset_to_corrupt, 0);
1309
1310 ASSERT_OK(Put(1, "key3", "val3"));
1311 // Corrupt WAL at location of key3
1312 test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt), 4, false);
1313 ASSERT_OK(Put(2, "key4", "val4"));
1314 ASSERT_OK(Put(1, "key5", "val5"));
1315 Flush(2);
1316
1317 // PIT recovery & verify
1318 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
1319 ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
1320 }
1321
1322 // Test scope:
1323 // - We expect to open the data store under all circumstances
1324 // - We expect only data up to the point where the first error was encountered
1325 TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {
1326 const int maxkeys =
1327 RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
1328
1329 bool trunc = std::get<0>(GetParam()); // Corruption style
1330 // Corruption offset position
1331 int corrupt_offset = std::get<1>(GetParam());
1332 int wal_file_id = std::get<2>(GetParam()); // WAL file
1333
1334 // Fill data for testing
1335 Options options = CurrentOptions();
1336 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1337
1338 // Corrupt the wal
1339 RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
1340 /*len%=*/.1, wal_file_id, trunc);
1341
1342 // Verify
1343 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
1344 options.create_if_missing = false;
1345 ASSERT_OK(TryReopen(options));
1346
1347 // Probe data for invariants
1348 size_t recovered_row_count = RecoveryTestHelper::GetData(this);
1349 ASSERT_LT(recovered_row_count, row_count);
1350
1351 // Verify a prefix of keys were recovered. But not in the case of full WAL
1352 // truncation, because we have no way to know there was a corruption when
1353 // truncation happened on record boundaries (preventing recovery holes in
1354 // that case requires using `track_and_verify_wals_in_manifest`).
1355 if (!trunc || corrupt_offset != 0) {
1356 bool expect_data = true;
1357 for (size_t k = 0; k < maxkeys; ++k) {
1358 bool found = Get("key" + ToString(k)) != "NOT_FOUND";
1359 if (expect_data && !found) {
1360 expect_data = false;
1361 }
1362 ASSERT_EQ(found, expect_data);
1363 }
1364 }
1365
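  // A worked example of the bounds below, using the helper's constants: if
  // wal_file_id is 12, WALs 10 and 11 precede the corrupted file, so at least
  // 2 * 133 = 266 keys must be recovered; and when the corruption is not a
  // truncation and the offset is nonzero, at most 3 * 133 = 399 keys (through
  // WAL 12) can be.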
1366 const size_t min = RecoveryTestHelper::kKeysPerWALFile *
1367 (wal_file_id - RecoveryTestHelper::kWALFileOffset);
1368 ASSERT_GE(recovered_row_count, min);
1369 if (!trunc && corrupt_offset != 0) {
1370 const size_t max = RecoveryTestHelper::kKeysPerWALFile *
1371 (wal_file_id - RecoveryTestHelper::kWALFileOffset + 1);
1372 ASSERT_LE(recovered_row_count, max);
1373 }
1374 }
1375
1376 // Test scope:
1377 // - We expect to open the data store under all scenarios
1378 // - We expect to have recovered records past the corruption zone
1379 TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
1380 bool trunc = std::get<0>(GetParam()); // Corruption style
1381 // Corruption offset position
1382 int corrupt_offset = std::get<1>(GetParam());
1383 int wal_file_id = std::get<2>(GetParam()); // WAL file
1384
1385 // Fill data for testing
1386 Options options = CurrentOptions();
1387 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1388
1389 // Corrupt the WAL
1390 RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
1391 /*len%=*/.1, wal_file_id, trunc);
1392
1393 // Verify behavior
1394 options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
1395 options.create_if_missing = false;
1396 ASSERT_OK(TryReopen(options));
1397
1398 // Probe data for invariants
1399 size_t recovered_row_count = RecoveryTestHelper::GetData(this);
1400 ASSERT_LT(recovered_row_count, row_count);
1401
1402 if (!trunc) {
1403 ASSERT_TRUE(corrupt_offset != 0 || recovered_row_count > 0);
1404 }
1405 }
1406
1407 TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
1408 Options options = CurrentOptions();
1409 options.disable_auto_compactions = true;
1410 options.avoid_flush_during_recovery = false;
1411
1412 // Test with flush after recovery.
1413 Reopen(options);
1414 ASSERT_OK(Put("foo", "v1"));
1415 ASSERT_OK(Put("bar", "v2"));
1416 ASSERT_OK(Flush());
1417 ASSERT_OK(Put("foo", "v3"));
1418 ASSERT_OK(Put("bar", "v4"));
1419 ASSERT_EQ(1, TotalTableFiles());
1420 // Reopen DB. Check if WAL logs flushed.
1421 Reopen(options);
1422 ASSERT_EQ("v3", Get("foo"));
1423 ASSERT_EQ("v4", Get("bar"));
1424 ASSERT_EQ(2, TotalTableFiles());
1425
1426 // Test without flush after recovery.
1427 options.avoid_flush_during_recovery = true;
1428 DestroyAndReopen(options);
1429 ASSERT_OK(Put("foo", "v5"));
1430 ASSERT_OK(Put("bar", "v6"));
1431 ASSERT_OK(Flush());
1432 ASSERT_OK(Put("foo", "v7"));
1433 ASSERT_OK(Put("bar", "v8"));
1434 ASSERT_EQ(1, TotalTableFiles());
1435 // Reopen DB. WAL logs should not be flushed this time.
1436 Reopen(options);
1437 ASSERT_EQ("v7", Get("foo"));
1438 ASSERT_EQ("v8", Get("bar"));
1439 ASSERT_EQ(1, TotalTableFiles());
1440
1441 // Force flush with allow_2pc.
1442 options.avoid_flush_during_recovery = true;
1443 options.allow_2pc = true;
1444 ASSERT_OK(Put("foo", "v9"));
1445 ASSERT_OK(Put("bar", "v10"));
1446 ASSERT_OK(Flush());
1447 ASSERT_OK(Put("foo", "v11"));
1448 ASSERT_OK(Put("bar", "v12"));
1449 Reopen(options);
1450 ASSERT_EQ("v11", Get("foo"));
1451 ASSERT_EQ("v12", Get("bar"));
1452 ASSERT_EQ(3, TotalTableFiles());
1453 }
1454
1455 TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
1456 // Verifies WAL files that were present during recovery, but not flushed due
1457 // to avoid_flush_during_recovery, will be considered for deletion at a later
1458 // stage. We check at least one such file is deleted during Flush().
1459 Options options = CurrentOptions();
1460 options.disable_auto_compactions = true;
1461 options.avoid_flush_during_recovery = true;
1462 Reopen(options);
1463
1464 ASSERT_OK(Put("foo", "v1"));
1465 Reopen(options);
1466 for (int i = 0; i < 2; ++i) {
1467 if (i > 0) {
1468 // Flush() triggers deletion of obsolete tracked files
1469 Flush();
1470 }
1471 VectorLogPtr log_files;
1472 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
1473 if (i == 0) {
1474 ASSERT_GT(log_files.size(), 0);
1475 } else {
1476 ASSERT_EQ(0, log_files.size());
1477 }
1478 }
1479 }
1480
1481 TEST_F(DBWALTest, RecoverWithoutFlush) {
1482 Options options = CurrentOptions();
1483 options.avoid_flush_during_recovery = true;
1484 options.create_if_missing = false;
1485 options.disable_auto_compactions = true;
1486 options.write_buffer_size = 64 * 1024 * 1024;
1487
1488 size_t count = RecoveryTestHelper::FillData(this, &options);
1489 auto validateData = [this, count]() {
1490 for (size_t i = 0; i < count; i++) {
1491 ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND");
1492 }
1493 };
1494 Reopen(options);
1495 validateData();
1496 // Insert some data without flush
1497 ASSERT_OK(Put("foo", "foo_v1"));
1498 ASSERT_OK(Put("bar", "bar_v1"));
1499 Reopen(options);
1500 validateData();
1501 ASSERT_EQ(Get("foo"), "foo_v1");
1502 ASSERT_EQ(Get("bar"), "bar_v1");
1503 // Insert again and reopen
1504 ASSERT_OK(Put("foo", "foo_v2"));
1505 ASSERT_OK(Put("bar", "bar_v2"));
1506 Reopen(options);
1507 validateData();
1508 ASSERT_EQ(Get("foo"), "foo_v2");
1509 ASSERT_EQ(Get("bar"), "bar_v2");
1510 // manual flush and insert again
1511 Flush();
1512 ASSERT_EQ(Get("foo"), "foo_v2");
1513 ASSERT_EQ(Get("bar"), "bar_v2");
1514 ASSERT_OK(Put("foo", "foo_v3"));
1515 ASSERT_OK(Put("bar", "bar_v3"));
1516 Reopen(options);
1517 validateData();
1518 ASSERT_EQ(Get("foo"), "foo_v3");
1519 ASSERT_EQ(Get("bar"), "bar_v3");
1520 }
1521
1522 TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
1523 const std::string kSmallValue = "v";
1524 const std::string kLargeValue = DummyString(1024);
1525 Options options = CurrentOptions();
1526 options.avoid_flush_during_recovery = true;
1527 options.create_if_missing = false;
1528 options.disable_auto_compactions = true;
1529
1530 auto countWalFiles = [this]() {
1531 VectorLogPtr log_files;
1532 dbfull()->GetSortedWalFiles(log_files);
1533 return log_files.size();
1534 };
1535
1536 // Create DB with multiple column families and multiple log files.
1537 CreateAndReopenWithCF({"one", "two"}, options);
1538 ASSERT_OK(Put(0, "key1", kSmallValue));
1539 ASSERT_OK(Put(1, "key2", kLargeValue));
1540 Flush(1);
1541 ASSERT_EQ(1, countWalFiles());
1542 ASSERT_OK(Put(0, "key3", kSmallValue));
1543 ASSERT_OK(Put(2, "key4", kLargeValue));
1544 Flush(2);
1545 ASSERT_EQ(2, countWalFiles());
1546
1547 // Reopen, insert and flush.
1548 options.db_write_buffer_size = 64 * 1024 * 1024;
1549 ReopenWithColumnFamilies({"default", "one", "two"}, options);
1550 ASSERT_EQ(Get(0, "key1"), kSmallValue);
1551 ASSERT_EQ(Get(1, "key2"), kLargeValue);
1552 ASSERT_EQ(Get(0, "key3"), kSmallValue);
1553 ASSERT_EQ(Get(2, "key4"), kLargeValue);
1554 // Insert more data.
1555 ASSERT_OK(Put(0, "key5", kLargeValue));
1556 ASSERT_OK(Put(1, "key6", kLargeValue));
1557 ASSERT_EQ(3, countWalFiles());
1558 Flush(1);
1559 ASSERT_OK(Put(2, "key7", kLargeValue));
1560 dbfull()->FlushWAL(false);
1561 ASSERT_EQ(4, countWalFiles());
1562
1563 // Reopen twice and validate.
1564 for (int i = 0; i < 2; i++) {
1565 ReopenWithColumnFamilies({"default", "one", "two"}, options);
1566 ASSERT_EQ(Get(0, "key1"), kSmallValue);
1567 ASSERT_EQ(Get(1, "key2"), kLargeValue);
1568 ASSERT_EQ(Get(0, "key3"), kSmallValue);
1569 ASSERT_EQ(Get(2, "key4"), kLargeValue);
1570 ASSERT_EQ(Get(0, "key5"), kLargeValue);
1571 ASSERT_EQ(Get(1, "key6"), kLargeValue);
1572 ASSERT_EQ(Get(2, "key7"), kLargeValue);
1573 ASSERT_EQ(4, countWalFiles());
1574 }
1575 }
1576
1577 // In this test we do the following:
1578 // 1. Create a DB with a corrupted WAL;
1579 // 2. Open it with avoid_flush_during_recovery = true;
1580 // 3. Append more data without flushing, which creates a new WAL;
1581 // 4. Open again and verify that the previous corruption is handled correctly.
1582 TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
1583 RecoverFromCorruptedWALWithoutFlush) {
1584 const int kAppendKeys = 100;
1585 Options options = CurrentOptions();
1586 options.avoid_flush_during_recovery = true;
1587 options.create_if_missing = false;
1588 options.disable_auto_compactions = true;
1589 options.write_buffer_size = 64 * 1024 * 1024;
1590
1591 auto getAll = [this]() {
1592 std::vector<std::pair<std::string, std::string>> data;
1593 ReadOptions ropt;
1594 Iterator* iter = dbfull()->NewIterator(ropt);
1595 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1596 data.push_back(
1597 std::make_pair(iter->key().ToString(), iter->value().ToString()));
1598 }
1599 delete iter;
1600 return data;
1601 };
1602
1603 bool trunc = std::get<0>(GetParam()); // Corruption style
1604 // Corruption offset position
1605 int corrupt_offset = std::get<1>(GetParam());
1606 int wal_file_id = std::get<2>(GetParam()); // WAL file
1607 WALRecoveryMode recovery_mode = std::get<3>(GetParam());
1608
1609 options.wal_recovery_mode = recovery_mode;
1610 // Create corrupted WAL
1611 RecoveryTestHelper::FillData(this, &options);
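// Corrupt (or truncate, when 'trunc' is set) the chosen WAL file; the offset
// and length arguments are fractions of the WAL file size.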
1612 RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
1613 /*len%=*/.1, wal_file_id, trunc);
1614 // Skip the test if DB won't open.
1615 if (!TryReopen(options).ok()) {
1616 ASSERT_TRUE(options.wal_recovery_mode ==
1617 WALRecoveryMode::kAbsoluteConsistency ||
1618 (!trunc && options.wal_recovery_mode ==
1619 WALRecoveryMode::kTolerateCorruptedTailRecords));
1620 return;
1621 }
1622 ASSERT_OK(TryReopen(options));
1623 // Append some more data.
1624 for (int k = 0; k < kAppendKeys; k++) {
1625 std::string key = "extra_key" + ToString(k);
1626 std::string value = DummyString(RecoveryTestHelper::kValueSize);
1627 ASSERT_OK(Put(key, value));
1628 }
1629 // Save data for comparison.
1630 auto data = getAll();
1631 // Reopen. Verify data.
1632 ASSERT_OK(TryReopen(options));
1633 auto actual_data = getAll();
1634 ASSERT_EQ(data, actual_data);
1635 }
1636
1637 // Tests that the total WAL size is restored after recovery when
1638 // avoid_flush_during_recovery=true, and that a flush is still triggered
1639 // once max_total_wal_size is reached.
1640 TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
1641 class TestFlushListener : public EventListener {
1642 public:
1643 std::atomic<int> count{0};
1644
1645 TestFlushListener() = default;
1646
1647 void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
1648 count++;
1649 assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
1650 }
1651 };
1652 std::shared_ptr<TestFlushListener> test_listener =
1653 std::make_shared<TestFlushListener>();
1654
1655 constexpr size_t kKB = 1024;
1656 constexpr size_t kMB = 1024 * 1024;
1657 Options options = CurrentOptions();
1658 options.avoid_flush_during_recovery = true;
1659 options.max_total_wal_size = 1 * kMB;
1660 options.listeners.push_back(test_listener);
1661 // Have to open DB in multi-CF mode to trigger flush when
1662 // max_total_wal_size is reached.
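// (RocksDB only applies the max_total_wal_size check when more than one
// column family is in use.)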
1663 CreateAndReopenWithCF({"one"}, options);
1664 // Write some keys so that we end up with one log file that is slightly
1665 // smaller than 1MB.
1666 std::string value_100k(100 * kKB, 'v');
1667 std::string value_300k(300 * kKB, 'v');
1668 ASSERT_OK(Put(0, "foo", "v1"));
1669 for (int i = 0; i < 9; i++) {
1670 ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
1671 }
1672 // Get log files before reopen.
1673 VectorLogPtr log_files_before;
1674 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
1675 ASSERT_EQ(1, log_files_before.size());
1676 uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
1677 ASSERT_GT(log_size_before, 900 * kKB);
1678 ASSERT_LT(log_size_before, 1 * kMB);
1679 ReopenWithColumnFamilies({"default", "one"}, options);
1680 // Write one more value to make log larger than 1MB.
1681 ASSERT_OK(Put(1, "bar", value_300k));
1682 // Get log files again. A new log file will be opened.
1683 VectorLogPtr log_files_after_reopen;
1684 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
1685 ASSERT_EQ(2, log_files_after_reopen.size());
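// The WAL from before the reopen is still present because recovery did not
// flush its data.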
1686 ASSERT_EQ(log_files_before[0]->LogNumber(),
1687 log_files_after_reopen[0]->LogNumber());
1688 ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
1689 log_files_after_reopen[1]->SizeFileBytes(),
1690 1 * kMB);
1691 // Write one more key to trigger flush.
1692 ASSERT_OK(Put(0, "foo", "v2"));
1693 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1694 // Flushed two column families.
1695 ASSERT_EQ(2, test_listener->count.load());
1696 }
1697
1698 #if defined(ROCKSDB_PLATFORM_POSIX)
1699 #if defined(ROCKSDB_FALLOCATE_PRESENT)
1700 // Tests that the preallocated space of the last log file from the previous
1701 // session is truncated after recovery.
1702 TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
1703 constexpr size_t kKB = 1024;
1704 Options options = CurrentOptions();
1705 options.env = env_;
1706 options.avoid_flush_during_recovery = true;
1707 if (mem_env_) {
1708 ROCKSDB_GTEST_SKIP("Test requires non-mem environment");
1709 return;
1710 }
1711 // Test fallocate support of the running file system.
1712 // Skip this test if fallocate is not supported.
1713 std::string fname_test_fallocate = dbname_ + "/preallocate_testfile";
1714 int fd = -1;
1715 do {
1716 fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
1717 } while (fd < 0 && errno == EINTR);
1718 ASSERT_GT(fd, 0);
1719 int alloc_status = fallocate(fd, 0, 0, 1);
1720 int err_number = errno;
1721 close(fd);
1722 ASSERT_OK(options.env->DeleteFile(fname_test_fallocate));
1723 if (err_number == ENOSYS || err_number == EOPNOTSUPP) {
1724 fprintf(stderr, "Skipped preallocated space check: %s\n", strerror(err_number));
1725 return;
1726 }
1727 ASSERT_EQ(0, alloc_status);
1728
1729 DestroyAndReopen(options);
1730 size_t preallocated_size =
1731 dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
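// A single small put keeps the WAL's logical size tiny, while its allocated
// size should still be at least one preallocation block.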
1732 ASSERT_OK(Put("foo", "v1"));
1733 VectorLogPtr log_files_before;
1734 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
1735 ASSERT_EQ(1, log_files_before.size());
1736 auto& file_before = log_files_before[0];
1737 ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
1738 // The log file has preallocated space.
1739 ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
1740 preallocated_size);
1741 Reopen(options);
1742 VectorLogPtr log_files_after;
1743 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
1744 ASSERT_EQ(1, log_files_after.size());
1745 ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
1746 // The preallocated space should be truncated.
1747 ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
1748 preallocated_size);
1749 }
1750 #endif // ROCKSDB_FALLOCATE_PRESENT
1751 #endif // ROCKSDB_PLATFORM_POSIX
1752
1753 #endif // ROCKSDB_LITE
1754
1755 TEST_F(DBWALTest, WalTermTest) {
1756 Options options = CurrentOptions();
1757 options.env = env_;
1758 CreateAndReopenWithCF({"pikachu"}, options);
1759
1760 ASSERT_OK(Put(1, "foo", "bar"));
1761
1762 WriteOptions wo;
1763 wo.sync = true;
1764 wo.disableWAL = false;
1765
1766 WriteBatch batch;
1767 batch.Put("foo", "bar");
1768 batch.MarkWalTerminationPoint();
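// Entries added to the batch after MarkWalTerminationPoint() are applied to
// the memtable but are not written to the WAL, so "foo2" is expected to be
// lost across the reopen below.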
1769 batch.Put("foo2", "bar2");
1770
1771 ASSERT_OK(dbfull()->Write(wo, &batch));
1772
1773 // Make sure we can re-open it.
1774 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
1775 ASSERT_EQ("bar", Get(1, "foo"));
1776 ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
1777 }
1778 } // namespace ROCKSDB_NAMESPACE
1779
1780 int main(int argc, char** argv) {
1781 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1782 ::testing::InitGoogleTest(&argc, argv);
1783 return RUN_ALL_TESTS();
1784 }