]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_wal_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_wal_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#include "db/db_test_util.h"
f67539c2 11#include "env/composite_env_wrapper.h"
7c673cae
FG
12#include "options/options_helper.h"
13#include "port/port.h"
14#include "port/stack_trace.h"
f67539c2
TL
15#include "test_util/fault_injection_test_env.h"
16#include "test_util/sync_point.h"
7c673cae 17
f67539c2 18namespace ROCKSDB_NAMESPACE {
7c673cae
FG
19class DBWALTest : public DBTestBase {
20 public:
21 DBWALTest() : DBTestBase("/db_wal_test") {}
11fdf7f2
TL
22
23#if defined(ROCKSDB_PLATFORM_POSIX)
24 uint64_t GetAllocatedFileSize(std::string file_name) {
25 struct stat sbuf;
26 int err = stat(file_name.c_str(), &sbuf);
27 assert(err == 0);
28 return sbuf.st_blocks * 512;
29 }
30#endif
31};
32
33// A SpecialEnv enriched to give more insight about deleted files
34class EnrichedSpecialEnv : public SpecialEnv {
35 public:
36 explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
494da23a
TL
37 Status NewSequentialFile(const std::string& f,
38 std::unique_ptr<SequentialFile>* r,
11fdf7f2
TL
39 const EnvOptions& soptions) override {
40 InstrumentedMutexLock l(&env_mutex_);
41 if (f == skipped_wal) {
42 deleted_wal_reopened = true;
43 if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
44 f.compare(largetest_deleted_wal) <= 0) {
45 gap_in_wals = true;
46 }
47 }
48 return SpecialEnv::NewSequentialFile(f, r, soptions);
49 }
50 Status DeleteFile(const std::string& fname) override {
51 if (IsWAL(fname)) {
52 deleted_wal_cnt++;
53 InstrumentedMutexLock l(&env_mutex_);
54 // If this is the first WAL, remember its name and skip deleting it. We
55 // remember its name partly because the application might attempt to
56 // delete the file again.
57 if (skipped_wal.size() != 0 && skipped_wal != fname) {
58 if (largetest_deleted_wal.size() == 0 ||
59 largetest_deleted_wal.compare(fname) < 0) {
60 largetest_deleted_wal = fname;
61 }
62 } else {
63 skipped_wal = fname;
64 return Status::OK();
65 }
66 }
67 return SpecialEnv::DeleteFile(fname);
68 }
69 bool IsWAL(const std::string& fname) {
70 // printf("iswal %s\n", fname.c_str());
71 return fname.compare(fname.size() - 3, 3, "log") == 0;
72 }
73
74 InstrumentedMutex env_mutex_;
75 // the wal whose actual delete was skipped by the env
76 std::string skipped_wal = "";
77 // the largest WAL that was requested to be deleted
78 std::string largetest_deleted_wal = "";
79 // number of WALs that were successfully deleted
80 std::atomic<size_t> deleted_wal_cnt = {0};
81 // the WAL whose delete from fs was skipped is reopened during recovery
82 std::atomic<bool> deleted_wal_reopened = {false};
83 // whether a gap in the WALs was detected during recovery
84 std::atomic<bool> gap_in_wals = {false};
85};
86
87class DBWALTestWithEnrichedEnv : public DBTestBase {
88 public:
89 DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
90 enriched_env_ = new EnrichedSpecialEnv(env_->target());
91 auto options = CurrentOptions();
92 options.env = enriched_env_;
494da23a 93 options.allow_2pc = true;
11fdf7f2
TL
94 Reopen(options);
95 delete env_;
96 // to be deleted by the parent class
97 env_ = enriched_env_;
98 }
99
100 protected:
101 EnrichedSpecialEnv* enriched_env_;
7c673cae
FG
102};
103
11fdf7f2
TL
104// Test that the recovery would successfully avoid the gaps between the logs.
105// One known scenario that could cause this is that the application issue the
106// WAL deletion out of order. For the sake of simplicity in the test, here we
107// create the gap by manipulating the env to skip deletion of the first WAL but
108// not the ones after it.
109TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
110 auto options = last_options_;
111 // To cause frequent WAL deletion
112 options.write_buffer_size = 128;
113 Reopen(options);
114
115 WriteOptions writeOpt = WriteOptions();
116 for (int i = 0; i < 128 * 5; i++) {
117 ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
118 }
119 FlushOptions fo;
120 fo.wait = true;
121 ASSERT_OK(db_->Flush(fo));
122
123 // some wals are deleted
124 ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
125 // but not the first one
126 ASSERT_NE(0, enriched_env_->skipped_wal.size());
127
128 // Test that the WAL that was not deleted will be skipped during recovery
129 options = last_options_;
130 Reopen(options);
131 ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
132 ASSERT_FALSE(enriched_env_->gap_in_wals);
133}
134
7c673cae
FG
135TEST_F(DBWALTest, WAL) {
136 do {
137 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
138 WriteOptions writeOpt = WriteOptions();
139 writeOpt.disableWAL = true;
140 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
141 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
142
143 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
144 ASSERT_EQ("v1", Get(1, "foo"));
145 ASSERT_EQ("v1", Get(1, "bar"));
146
147 writeOpt.disableWAL = false;
148 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
149 writeOpt.disableWAL = true;
150 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
151
152 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
153 // Both value's should be present.
154 ASSERT_EQ("v2", Get(1, "bar"));
155 ASSERT_EQ("v2", Get(1, "foo"));
156
157 writeOpt.disableWAL = true;
158 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
159 writeOpt.disableWAL = false;
160 ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
161
162 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
163 // again both values should be present.
164 ASSERT_EQ("v3", Get(1, "foo"));
165 ASSERT_EQ("v3", Get(1, "bar"));
166 } while (ChangeWalOptions());
167}
168
169TEST_F(DBWALTest, RollLog) {
170 do {
171 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
172 ASSERT_OK(Put(1, "foo", "v1"));
173 ASSERT_OK(Put(1, "baz", "v5"));
174
175 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
176 for (int i = 0; i < 10; i++) {
177 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
178 }
179 ASSERT_OK(Put(1, "foo", "v4"));
180 for (int i = 0; i < 10; i++) {
181 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
182 }
183 } while (ChangeWalOptions());
184}
185
186TEST_F(DBWALTest, SyncWALNotBlockWrite) {
187 Options options = CurrentOptions();
188 options.max_write_buffer_number = 4;
189 DestroyAndReopen(options);
190
191 ASSERT_OK(Put("foo1", "bar1"));
192 ASSERT_OK(Put("foo5", "bar5"));
193
f67539c2 194 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
195 {"WritableFileWriter::SyncWithoutFlush:1",
196 "DBWALTest::SyncWALNotBlockWrite:1"},
197 {"DBWALTest::SyncWALNotBlockWrite:2",
198 "WritableFileWriter::SyncWithoutFlush:2"},
199 });
f67539c2 200 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae 201
f67539c2 202 ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); });
7c673cae
FG
203
204 TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1");
205 ASSERT_OK(Put("foo2", "bar2"));
206 ASSERT_OK(Put("foo3", "bar3"));
207 FlushOptions fo;
208 fo.wait = false;
209 ASSERT_OK(db_->Flush(fo));
210 ASSERT_OK(Put("foo4", "bar4"));
211
212 TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2");
213
214 thread.join();
215
216 ASSERT_EQ(Get("foo1"), "bar1");
217 ASSERT_EQ(Get("foo2"), "bar2");
218 ASSERT_EQ(Get("foo3"), "bar3");
219 ASSERT_EQ(Get("foo4"), "bar4");
220 ASSERT_EQ(Get("foo5"), "bar5");
f67539c2 221 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
222}
223
224TEST_F(DBWALTest, SyncWALNotWaitWrite) {
225 ASSERT_OK(Put("foo1", "bar1"));
226 ASSERT_OK(Put("foo3", "bar3"));
227
f67539c2 228 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
7c673cae
FG
229 {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"},
230 {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"},
231 });
f67539c2 232 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae 233
f67539c2
TL
234 ROCKSDB_NAMESPACE::port::Thread thread(
235 [&]() { ASSERT_OK(Put("foo2", "bar2")); });
11fdf7f2
TL
236 // Moving this to SyncWAL before the actual fsync
237 // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
7c673cae 238 ASSERT_OK(db_->SyncWAL());
11fdf7f2
TL
239 // Moving this to SyncWAL after actual fsync
240 // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
7c673cae
FG
241
242 thread.join();
243
244 ASSERT_EQ(Get("foo1"), "bar1");
245 ASSERT_EQ(Get("foo2"), "bar2");
f67539c2 246 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
247}
248
249TEST_F(DBWALTest, Recover) {
250 do {
251 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
252 ASSERT_OK(Put(1, "foo", "v1"));
253 ASSERT_OK(Put(1, "baz", "v5"));
254
255 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
256 ASSERT_EQ("v1", Get(1, "foo"));
257
258 ASSERT_EQ("v1", Get(1, "foo"));
259 ASSERT_EQ("v5", Get(1, "baz"));
260 ASSERT_OK(Put(1, "bar", "v2"));
261 ASSERT_OK(Put(1, "foo", "v3"));
262
263 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
264 ASSERT_EQ("v3", Get(1, "foo"));
265 ASSERT_OK(Put(1, "foo", "v4"));
266 ASSERT_EQ("v4", Get(1, "foo"));
267 ASSERT_EQ("v2", Get(1, "bar"));
268 ASSERT_EQ("v5", Get(1, "baz"));
269 } while (ChangeWalOptions());
270}
271
272TEST_F(DBWALTest, RecoverWithTableHandle) {
273 do {
274 Options options = CurrentOptions();
275 options.create_if_missing = true;
276 options.disable_auto_compactions = true;
277 options.avoid_flush_during_recovery = false;
278 DestroyAndReopen(options);
279 CreateAndReopenWithCF({"pikachu"}, options);
280
281 ASSERT_OK(Put(1, "foo", "v1"));
282 ASSERT_OK(Put(1, "bar", "v2"));
283 ASSERT_OK(Flush(1));
284 ASSERT_OK(Put(1, "foo", "v3"));
285 ASSERT_OK(Put(1, "bar", "v4"));
286 ASSERT_OK(Flush(1));
287 ASSERT_OK(Put(1, "big", std::string(100, 'a')));
494da23a
TL
288
289 options = CurrentOptions();
290 const int kSmallMaxOpenFiles = 13;
291 if (option_config_ == kDBLogDir) {
292 // Use this option to check not preloading files
293 // Set the max open files to be small enough so no preload will
294 // happen.
295 options.max_open_files = kSmallMaxOpenFiles;
296 // RocksDB sanitize max open files to at least 20. Modify it back.
f67539c2 297 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
494da23a
TL
298 "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
299 int* max_open_files = static_cast<int*>(arg);
300 *max_open_files = kSmallMaxOpenFiles;
301 });
302
303 } else if (option_config_ == kWalDirAndMmapReads) {
304 // Use this option to check always loading all files.
305 options.max_open_files = 100;
306 } else {
307 options.max_open_files = -1;
308 }
f67539c2 309 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
494da23a 310 ReopenWithColumnFamilies({"default", "pikachu"}, options);
f67539c2
TL
311 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
312 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
7c673cae
FG
313
314 std::vector<std::vector<FileMetaData>> files;
315 dbfull()->TEST_GetFilesMetaData(handles_[1], &files);
316 size_t total_files = 0;
317 for (const auto& level : files) {
318 total_files += level.size();
319 }
320 ASSERT_EQ(total_files, 3);
321 for (const auto& level : files) {
322 for (const auto& file : level) {
494da23a 323 if (options.max_open_files == kSmallMaxOpenFiles) {
7c673cae 324 ASSERT_TRUE(file.table_reader_handle == nullptr);
494da23a
TL
325 } else {
326 ASSERT_TRUE(file.table_reader_handle != nullptr);
7c673cae
FG
327 }
328 }
329 }
330 } while (ChangeWalOptions());
331}
332
333TEST_F(DBWALTest, IgnoreRecoveredLog) {
334 std::string backup_logs = dbname_ + "/backup_logs";
335
336 do {
337 // delete old files in backup_logs directory
338 env_->CreateDirIfMissing(backup_logs);
339 std::vector<std::string> old_files;
340 env_->GetChildren(backup_logs, &old_files);
341 for (auto& file : old_files) {
342 if (file != "." && file != "..") {
343 env_->DeleteFile(backup_logs + "/" + file);
344 }
345 }
346 Options options = CurrentOptions();
347 options.create_if_missing = true;
348 options.merge_operator = MergeOperators::CreateUInt64AddOperator();
349 options.wal_dir = dbname_ + "/logs";
350 DestroyAndReopen(options);
351
352 // fill up the DB
353 std::string one, two;
354 PutFixed64(&one, 1);
355 PutFixed64(&two, 2);
356 ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
357 ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
358 ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
359
360 // copy the logs to backup
361 std::vector<std::string> logs;
362 env_->GetChildren(options.wal_dir, &logs);
363 for (auto& log : logs) {
364 if (log != ".." && log != ".") {
365 CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
366 }
367 }
368
369 // recover the DB
370 Reopen(options);
371 ASSERT_EQ(two, Get("foo"));
372 ASSERT_EQ(one, Get("bar"));
373 Close();
374
375 // copy the logs from backup back to wal dir
376 for (auto& log : logs) {
377 if (log != ".." && log != ".") {
378 CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
379 }
380 }
381 // this should ignore the log files, recovery should not happen again
382 // if the recovery happens, the same merge operator would be called twice,
383 // leading to incorrect results
384 Reopen(options);
385 ASSERT_EQ(two, Get("foo"));
386 ASSERT_EQ(one, Get("bar"));
387 Close();
388 Destroy(options);
389 Reopen(options);
390 Close();
391
392 // copy the logs from backup back to wal dir
393 env_->CreateDirIfMissing(options.wal_dir);
394 for (auto& log : logs) {
395 if (log != ".." && log != ".") {
396 CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
397 }
398 }
399 // assert that we successfully recovered only from logs, even though we
400 // destroyed the DB
401 Reopen(options);
402 ASSERT_EQ(two, Get("foo"));
403 ASSERT_EQ(one, Get("bar"));
404
405 // Recovery will fail if DB directory doesn't exist.
406 Destroy(options);
407 // copy the logs from backup back to wal dir
408 env_->CreateDirIfMissing(options.wal_dir);
409 for (auto& log : logs) {
410 if (log != ".." && log != ".") {
411 CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
412 // we won't be needing this file no more
413 env_->DeleteFile(backup_logs + "/" + log);
414 }
415 }
416 Status s = TryReopen(options);
417 ASSERT_TRUE(!s.ok());
418 Destroy(options);
419 } while (ChangeWalOptions());
420}
421
422TEST_F(DBWALTest, RecoveryWithEmptyLog) {
423 do {
424 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
425 ASSERT_OK(Put(1, "foo", "v1"));
426 ASSERT_OK(Put(1, "foo", "v2"));
427 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
428 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
429 ASSERT_OK(Put(1, "foo", "v3"));
430 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
431 ASSERT_EQ("v3", Get(1, "foo"));
432 } while (ChangeWalOptions());
433}
434
435#if !(defined NDEBUG) || !defined(OS_WIN)
436TEST_F(DBWALTest, PreallocateBlock) {
437 Options options = CurrentOptions();
438 options.write_buffer_size = 10 * 1000 * 1000;
439 options.max_total_wal_size = 0;
440
441 size_t expected_preallocation_size = static_cast<size_t>(
442 options.write_buffer_size + options.write_buffer_size / 10);
443
444 DestroyAndReopen(options);
445
446 std::atomic<int> called(0);
f67539c2 447 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae
FG
448 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
449 ASSERT_TRUE(arg != nullptr);
450 size_t preallocation_size = *(static_cast<size_t*>(arg));
451 ASSERT_EQ(expected_preallocation_size, preallocation_size);
452 called.fetch_add(1);
453 });
f67539c2 454 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
455 Put("", "");
456 Flush();
457 Put("", "");
458 Close();
f67539c2 459 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
460 ASSERT_EQ(2, called.load());
461
462 options.max_total_wal_size = 1000 * 1000;
463 expected_preallocation_size = static_cast<size_t>(options.max_total_wal_size);
464 Reopen(options);
465 called.store(0);
f67539c2 466 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae
FG
467 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
468 ASSERT_TRUE(arg != nullptr);
469 size_t preallocation_size = *(static_cast<size_t*>(arg));
470 ASSERT_EQ(expected_preallocation_size, preallocation_size);
471 called.fetch_add(1);
472 });
f67539c2 473 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
474 Put("", "");
475 Flush();
476 Put("", "");
477 Close();
f67539c2 478 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
479 ASSERT_EQ(2, called.load());
480
481 options.db_write_buffer_size = 800 * 1000;
482 expected_preallocation_size =
483 static_cast<size_t>(options.db_write_buffer_size);
484 Reopen(options);
485 called.store(0);
f67539c2 486 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae
FG
487 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
488 ASSERT_TRUE(arg != nullptr);
489 size_t preallocation_size = *(static_cast<size_t*>(arg));
490 ASSERT_EQ(expected_preallocation_size, preallocation_size);
491 called.fetch_add(1);
492 });
f67539c2 493 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
494 Put("", "");
495 Flush();
496 Put("", "");
497 Close();
f67539c2 498 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
499 ASSERT_EQ(2, called.load());
500
501 expected_preallocation_size = 700 * 1000;
502 std::shared_ptr<WriteBufferManager> write_buffer_manager =
503 std::make_shared<WriteBufferManager>(static_cast<uint64_t>(700 * 1000));
504 options.write_buffer_manager = write_buffer_manager;
505 Reopen(options);
506 called.store(0);
f67539c2 507 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
7c673cae
FG
508 "DBTestWalFile.GetPreallocationStatus", [&](void* arg) {
509 ASSERT_TRUE(arg != nullptr);
510 size_t preallocation_size = *(static_cast<size_t*>(arg));
511 ASSERT_EQ(expected_preallocation_size, preallocation_size);
512 called.fetch_add(1);
513 });
f67539c2 514 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
7c673cae
FG
515 Put("", "");
516 Flush();
517 Put("", "");
518 Close();
f67539c2 519 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
7c673cae
FG
520 ASSERT_EQ(2, called.load());
521}
522#endif // !(defined NDEBUG) || !defined(OS_WIN)
523
524#ifndef ROCKSDB_LITE
525TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
526 // For github issue #1303
527 for (int i = 0; i < 2; ++i) {
528 Options options = CurrentOptions();
529 options.create_if_missing = true;
530 options.recycle_log_file_num = 2;
531 if (i != 0) {
532 options.wal_dir = alternative_wal_dir_;
533 }
534
535 DestroyAndReopen(options);
536 ASSERT_OK(Put("foo", "v1"));
537 VectorLogPtr log_files;
538 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
539 ASSERT_GT(log_files.size(), 0);
540 ASSERT_OK(Flush());
541
542 // Now the original WAL is in log_files[0] and should be marked for
543 // recycling.
544 // Verify full purge cannot remove this file.
545 JobContext job_context(0);
546 dbfull()->TEST_LockMutex();
547 dbfull()->FindObsoleteFiles(&job_context, true /* force */);
548 dbfull()->TEST_UnlockMutex();
549 dbfull()->PurgeObsoleteFiles(job_context);
550
551 if (i == 0) {
552 ASSERT_OK(
553 env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber())));
554 } else {
555 ASSERT_OK(env_->FileExists(
556 LogFileName(alternative_wal_dir_, log_files[0]->LogNumber())));
557 }
558 }
559}
560
f67539c2
TL
561TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
562 // Ensures full purge cannot delete a WAL while it's in the process of being
563 // recycled. In particular, we force the full purge after a file has been
564 // chosen for reuse, but before it has been renamed.
565 for (int i = 0; i < 2; ++i) {
566 Options options = CurrentOptions();
567 options.recycle_log_file_num = 1;
568 if (i != 0) {
569 options.wal_dir = alternative_wal_dir_;
570 }
571 DestroyAndReopen(options);
572
573 // The first flush creates a second log so writes can continue before the
574 // flush finishes.
575 ASSERT_OK(Put("foo", "bar"));
576 ASSERT_OK(Flush());
577
578 // The second flush can recycle the first log. Sync points enforce the
579 // full purge happens after choosing the log to recycle and before it is
580 // renamed.
581 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
582 {"DBImpl::CreateWAL:BeforeReuseWritableFile1",
583 "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"},
584 {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge",
585 "DBImpl::CreateWAL:BeforeReuseWritableFile2"},
586 });
587 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
588 ROCKSDB_NAMESPACE::port::Thread thread([&]() {
589 TEST_SYNC_POINT(
590 "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge");
591 ASSERT_OK(db_->EnableFileDeletions(true));
592 TEST_SYNC_POINT(
593 "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge");
594 });
595 ASSERT_OK(Put("foo", "bar"));
596 ASSERT_OK(Flush());
597 thread.join();
598 }
599}
600
7c673cae
FG
601TEST_F(DBWALTest, GetSortedWalFiles) {
602 do {
603 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
604 VectorLogPtr log_files;
605 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
606 ASSERT_EQ(0, log_files.size());
607
608 ASSERT_OK(Put(1, "foo", "v1"));
609 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
610 ASSERT_EQ(1, log_files.size());
611 } while (ChangeWalOptions());
612}
613
f67539c2
TL
614TEST_F(DBWALTest, GetCurrentWalFile) {
615 do {
616 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
617
618 std::unique_ptr<LogFile>* bad_log_file = nullptr;
619 ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file));
620
621 std::unique_ptr<LogFile> log_file;
622 ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
623
624 // nothing has been written to the log yet
625 ASSERT_EQ(log_file->StartSequence(), 0);
626 ASSERT_EQ(log_file->SizeFileBytes(), 0);
627 ASSERT_EQ(log_file->Type(), kAliveLogFile);
628 ASSERT_GT(log_file->LogNumber(), 0);
629
630 // add some data and verify that the file size actually moves foward
631 ASSERT_OK(Put(0, "foo", "v1"));
632 ASSERT_OK(Put(0, "foo2", "v2"));
633 ASSERT_OK(Put(0, "foo3", "v3"));
634
635 ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
636
637 ASSERT_EQ(log_file->StartSequence(), 0);
638 ASSERT_GT(log_file->SizeFileBytes(), 0);
639 ASSERT_EQ(log_file->Type(), kAliveLogFile);
640 ASSERT_GT(log_file->LogNumber(), 0);
641
642 // force log files to cycle and add some more data, then check if
643 // log number moves forward
644
645 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
646 for (int i = 0; i < 10; i++) {
647 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
648 }
649
650 ASSERT_OK(Put(0, "foo4", "v4"));
651 ASSERT_OK(Put(0, "foo5", "v5"));
652 ASSERT_OK(Put(0, "foo6", "v6"));
653
654 ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
655
656 ASSERT_EQ(log_file->StartSequence(), 0);
657 ASSERT_GT(log_file->SizeFileBytes(), 0);
658 ASSERT_EQ(log_file->Type(), kAliveLogFile);
659 ASSERT_GT(log_file->LogNumber(), 0);
660
661 } while (ChangeWalOptions());
662}
663
7c673cae
FG
664TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) {
665 // Test for regression of WAL cleanup missing files that don't contain data
666 // for every column family.
667 do {
668 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
669 ASSERT_OK(Put(1, "foo", "v1"));
670 ASSERT_OK(Put(1, "foo", "v2"));
671 uint64_t earliest_log_nums[2];
672 for (int i = 0; i < 2; ++i) {
673 if (i > 0) {
674 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
675 }
676 VectorLogPtr log_files;
677 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
678 if (log_files.size() > 0) {
679 earliest_log_nums[i] = log_files[0]->LogNumber();
680 } else {
681 earliest_log_nums[i] = port::kMaxUint64;
682 }
683 }
684 // Check at least the first WAL was cleaned up during the recovery.
685 ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]);
686 } while (ChangeWalOptions());
687}
688
689TEST_F(DBWALTest, RecoverWithLargeLog) {
690 do {
691 {
692 Options options = CurrentOptions();
693 CreateAndReopenWithCF({"pikachu"}, options);
694 ASSERT_OK(Put(1, "big1", std::string(200000, '1')));
695 ASSERT_OK(Put(1, "big2", std::string(200000, '2')));
696 ASSERT_OK(Put(1, "small3", std::string(10, '3')));
697 ASSERT_OK(Put(1, "small4", std::string(10, '4')));
698 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
699 }
700
701 // Make sure that if we re-open with a small write buffer size that
702 // we flush table files in the middle of a large log file.
703 Options options;
704 options.write_buffer_size = 100000;
705 options = CurrentOptions(options);
706 ReopenWithColumnFamilies({"default", "pikachu"}, options);
707 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3);
708 ASSERT_EQ(std::string(200000, '1'), Get(1, "big1"));
709 ASSERT_EQ(std::string(200000, '2'), Get(1, "big2"));
710 ASSERT_EQ(std::string(10, '3'), Get(1, "small3"));
711 ASSERT_EQ(std::string(10, '4'), Get(1, "small4"));
712 ASSERT_GT(NumTableFilesAtLevel(0, 1), 1);
713 } while (ChangeWalOptions());
714}
715
716// In https://reviews.facebook.net/D20661 we change
717// recovery behavior: previously for each log file each column family
718// memtable was flushed, even it was empty. Now it's changed:
719// we try to create the smallest number of table files by merging
720// updates from multiple logs
721TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
722 Options options = CurrentOptions();
723 options.write_buffer_size = 5000000;
724 CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
725
726 // Since we will reopen DB with smaller write_buffer_size,
727 // each key will go to new SST file
728 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
729 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
730 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
731 ASSERT_OK(Put(1, Key(10), DummyString(1000000)));
732
733 ASSERT_OK(Put(3, Key(10), DummyString(1)));
734 // Make 'dobrynia' to be flushed and new WAL file to be created
735 ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
736 ASSERT_OK(Put(2, Key(1), DummyString(1)));
737 dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
738 {
739 auto tables = ListTableFiles(env_, dbname_);
740 ASSERT_EQ(tables.size(), static_cast<size_t>(1));
741 // Make sure 'dobrynia' was flushed: check sst files amount
742 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
743 static_cast<uint64_t>(1));
744 }
745 // New WAL file
746 ASSERT_OK(Put(1, Key(1), DummyString(1)));
747 ASSERT_OK(Put(1, Key(1), DummyString(1)));
748 ASSERT_OK(Put(3, Key(10), DummyString(1)));
749 ASSERT_OK(Put(3, Key(10), DummyString(1)));
750 ASSERT_OK(Put(3, Key(10), DummyString(1)));
751
752 options.write_buffer_size = 4096;
753 options.arena_block_size = 4096;
754 ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
755 options);
756 {
757 // No inserts => default is empty
758 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
759 static_cast<uint64_t>(0));
760 // First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys
761 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
762 static_cast<uint64_t>(5));
763 // 1 SST for big key + 1 SST for small one
764 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
765 static_cast<uint64_t>(2));
766 // 1 SST for all keys
767 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
768 static_cast<uint64_t>(1));
769 }
770}
771
772// In https://reviews.facebook.net/D20661 we change
773// recovery behavior: previously for each log file each column family
774// memtable was flushed, even it wasn't empty. Now it's changed:
775// we try to create the smallest number of table files by merging
776// updates from multiple logs
777TEST_F(DBWALTest, RecoverCheckFileAmount) {
778 Options options = CurrentOptions();
779 options.write_buffer_size = 100000;
780 options.arena_block_size = 4 * 1024;
781 options.avoid_flush_during_recovery = false;
782 CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
783
784 ASSERT_OK(Put(0, Key(1), DummyString(1)));
785 ASSERT_OK(Put(1, Key(1), DummyString(1)));
786 ASSERT_OK(Put(2, Key(1), DummyString(1)));
787
788 // Make 'nikitich' memtable to be flushed
789 ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
790 ASSERT_OK(Put(3, Key(1), DummyString(1)));
791 dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
792 // 4 memtable are not flushed, 1 sst file
793 {
794 auto tables = ListTableFiles(env_, dbname_);
795 ASSERT_EQ(tables.size(), static_cast<size_t>(1));
796 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
797 static_cast<uint64_t>(1));
798 }
799 // Memtable for 'nikitich' has flushed, new WAL file has opened
800 // 4 memtable still not flushed
801
802 // Write to new WAL file
803 ASSERT_OK(Put(0, Key(1), DummyString(1)));
804 ASSERT_OK(Put(1, Key(1), DummyString(1)));
805 ASSERT_OK(Put(2, Key(1), DummyString(1)));
806
807 // Fill up 'nikitich' one more time
808 ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
809 // make it flush
810 ASSERT_OK(Put(3, Key(1), DummyString(1)));
811 dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
812 // There are still 4 memtable not flushed, and 2 sst tables
813 ASSERT_OK(Put(0, Key(1), DummyString(1)));
814 ASSERT_OK(Put(1, Key(1), DummyString(1)));
815 ASSERT_OK(Put(2, Key(1), DummyString(1)));
816
817 {
818 auto tables = ListTableFiles(env_, dbname_);
819 ASSERT_EQ(tables.size(), static_cast<size_t>(2));
820 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
821 static_cast<uint64_t>(2));
822 }
823
824 ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
825 options);
826 {
827 std::vector<uint64_t> table_files = ListTableFiles(env_, dbname_);
828 // Check, that records for 'default', 'dobrynia' and 'pikachu' from
829 // first, second and third WALs went to the same SST.
830 // So, there is 6 SSTs: three for 'nikitich', one for 'default', one for
831 // 'dobrynia', one for 'pikachu'
832 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
833 static_cast<uint64_t>(1));
834 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
835 static_cast<uint64_t>(3));
836 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
837 static_cast<uint64_t>(1));
838 ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
839 static_cast<uint64_t>(1));
840 }
841}
842
843TEST_F(DBWALTest, SyncMultipleLogs) {
844 const uint64_t kNumBatches = 2;
845 const int kBatchSize = 1000;
846
847 Options options = CurrentOptions();
848 options.create_if_missing = true;
849 options.write_buffer_size = 4096;
850 Reopen(options);
851
852 WriteBatch batch;
853 WriteOptions wo;
854 wo.sync = true;
855
856 for (uint64_t b = 0; b < kNumBatches; b++) {
857 batch.Clear();
858 for (int i = 0; i < kBatchSize; i++) {
859 batch.Put(Key(i), DummyString(128));
860 }
861
862 dbfull()->Write(wo, &batch);
863 }
864
865 ASSERT_OK(dbfull()->SyncWAL());
866}
867
// Github issue 1339. Prior to the fix we read the sequence id from the first
// log into a local variable, then kept increasing that variable as we replayed
// logs, ignoring the actual sequence id of the records. This is incorrect if
// some writes come with WAL disabled.
872TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
873 std::unique_ptr<FaultInjectionTestEnv> fault_env(
874 new FaultInjectionTestEnv(env_));
875 Options options = CurrentOptions();
876 options.env = fault_env.get();
877 options.disable_auto_compactions = true;
878 WriteOptions wal_on, wal_off;
879 wal_on.sync = true;
880 wal_on.disableWAL = false;
881 wal_off.disableWAL = true;
882 CreateAndReopenWithCF({"dummy"}, options);
883 ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1
884 ASSERT_OK(Put(1, "dummy", "d2", wal_off));
885 ASSERT_OK(Put(1, "dummy", "d3", wal_off));
886 ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4
887 ASSERT_OK(Flush(0));
888 ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
889 ASSERT_EQ("v5", Get(0, "key"));
11fdf7f2 890 dbfull()->FlushWAL(false);
7c673cae
FG
891 // Simulate a crash.
892 fault_env->SetFilesystemActive(false);
893 Close();
894 fault_env->ResetState();
895 ReopenWithColumnFamilies({"default", "dummy"}, options);
896 // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3.
897 ASSERT_EQ("v5", Get(0, "key"));
898 // Destroy DB before destruct fault_env.
899 Destroy(options);
900}
901
902//
903// Test WAL recovery for the various modes available
904//
905class RecoveryTestHelper {
906 public:
907 // Number of WAL files to generate
908 static const int kWALFilesCount = 10;
909 // Starting number for the WAL file name like 00010.log
910 static const int kWALFileOffset = 10;
911 // Keys to be written per WAL file
912 static const int kKeysPerWALFile = 133;
913 // Size of the value
914 static const int kValueSize = 96;
915
916 // Create WAL files with values filled in
917 static void FillData(DBWALTest* test, const Options& options,
918 const size_t wal_count, size_t* count) {
f67539c2
TL
919 // Calling internal functions requires sanitized options.
920 Options sanitized_options = SanitizeOptions(test->dbname_, options);
921 const ImmutableDBOptions db_options(sanitized_options);
7c673cae
FG
922
923 *count = 0;
924
494da23a 925 std::shared_ptr<Cache> table_cache = NewLRUCache(50, 0);
7c673cae
FG
926 EnvOptions env_options;
927 WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
928
494da23a
TL
929 std::unique_ptr<VersionSet> versions;
930 std::unique_ptr<WalManager> wal_manager;
7c673cae
FG
931 WriteController write_controller;
932
933 versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
934 table_cache.get(), &write_buffer_manager,
f67539c2
TL
935 &write_controller,
936 /*block_cache_tracer=*/nullptr));
7c673cae
FG
937
938 wal_manager.reset(new WalManager(db_options, env_options));
939
940 std::unique_ptr<log::Writer> current_log_writer;
941
942 for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) {
943 uint64_t current_log_number = j;
944 std::string fname = LogFileName(test->dbname_, current_log_number);
494da23a 945 std::unique_ptr<WritableFile> file;
7c673cae 946 ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options));
f67539c2
TL
947 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
948 NewLegacyWritableFileWrapper(std::move(file)), fname, env_options));
7c673cae
FG
949 current_log_writer.reset(
950 new log::Writer(std::move(file_writer), current_log_number,
951 db_options.recycle_log_file_num > 0));
952
953 WriteBatch batch;
954 for (int i = 0; i < kKeysPerWALFile; i++) {
955 std::string key = "key" + ToString((*count)++);
956 std::string value = test->DummyString(kValueSize);
957 assert(current_log_writer.get() != nullptr);
958 uint64_t seq = versions->LastSequence() + 1;
959 batch.Clear();
960 batch.Put(key, value);
961 WriteBatchInternal::SetSequence(&batch, seq);
962 current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
11fdf7f2
TL
963 versions->SetLastAllocatedSequence(seq);
964 versions->SetLastPublishedSequence(seq);
7c673cae
FG
965 versions->SetLastSequence(seq);
966 }
967 }
968 }
969
970 // Recreate and fill the store with some data
971 static size_t FillData(DBWALTest* test, Options* options) {
972 options->create_if_missing = true;
973 test->DestroyAndReopen(*options);
974 test->Close();
975
976 size_t count = 0;
977 FillData(test, *options, kWALFilesCount, &count);
978 return count;
979 }
980
981 // Read back all the keys we wrote and return the number of keys found
982 static size_t GetData(DBWALTest* test) {
983 size_t count = 0;
984 for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) {
985 if (test->Get("key" + ToString(i)) != "NOT_FOUND") {
986 ++count;
987 }
988 }
989 return count;
990 }
991
992 // Manuall corrupt the specified WAL
993 static void CorruptWAL(DBWALTest* test, const Options& options,
994 const double off, const double len,
995 const int wal_file_id, const bool trunc = false) {
996 Env* env = options.env;
997 std::string fname = LogFileName(test->dbname_, wal_file_id);
998 uint64_t size;
999 ASSERT_OK(env->GetFileSize(fname, &size));
1000 ASSERT_GT(size, 0);
1001#ifdef OS_WIN
1002 // Windows disk cache behaves differently. When we truncate
1003 // the original content is still in the cache due to the original
1004 // handle is still open. Generally, in Windows, one prohibits
1005 // shared access to files and it is not needed for WAL but we allow
1006 // it to induce corruption at various tests.
1007 test->Close();
1008#endif
1009 if (trunc) {
1010 ASSERT_EQ(0, truncate(fname.c_str(), static_cast<int64_t>(size * off)));
1011 } else {
1012 InduceCorruption(fname, static_cast<size_t>(size * off + 8),
1013 static_cast<size_t>(size * len));
1014 }
1015 }
1016
1017 // Overwrite data with 'a' from offset for length len
1018 static void InduceCorruption(const std::string& filename, size_t offset,
1019 size_t len) {
1020 ASSERT_GT(len, 0U);
1021
1022 int fd = open(filename.c_str(), O_RDWR);
1023
1024 // On windows long is 32-bit
1025 ASSERT_LE(offset, std::numeric_limits<long>::max());
1026
1027 ASSERT_GT(fd, 0);
1028 ASSERT_EQ(offset, lseek(fd, static_cast<long>(offset), SEEK_SET));
1029
1030 void* buf = alloca(len);
1031 memset(buf, 'b', len);
1032 ASSERT_EQ(len, write(fd, buf, static_cast<unsigned int>(len)));
1033
1034 close(fd);
1035 }
1036};
1037
// Test scope:
// - We expect to open the data store when there are incomplete trailing writes
//   at the end of any of the logs
// - We do not expect to open the data store for corruption
1042TEST_F(DBWALTest, kTolerateCorruptedTailRecords) {
1043 const int jstart = RecoveryTestHelper::kWALFileOffset;
1044 const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
1045
1046 for (auto trunc : {true, false}) { /* Corruption style */
1047 for (int i = 0; i < 3; i++) { /* Corruption offset position */
1048 for (int j = jstart; j < jend; j++) { /* WAL file */
1049 // Fill data for testing
1050 Options options = CurrentOptions();
1051 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1052 // test checksum failure or parsing
1053 RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
1054 /*len%=*/.1, /*wal=*/j, trunc);
1055
1056 if (trunc) {
1057 options.wal_recovery_mode =
1058 WALRecoveryMode::kTolerateCorruptedTailRecords;
1059 options.create_if_missing = false;
1060 ASSERT_OK(TryReopen(options));
1061 const size_t recovered_row_count = RecoveryTestHelper::GetData(this);
1062 ASSERT_TRUE(i == 0 || recovered_row_count > 0);
1063 ASSERT_LT(recovered_row_count, row_count);
1064 } else {
1065 options.wal_recovery_mode =
1066 WALRecoveryMode::kTolerateCorruptedTailRecords;
1067 ASSERT_NOK(TryReopen(options));
1068 }
1069 }
1070 }
1071 }
1072}
1073
1074// Test scope:
1075// We don't expect the data store to be opened if there is any corruption
1076// (leading, middle or trailing -- incomplete writes or corruption)
1077TEST_F(DBWALTest, kAbsoluteConsistency) {
1078 const int jstart = RecoveryTestHelper::kWALFileOffset;
1079 const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
1080
1081 // Verify clean slate behavior
1082 Options options = CurrentOptions();
1083 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1084 options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
1085 options.create_if_missing = false;
1086 ASSERT_OK(TryReopen(options));
1087 ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count);
1088
1089 for (auto trunc : {true, false}) { /* Corruption style */
1090 for (int i = 0; i < 4; i++) { /* Corruption offset position */
1091 if (trunc && i == 0) {
1092 continue;
1093 }
1094
1095 for (int j = jstart; j < jend; j++) { /* wal files */
1096 // fill with new date
1097 RecoveryTestHelper::FillData(this, &options);
1098 // corrupt the wal
1099 RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
1100 /*len%=*/.1, j, trunc);
1101 // verify
1102 options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency;
1103 options.create_if_missing = false;
1104 ASSERT_NOK(TryReopen(options));
1105 }
1106 }
1107 }
1108}
1109
11fdf7f2
TL
1110// Test scope:
1111// We don't expect the data store to be opened if there is any inconsistency
1112// between WAL and SST files
1113TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
1114 Options options = CurrentOptions();
1115 options.avoid_flush_during_recovery = true;
1116
1117 // Create DB with multiple column families.
1118 CreateAndReopenWithCF({"one", "two"}, options);
1119 ASSERT_OK(Put(1, "key1", "val1"));
1120 ASSERT_OK(Put(2, "key2", "val2"));
1121
1122 // Record the offset at this point
1123 Env* env = options.env;
1124 uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
1125 std::string fname = LogFileName(dbname_, wal_file_id);
1126 uint64_t offset_to_corrupt;
1127 ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
1128 ASSERT_GT(offset_to_corrupt, 0);
1129
1130 ASSERT_OK(Put(1, "key3", "val3"));
1131 // Corrupt WAL at location of key3
1132 RecoveryTestHelper::InduceCorruption(
1133 fname, static_cast<size_t>(offset_to_corrupt), static_cast<size_t>(4));
1134 ASSERT_OK(Put(2, "key4", "val4"));
1135 ASSERT_OK(Put(1, "key5", "val5"));
1136 Flush(2);
1137
1138 // PIT recovery & verify
1139 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
1140 ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options));
1141}
1142
7c673cae
FG
// Test scope:
// - We expect to open the data store under all circumstances
// - We expect only data up to the point where the first error was encountered
1146TEST_F(DBWALTest, kPointInTimeRecovery) {
1147 const int jstart = RecoveryTestHelper::kWALFileOffset;
1148 const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
1149 const int maxkeys =
1150 RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile;
1151
1152 for (auto trunc : {true, false}) { /* Corruption style */
1153 for (int i = 0; i < 4; i++) { /* Offset of corruption */
1154 for (int j = jstart; j < jend; j++) { /* WAL file */
1155 // Fill data for testing
1156 Options options = CurrentOptions();
1157 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1158
1159 // Corrupt the wal
1160 RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
1161 /*len%=*/.1, j, trunc);
1162
1163 // Verify
1164 options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
1165 options.create_if_missing = false;
1166 ASSERT_OK(TryReopen(options));
1167
1168 // Probe data for invariants
1169 size_t recovered_row_count = RecoveryTestHelper::GetData(this);
1170 ASSERT_LT(recovered_row_count, row_count);
1171
1172 bool expect_data = true;
1173 for (size_t k = 0; k < maxkeys; ++k) {
1174 bool found = Get("key" + ToString(i)) != "NOT_FOUND";
1175 if (expect_data && !found) {
1176 expect_data = false;
1177 }
1178 ASSERT_EQ(found, expect_data);
1179 }
1180
1181 const size_t min = RecoveryTestHelper::kKeysPerWALFile *
1182 (j - RecoveryTestHelper::kWALFileOffset);
1183 ASSERT_GE(recovered_row_count, min);
1184 if (!trunc && i != 0) {
1185 const size_t max = RecoveryTestHelper::kKeysPerWALFile *
1186 (j - RecoveryTestHelper::kWALFileOffset + 1);
1187 ASSERT_LE(recovered_row_count, max);
1188 }
1189 }
1190 }
1191 }
1192}
1193
1194// Test scope:
1195// - We expect to open the data store under all scenarios
1196// - We expect to have recovered records past the corruption zone
1197TEST_F(DBWALTest, kSkipAnyCorruptedRecords) {
1198 const int jstart = RecoveryTestHelper::kWALFileOffset;
1199 const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
1200
1201 for (auto trunc : {true, false}) { /* Corruption style */
1202 for (int i = 0; i < 4; i++) { /* Corruption offset */
1203 for (int j = jstart; j < jend; j++) { /* wal files */
1204 // Fill data for testing
1205 Options options = CurrentOptions();
1206 const size_t row_count = RecoveryTestHelper::FillData(this, &options);
1207
1208 // Corrupt the WAL
1209 RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
1210 /*len%=*/.1, j, trunc);
1211
1212 // Verify behavior
1213 options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords;
1214 options.create_if_missing = false;
1215 ASSERT_OK(TryReopen(options));
1216
1217 // Probe data for invariants
1218 size_t recovered_row_count = RecoveryTestHelper::GetData(this);
1219 ASSERT_LT(recovered_row_count, row_count);
1220
1221 if (!trunc) {
1222 ASSERT_TRUE(i != 0 || recovered_row_count > 0);
1223 }
1224 }
1225 }
1226 }
1227}
1228
1229TEST_F(DBWALTest, AvoidFlushDuringRecovery) {
1230 Options options = CurrentOptions();
1231 options.disable_auto_compactions = true;
1232 options.avoid_flush_during_recovery = false;
1233
1234 // Test with flush after recovery.
1235 Reopen(options);
1236 ASSERT_OK(Put("foo", "v1"));
1237 ASSERT_OK(Put("bar", "v2"));
1238 ASSERT_OK(Flush());
1239 ASSERT_OK(Put("foo", "v3"));
1240 ASSERT_OK(Put("bar", "v4"));
1241 ASSERT_EQ(1, TotalTableFiles());
1242 // Reopen DB. Check if WAL logs flushed.
1243 Reopen(options);
1244 ASSERT_EQ("v3", Get("foo"));
1245 ASSERT_EQ("v4", Get("bar"));
1246 ASSERT_EQ(2, TotalTableFiles());
1247
1248 // Test without flush after recovery.
1249 options.avoid_flush_during_recovery = true;
1250 DestroyAndReopen(options);
1251 ASSERT_OK(Put("foo", "v5"));
1252 ASSERT_OK(Put("bar", "v6"));
1253 ASSERT_OK(Flush());
1254 ASSERT_OK(Put("foo", "v7"));
1255 ASSERT_OK(Put("bar", "v8"));
1256 ASSERT_EQ(1, TotalTableFiles());
1257 // Reopen DB. WAL logs should not be flushed this time.
1258 Reopen(options);
1259 ASSERT_EQ("v7", Get("foo"));
1260 ASSERT_EQ("v8", Get("bar"));
1261 ASSERT_EQ(1, TotalTableFiles());
1262
1263 // Force flush with allow_2pc.
1264 options.avoid_flush_during_recovery = true;
1265 options.allow_2pc = true;
1266 ASSERT_OK(Put("foo", "v9"));
1267 ASSERT_OK(Put("bar", "v10"));
1268 ASSERT_OK(Flush());
1269 ASSERT_OK(Put("foo", "v11"));
1270 ASSERT_OK(Put("bar", "v12"));
1271 Reopen(options);
1272 ASSERT_EQ("v11", Get("foo"));
1273 ASSERT_EQ("v12", Get("bar"));
11fdf7f2 1274 ASSERT_EQ(3, TotalTableFiles());
7c673cae
FG
1275}
1276
1277TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
1278 // Verifies WAL files that were present during recovery, but not flushed due
1279 // to avoid_flush_during_recovery, will be considered for deletion at a later
1280 // stage. We check at least one such file is deleted during Flush().
1281 Options options = CurrentOptions();
1282 options.disable_auto_compactions = true;
1283 options.avoid_flush_during_recovery = true;
1284 Reopen(options);
1285
1286 ASSERT_OK(Put("foo", "v1"));
1287 Reopen(options);
1288 for (int i = 0; i < 2; ++i) {
1289 if (i > 0) {
1290 // Flush() triggers deletion of obsolete tracked files
1291 Flush();
1292 }
1293 VectorLogPtr log_files;
1294 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
1295 if (i == 0) {
1296 ASSERT_GT(log_files.size(), 0);
1297 } else {
1298 ASSERT_EQ(0, log_files.size());
1299 }
1300 }
1301}
1302
1303TEST_F(DBWALTest, RecoverWithoutFlush) {
1304 Options options = CurrentOptions();
1305 options.avoid_flush_during_recovery = true;
1306 options.create_if_missing = false;
1307 options.disable_auto_compactions = true;
1308 options.write_buffer_size = 64 * 1024 * 1024;
1309
1310 size_t count = RecoveryTestHelper::FillData(this, &options);
1311 auto validateData = [this, count]() {
1312 for (size_t i = 0; i < count; i++) {
1313 ASSERT_NE(Get("key" + ToString(i)), "NOT_FOUND");
1314 }
1315 };
1316 Reopen(options);
1317 validateData();
1318 // Insert some data without flush
1319 ASSERT_OK(Put("foo", "foo_v1"));
1320 ASSERT_OK(Put("bar", "bar_v1"));
1321 Reopen(options);
1322 validateData();
1323 ASSERT_EQ(Get("foo"), "foo_v1");
1324 ASSERT_EQ(Get("bar"), "bar_v1");
1325 // Insert again and reopen
1326 ASSERT_OK(Put("foo", "foo_v2"));
1327 ASSERT_OK(Put("bar", "bar_v2"));
1328 Reopen(options);
1329 validateData();
1330 ASSERT_EQ(Get("foo"), "foo_v2");
1331 ASSERT_EQ(Get("bar"), "bar_v2");
1332 // manual flush and insert again
1333 Flush();
1334 ASSERT_EQ(Get("foo"), "foo_v2");
1335 ASSERT_EQ(Get("bar"), "bar_v2");
1336 ASSERT_OK(Put("foo", "foo_v3"));
1337 ASSERT_OK(Put("bar", "bar_v3"));
1338 Reopen(options);
1339 validateData();
1340 ASSERT_EQ(Get("foo"), "foo_v3");
1341 ASSERT_EQ(Get("bar"), "bar_v3");
1342}
1343
1344TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
1345 const std::string kSmallValue = "v";
1346 const std::string kLargeValue = DummyString(1024);
1347 Options options = CurrentOptions();
1348 options.avoid_flush_during_recovery = true;
1349 options.create_if_missing = false;
1350 options.disable_auto_compactions = true;
1351
1352 auto countWalFiles = [this]() {
1353 VectorLogPtr log_files;
1354 dbfull()->GetSortedWalFiles(log_files);
1355 return log_files.size();
1356 };
1357
1358 // Create DB with multiple column families and multiple log files.
1359 CreateAndReopenWithCF({"one", "two"}, options);
1360 ASSERT_OK(Put(0, "key1", kSmallValue));
1361 ASSERT_OK(Put(1, "key2", kLargeValue));
1362 Flush(1);
1363 ASSERT_EQ(1, countWalFiles());
1364 ASSERT_OK(Put(0, "key3", kSmallValue));
1365 ASSERT_OK(Put(2, "key4", kLargeValue));
1366 Flush(2);
1367 ASSERT_EQ(2, countWalFiles());
1368
1369 // Reopen, insert and flush.
1370 options.db_write_buffer_size = 64 * 1024 * 1024;
1371 ReopenWithColumnFamilies({"default", "one", "two"}, options);
1372 ASSERT_EQ(Get(0, "key1"), kSmallValue);
1373 ASSERT_EQ(Get(1, "key2"), kLargeValue);
1374 ASSERT_EQ(Get(0, "key3"), kSmallValue);
1375 ASSERT_EQ(Get(2, "key4"), kLargeValue);
1376 // Insert more data.
1377 ASSERT_OK(Put(0, "key5", kLargeValue));
1378 ASSERT_OK(Put(1, "key6", kLargeValue));
1379 ASSERT_EQ(3, countWalFiles());
1380 Flush(1);
1381 ASSERT_OK(Put(2, "key7", kLargeValue));
11fdf7f2 1382 dbfull()->FlushWAL(false);
7c673cae
FG
1383 ASSERT_EQ(4, countWalFiles());
1384
1385 // Reopen twice and validate.
1386 for (int i = 0; i < 2; i++) {
1387 ReopenWithColumnFamilies({"default", "one", "two"}, options);
1388 ASSERT_EQ(Get(0, "key1"), kSmallValue);
1389 ASSERT_EQ(Get(1, "key2"), kLargeValue);
1390 ASSERT_EQ(Get(0, "key3"), kSmallValue);
1391 ASSERT_EQ(Get(2, "key4"), kLargeValue);
1392 ASSERT_EQ(Get(0, "key5"), kLargeValue);
1393 ASSERT_EQ(Get(1, "key6"), kLargeValue);
1394 ASSERT_EQ(Get(2, "key7"), kLargeValue);
1395 ASSERT_EQ(4, countWalFiles());
1396 }
1397}
1398
1399// In this test we are trying to do the following:
1400// 1. Create a DB with corrupted WAL log;
1401// 2. Open with avoid_flush_during_recovery = true;
1402// 3. Append more data without flushing, which creates new WAL log.
1403// 4. Open again. See if it can correctly handle previous corruption.
1404TEST_F(DBWALTest, RecoverFromCorruptedWALWithoutFlush) {
1405 const int jstart = RecoveryTestHelper::kWALFileOffset;
1406 const int jend = jstart + RecoveryTestHelper::kWALFilesCount;
1407 const int kAppendKeys = 100;
1408 Options options = CurrentOptions();
1409 options.avoid_flush_during_recovery = true;
1410 options.create_if_missing = false;
1411 options.disable_auto_compactions = true;
1412 options.write_buffer_size = 64 * 1024 * 1024;
1413
1414 auto getAll = [this]() {
1415 std::vector<std::pair<std::string, std::string>> data;
1416 ReadOptions ropt;
1417 Iterator* iter = dbfull()->NewIterator(ropt);
1418 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
1419 data.push_back(
1420 std::make_pair(iter->key().ToString(), iter->value().ToString()));
1421 }
1422 delete iter;
1423 return data;
1424 };
1425 for (auto& mode : wal_recovery_mode_string_map) {
1426 options.wal_recovery_mode = mode.second;
1427 for (auto trunc : {true, false}) {
1428 for (int i = 0; i < 4; i++) {
1429 for (int j = jstart; j < jend; j++) {
1430 // Create corrupted WAL
1431 RecoveryTestHelper::FillData(this, &options);
1432 RecoveryTestHelper::CorruptWAL(this, options, /*off=*/i * .3,
1433 /*len%=*/.1, /*wal=*/j, trunc);
1434 // Skip the test if DB won't open.
1435 if (!TryReopen(options).ok()) {
1436 ASSERT_TRUE(options.wal_recovery_mode ==
1437 WALRecoveryMode::kAbsoluteConsistency ||
1438 (!trunc &&
1439 options.wal_recovery_mode ==
1440 WALRecoveryMode::kTolerateCorruptedTailRecords));
1441 continue;
1442 }
1443 ASSERT_OK(TryReopen(options));
1444 // Append some more data.
1445 for (int k = 0; k < kAppendKeys; k++) {
1446 std::string key = "extra_key" + ToString(k);
1447 std::string value = DummyString(RecoveryTestHelper::kValueSize);
1448 ASSERT_OK(Put(key, value));
1449 }
1450 // Save data for comparison.
1451 auto data = getAll();
1452 // Reopen. Verify data.
1453 ASSERT_OK(TryReopen(options));
1454 auto actual_data = getAll();
1455 ASSERT_EQ(data, actual_data);
1456 }
1457 }
1458 }
1459 }
1460}
1461
11fdf7f2
TL
1462// Tests that total log size is recovered if we set
1463// avoid_flush_during_recovery=true.
1464// Flush should trigger if max_total_wal_size is reached.
1465TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) {
1466 class TestFlushListener : public EventListener {
1467 public:
1468 std::atomic<int> count{0};
1469
1470 TestFlushListener() = default;
1471
1472 void OnFlushBegin(DB* /*db*/, const FlushJobInfo& flush_job_info) override {
1473 count++;
1474 assert(FlushReason::kWriteBufferManager == flush_job_info.flush_reason);
1475 }
1476 };
1477 std::shared_ptr<TestFlushListener> test_listener =
1478 std::make_shared<TestFlushListener>();
1479
1480 constexpr size_t kKB = 1024;
1481 constexpr size_t kMB = 1024 * 1024;
1482 Options options = CurrentOptions();
1483 options.avoid_flush_during_recovery = true;
1484 options.max_total_wal_size = 1 * kMB;
1485 options.listeners.push_back(test_listener);
1486 // Have to open DB in multi-CF mode to trigger flush when
1487 // max_total_wal_size is reached.
1488 CreateAndReopenWithCF({"one"}, options);
1489 // Write some keys and we will end up with one log file which is slightly
1490 // smaller than 1MB.
1491 std::string value_100k(100 * kKB, 'v');
1492 std::string value_300k(300 * kKB, 'v');
1493 ASSERT_OK(Put(0, "foo", "v1"));
1494 for (int i = 0; i < 9; i++) {
1495 ASSERT_OK(Put(1, "key" + ToString(i), value_100k));
1496 }
1497 // Get log files before reopen.
1498 VectorLogPtr log_files_before;
1499 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
1500 ASSERT_EQ(1, log_files_before.size());
1501 uint64_t log_size_before = log_files_before[0]->SizeFileBytes();
1502 ASSERT_GT(log_size_before, 900 * kKB);
1503 ASSERT_LT(log_size_before, 1 * kMB);
1504 ReopenWithColumnFamilies({"default", "one"}, options);
1505 // Write one more value to make log larger than 1MB.
1506 ASSERT_OK(Put(1, "bar", value_300k));
1507 // Get log files again. A new log file will be opened.
1508 VectorLogPtr log_files_after_reopen;
1509 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen));
1510 ASSERT_EQ(2, log_files_after_reopen.size());
1511 ASSERT_EQ(log_files_before[0]->LogNumber(),
1512 log_files_after_reopen[0]->LogNumber());
1513 ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() +
1514 log_files_after_reopen[1]->SizeFileBytes(),
1515 1 * kMB);
1516 // Write one more key to trigger flush.
1517 ASSERT_OK(Put(0, "foo", "v2"));
1518 dbfull()->TEST_WaitForFlushMemTable();
1519 // Flushed two column families.
1520 ASSERT_EQ(2, test_listener->count.load());
1521}
1522
1523#if defined(ROCKSDB_PLATFORM_POSIX)
1524#if defined(ROCKSDB_FALLOCATE_PRESENT)
1525// Tests that we will truncate the preallocated space of the last log from
1526// previous.
1527TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) {
1528 constexpr size_t kKB = 1024;
1529 Options options = CurrentOptions();
1530 options.avoid_flush_during_recovery = true;
1531 DestroyAndReopen(options);
1532 size_t preallocated_size =
1533 dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size);
1534 ASSERT_OK(Put("foo", "v1"));
1535 VectorLogPtr log_files_before;
1536 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before));
1537 ASSERT_EQ(1, log_files_before.size());
1538 auto& file_before = log_files_before[0];
1539 ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB);
1540 // The log file has preallocated space.
1541 ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()),
1542 preallocated_size);
1543 Reopen(options);
1544 VectorLogPtr log_files_after;
1545 ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after));
1546 ASSERT_EQ(1, log_files_after.size());
1547 ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB);
1548 // The preallocated space should be truncated.
1549 ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()),
1550 preallocated_size);
1551}
1552#endif // ROCKSDB_FALLOCATE_PRESENT
1553#endif // ROCKSDB_PLATFORM_POSIX
1554
7c673cae
FG
1555#endif // ROCKSDB_LITE
1556
1557TEST_F(DBWALTest, WalTermTest) {
1558 Options options = CurrentOptions();
1559 options.env = env_;
1560 CreateAndReopenWithCF({"pikachu"}, options);
1561
1562 ASSERT_OK(Put(1, "foo", "bar"));
1563
1564 WriteOptions wo;
1565 wo.sync = true;
1566 wo.disableWAL = false;
1567
1568 WriteBatch batch;
1569 batch.Put("foo", "bar");
1570 batch.MarkWalTerminationPoint();
1571 batch.Put("foo2", "bar2");
1572
1573 ASSERT_OK(dbfull()->Write(wo, &batch));
1574
1575 // make sure we can re-open it.
1576 ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
1577 ASSERT_EQ("bar", Get(1, "foo"));
1578 ASSERT_EQ("NOT_FOUND", Get(1, "foo2"));
1579}
f67539c2 1580} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
1581
1582int main(int argc, char** argv) {
f67539c2 1583 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
7c673cae
FG
1584 ::testing::InitGoogleTest(&argc, argv);
1585 return RUN_ALL_TESTS();
1586}