1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9#ifndef ROCKSDB_LITE
10
11#include "db/db_test_util.h"
12#include "port/stack_trace.h"
13#include "rocksdb/io_status.h"
14#include "rocksdb/perf_context.h"
15#include "rocksdb/sst_file_manager.h"
16#if !defined(ROCKSDB_LITE)
17#include "test_util/sync_point.h"
18#endif
19#include "util/random.h"
20#include "utilities/fault_injection_env.h"
21#include "utilities/fault_injection_fs.h"
22
23namespace ROCKSDB_NAMESPACE {
24
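// Test fixture that wraps the base Env's file system in a FaultInjectionTestFS
// (via CompositeEnvWrapper) so individual tests can inject filesystem errors
// and exercise RocksDB's background error handling and recovery paths.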
25class DBErrorHandlingFSTest : public DBTestBase {
26 public:
27 DBErrorHandlingFSTest()
28 : DBTestBase("/db_error_handling_fs_test", /*env_do_fsync=*/true) {
29 fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem()));
30 fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_));
31 }
32
33 std::string GetManifestNameFromLiveFiles() {
34 std::vector<std::string> live_files;
35 uint64_t manifest_size;
36
37 Status s = dbfull()->GetLiveFiles(live_files, &manifest_size, false);
38 if (!s.ok()) {
39 return "";
40 }
41 for (auto& file : live_files) {
42 uint64_t num = 0;
43 FileType type;
44 if (ParseFileName(file, &num, &type) && type == kDescriptorFile) {
45 return file;
46 }
47 }
48 return "";
49 }
50
51 std::shared_ptr<FaultInjectionTestFS> fault_fs_;
52 std::unique_ptr<Env> fault_env_;
53};
54
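// EventListener used by these tests to observe background errors, optionally
// override the background error status, enable or disable auto recovery,
// inject errors on table file creation, and wait for recovery to complete.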
55class ErrorHandlerFSListener : public EventListener {
56 public:
57 ErrorHandlerFSListener()
58 : mutex_(),
59 cv_(&mutex_),
60 no_auto_recovery_(false),
61 recovery_complete_(false),
62 file_creation_started_(false),
63 override_bg_error_(false),
64 file_count_(0),
65 fault_fs_(nullptr) {}
66 ~ErrorHandlerFSListener() {
67 file_creation_error_.PermitUncheckedError();
68 bg_error_.PermitUncheckedError();
69 }
70
71 void OnTableFileCreationStarted(
72 const TableFileCreationBriefInfo& /*ti*/) override {
73 InstrumentedMutexLock l(&mutex_);
74 file_creation_started_ = true;
75 if (file_count_ > 0) {
76 if (--file_count_ == 0) {
77 fault_fs_->SetFilesystemActive(false, file_creation_error_);
78 file_creation_error_ = IOStatus::OK();
79 }
80 }
81 cv_.SignalAll();
82 }
83
84 void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/, Status bg_error,
85 bool* auto_recovery) override {
86 bg_error.PermitUncheckedError();
87 if (*auto_recovery && no_auto_recovery_) {
88 *auto_recovery = false;
89 }
90 }
91
92 void OnErrorRecoveryCompleted(Status old_bg_error) override {
93 InstrumentedMutexLock l(&mutex_);
94 recovery_complete_ = true;
95 cv_.SignalAll();
96 old_bg_error.PermitUncheckedError();
97 }
98
99 bool WaitForRecovery(uint64_t /*abs_time_us*/) {
100 InstrumentedMutexLock l(&mutex_);
101 while (!recovery_complete_) {
102 cv_.Wait(/*abs_time_us*/);
103 }
104 if (recovery_complete_) {
105 recovery_complete_ = false;
106 return true;
107 }
108 return false;
109 }
110
111 void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
112 InstrumentedMutexLock l(&mutex_);
113 while (!file_creation_started_) {
114 cv_.Wait(/*abs_time_us*/);
115 }
116 file_creation_started_ = false;
117 }
118
119 void OnBackgroundError(BackgroundErrorReason /*reason*/,
120 Status* bg_error) override {
121 if (override_bg_error_) {
122 *bg_error = bg_error_;
123 override_bg_error_ = false;
124 }
125 }
126
127 void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
128
129 void OverrideBGError(Status bg_err) {
130 bg_error_ = bg_err;
131 override_bg_error_ = true;
132 }
133
134 void InjectFileCreationError(FaultInjectionTestFS* fs, int file_count,
135 IOStatus io_s) {
136 fault_fs_ = fs;
137 file_count_ = file_count;
138 file_creation_error_ = io_s;
139 }
140
141 private:
142 InstrumentedMutex mutex_;
143 InstrumentedCondVar cv_;
144 bool no_auto_recovery_;
145 bool recovery_complete_;
146 bool file_creation_started_;
147 bool override_bg_error_;
148 int file_count_;
149 IOStatus file_creation_error_;
150 Status bg_error_;
151 FaultInjectionTestFS* fault_fs_;
152};
153
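// Inject a NoSpace error at the start of a flush, verify the background error
// severity is kHardError, then clear the fault and confirm Resume() recovers
// the DB and the data is readable after reopen.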
154TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
155 std::shared_ptr<ErrorHandlerFSListener> listener(
156 new ErrorHandlerFSListener());
157 Options options = GetDefaultOptions();
158 options.env = fault_env_.get();
159 options.create_if_missing = true;
160 options.listeners.emplace_back(listener);
161 Status s;
162
163 listener->EnableAutoRecovery(false);
164 DestroyAndReopen(options);
165
166 ASSERT_OK(Put(Key(0), "val"));
167 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
168 fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
169 });
170 SyncPoint::GetInstance()->EnableProcessing();
171 s = Flush();
172 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
173 SyncPoint::GetInstance()->DisableProcessing();
174 fault_fs_->SetFilesystemActive(true);
175 s = dbfull()->Resume();
176 ASSERT_EQ(s, Status::OK());
177
178 Reopen(options);
179 ASSERT_EQ("val", Get(Key(0)));
180 Destroy(options);
181}
182
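// Inject a retryable IOError at three different points of table building
// during flush. Each failure is treated as a hard error, and a manual
// Resume() recovers the DB.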
183TEST_F(DBErrorHandlingFSTest, FLushWritRetryableError) {
184 std::shared_ptr<ErrorHandlerFSListener> listener(
185 new ErrorHandlerFSListener());
186 Options options = GetDefaultOptions();
187 options.env = fault_env_.get();
188 options.create_if_missing = true;
189 options.listeners.emplace_back(listener);
190 options.max_bgerror_resume_count = 0;
191 Status s;
192
193 listener->EnableAutoRecovery(false);
194 DestroyAndReopen(options);
195
196 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
197 error_msg.SetRetryable(true);
198
199 ASSERT_OK(Put(Key(1), "val1"));
200 SyncPoint::GetInstance()->SetCallBack(
201 "BuildTable:BeforeFinishBuildTable",
202 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
203 SyncPoint::GetInstance()->EnableProcessing();
204 s = Flush();
205 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
206 SyncPoint::GetInstance()->DisableProcessing();
207 fault_fs_->SetFilesystemActive(true);
208 s = dbfull()->Resume();
209 ASSERT_OK(s);
210 Reopen(options);
211 ASSERT_EQ("val1", Get(Key(1)));
212
213 ASSERT_OK(Put(Key(2), "val2"));
214 SyncPoint::GetInstance()->SetCallBack(
215 "BuildTable:BeforeSyncTable",
216 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
217 SyncPoint::GetInstance()->EnableProcessing();
218 s = Flush();
219 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
220 SyncPoint::GetInstance()->DisableProcessing();
221 fault_fs_->SetFilesystemActive(true);
222 s = dbfull()->Resume();
223 ASSERT_OK(s);
224 Reopen(options);
225 ASSERT_EQ("val2", Get(Key(2)));
226
227 ASSERT_OK(Put(Key(3), "val3"));
228 SyncPoint::GetInstance()->SetCallBack(
229 "BuildTable:BeforeCloseTableFile",
230 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
231 SyncPoint::GetInstance()->EnableProcessing();
232 s = Flush();
233 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
234 SyncPoint::GetInstance()->DisableProcessing();
235 fault_fs_->SetFilesystemActive(true);
236 s = dbfull()->Resume();
237 ASSERT_OK(s);
238 Reopen(options);
239 ASSERT_EQ("val3", Get(Key(3)));
240
241 Destroy(options);
242}
243
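// With the WAL disabled, a retryable IOError during flush is mapped to a soft
// error; writes are still accepted while the error is pending, and a manual
// Resume() clears it.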
244TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
245 std::shared_ptr<ErrorHandlerFSListener> listener(
246 new ErrorHandlerFSListener());
247 Options options = GetDefaultOptions();
248 options.env = fault_env_.get();
249 options.create_if_missing = true;
250 options.listeners.emplace_back(listener);
251 options.max_bgerror_resume_count = 0;
252 Status s;
253
254 listener->EnableAutoRecovery(false);
255 DestroyAndReopen(options);
256
257 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
258 error_msg.SetRetryable(true);
259
260 WriteOptions wo = WriteOptions();
261 wo.disableWAL = true;
262 ASSERT_OK(Put(Key(1), "val1", wo));
263 SyncPoint::GetInstance()->SetCallBack(
264 "BuildTable:BeforeFinishBuildTable",
265 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
266 SyncPoint::GetInstance()->EnableProcessing();
267 s = Flush();
268 ASSERT_OK(Put(Key(2), "val2", wo));
269 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
270 ASSERT_EQ("val2", Get(Key(2)));
271 SyncPoint::GetInstance()->DisableProcessing();
272 fault_fs_->SetFilesystemActive(true);
273 s = dbfull()->Resume();
274 ASSERT_EQ(s, Status::OK());
275 ASSERT_EQ("val1", Get(Key(1)));
276 ASSERT_EQ("val2", Get(Key(2)));
277 ASSERT_OK(Put(Key(3), "val3", wo));
278 ASSERT_EQ("val3", Get(Key(3)));
279 s = Flush();
280 ASSERT_OK(s);
281 ASSERT_EQ("val3", Get(Key(3)));
282
283 Destroy(options);
284}
285
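// Same as FLushWritNoWALRetryableError1, but the error is injected before the
// table file is synced.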
286TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError2) {
287 std::shared_ptr<ErrorHandlerFSListener> listener(
288 new ErrorHandlerFSListener());
289 Options options = GetDefaultOptions();
290 options.env = fault_env_.get();
291 options.create_if_missing = true;
292 options.listeners.emplace_back(listener);
293 options.max_bgerror_resume_count = 0;
294 Status s;
295
296 listener->EnableAutoRecovery(false);
297 DestroyAndReopen(options);
298
299 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
300 error_msg.SetRetryable(true);
301
302 WriteOptions wo = WriteOptions();
303 wo.disableWAL = true;
304
305 ASSERT_OK(Put(Key(1), "val1", wo));
306 SyncPoint::GetInstance()->SetCallBack(
307 "BuildTable:BeforeSyncTable",
308 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
309 SyncPoint::GetInstance()->EnableProcessing();
310 s = Flush();
311 ASSERT_OK(Put(Key(2), "val2", wo));
312 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
313 ASSERT_EQ("val2", Get(Key(2)));
314 SyncPoint::GetInstance()->DisableProcessing();
315 fault_fs_->SetFilesystemActive(true);
316 s = dbfull()->Resume();
317 ASSERT_EQ(s, Status::OK());
318 ASSERT_EQ("val1", Get(Key(1)));
319 ASSERT_EQ("val2", Get(Key(2)));
320 ASSERT_OK(Put(Key(3), "val3", wo));
321 ASSERT_EQ("val3", Get(Key(3)));
322 s = Flush();
323 ASSERT_OK(s);
324 ASSERT_EQ("val3", Get(Key(3)));
325
326 Destroy(options);
327}
328
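// Same as FLushWritNoWALRetryableError1, but the error is injected before the
// table file is closed.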
329TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError3) {
330 std::shared_ptr<ErrorHandlerFSListener> listener(
331 new ErrorHandlerFSListener());
332 Options options = GetDefaultOptions();
333 options.env = fault_env_.get();
334 options.create_if_missing = true;
335 options.listeners.emplace_back(listener);
336 options.max_bgerror_resume_count = 0;
337 Status s;
338
339 listener->EnableAutoRecovery(false);
340 DestroyAndReopen(options);
341
342 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
343 error_msg.SetRetryable(true);
344
345 WriteOptions wo = WriteOptions();
346 wo.disableWAL = true;
347
348 ASSERT_OK(Put(Key(1), "val1", wo));
349 SyncPoint::GetInstance()->SetCallBack(
350 "BuildTable:BeforeCloseTableFile",
351 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
352 SyncPoint::GetInstance()->EnableProcessing();
353 s = Flush();
354 ASSERT_OK(Put(Key(2), "val2", wo));
355 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
356 ASSERT_EQ("val2", Get(Key(2)));
357 SyncPoint::GetInstance()->DisableProcessing();
358 fault_fs_->SetFilesystemActive(true);
359 s = dbfull()->Resume();
360 ASSERT_EQ(s, Status::OK());
361 ASSERT_EQ("val1", Get(Key(1)));
362 ASSERT_EQ("val2", Get(Key(2)));
363 ASSERT_OK(Put(Key(3), "val3", wo));
364 ASSERT_EQ("val3", Get(Key(3)));
365 s = Flush();
366 ASSERT_OK(s);
367 ASSERT_EQ("val3", Get(Key(3)));
368
369 Destroy(options);
370}
371
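// Inject a NoSpace error while the flush writes the MANIFEST. Resume() should
// succeed and roll over to a new MANIFEST file.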
372TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
373 std::shared_ptr<ErrorHandlerFSListener> listener(
374 new ErrorHandlerFSListener());
375 Options options = GetDefaultOptions();
376 options.env = fault_env_.get();
377 options.create_if_missing = true;
378 options.listeners.emplace_back(listener);
379 Status s;
380 std::string old_manifest;
381 std::string new_manifest;
382
383 listener->EnableAutoRecovery(false);
384 DestroyAndReopen(options);
385 old_manifest = GetManifestNameFromLiveFiles();
386
387 ASSERT_OK(Put(Key(0), "val"));
388 ASSERT_OK(Flush());
389 ASSERT_OK(Put(Key(1), "val"));
390 SyncPoint::GetInstance()->SetCallBack(
391 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
392 fault_fs_->SetFilesystemActive(false,
393 IOStatus::NoSpace("Out of space"));
394 });
395 SyncPoint::GetInstance()->EnableProcessing();
396 s = Flush();
397 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
398 SyncPoint::GetInstance()->ClearAllCallBacks();
399 SyncPoint::GetInstance()->DisableProcessing();
400 fault_fs_->SetFilesystemActive(true);
401 s = dbfull()->Resume();
402 ASSERT_EQ(s, Status::OK());
403
404 new_manifest = GetManifestNameFromLiveFiles();
405 ASSERT_NE(new_manifest, old_manifest);
406
407 Reopen(options);
408 ASSERT_EQ("val", Get(Key(0)));
409 ASSERT_EQ("val", Get(Key(1)));
410 Close();
411}
412
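// Same as ManifestWriteError, but with a retryable IOError and automatic
// resume disabled.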
413TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
414 std::shared_ptr<ErrorHandlerFSListener> listener(
415 new ErrorHandlerFSListener());
416 Options options = GetDefaultOptions();
417 options.env = fault_env_.get();
418 options.create_if_missing = true;
419 options.listeners.emplace_back(listener);
420 options.max_bgerror_resume_count = 0;
421 Status s;
422 std::string old_manifest;
423 std::string new_manifest;
424
425 listener->EnableAutoRecovery(false);
426 DestroyAndReopen(options);
427 old_manifest = GetManifestNameFromLiveFiles();
428
429 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
430 error_msg.SetRetryable(true);
431
432 ASSERT_OK(Put(Key(0), "val"));
433 ASSERT_OK(Flush());
434 ASSERT_OK(Put(Key(1), "val"));
435 SyncPoint::GetInstance()->SetCallBack(
436 "VersionSet::LogAndApply:WriteManifest",
437 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
438 SyncPoint::GetInstance()->EnableProcessing();
439 s = Flush();
440 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
441 SyncPoint::GetInstance()->ClearAllCallBacks();
442 SyncPoint::GetInstance()->DisableProcessing();
443 fault_fs_->SetFilesystemActive(true);
444 s = dbfull()->Resume();
445 ASSERT_EQ(s, Status::OK());
446
447 new_manifest = GetManifestNameFromLiveFiles();
448 ASSERT_NE(new_manifest, old_manifest);
449
450 Reopen(options);
451 ASSERT_EQ("val", Get(Key(0)));
452 ASSERT_EQ("val", Get(Key(1)));
453 Close();
454}
455
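// The first Resume() re-triggers the injected MANIFEST write error; a second
// Resume() after clearing the callbacks succeeds and creates a new MANIFEST.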
456TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
457 std::shared_ptr<ErrorHandlerFSListener> listener(
458 new ErrorHandlerFSListener());
459 Options options = GetDefaultOptions();
460 options.env = fault_env_.get();
461 options.create_if_missing = true;
462 options.listeners.emplace_back(listener);
463 Status s;
464 std::string old_manifest;
465 std::string new_manifest;
466
467 listener->EnableAutoRecovery(false);
468 DestroyAndReopen(options);
469 old_manifest = GetManifestNameFromLiveFiles();
470
471 ASSERT_OK(Put(Key(0), "val"));
472 ASSERT_OK(Flush());
473 ASSERT_OK(Put(Key(1), "val"));
474 SyncPoint::GetInstance()->SetCallBack(
475 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
476 fault_fs_->SetFilesystemActive(false,
477 IOStatus::NoSpace("Out of space"));
478 });
479 SyncPoint::GetInstance()->EnableProcessing();
480 s = Flush();
481 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
482 fault_fs_->SetFilesystemActive(true);
483
484 // This Resume() will attempt to create a new manifest file and fail again
485 s = dbfull()->Resume();
486 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
487 fault_fs_->SetFilesystemActive(true);
488 SyncPoint::GetInstance()->ClearAllCallBacks();
489 SyncPoint::GetInstance()->DisableProcessing();
490
491 // A successful Resume() will create a new manifest file
492 s = dbfull()->Resume();
493 ASSERT_EQ(s, Status::OK());
494
495 new_manifest = GetManifestNameFromLiveFiles();
496 ASSERT_NE(new_manifest, old_manifest);
497
498 Reopen(options);
499 ASSERT_EQ("val", Get(Key(0)));
500 ASSERT_EQ("val", Get(Key(1)));
501 Close();
502}
503
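// Fail the MANIFEST write issued by a background compaction with a NoSpace
// error, then clear the fault and verify the compaction completes and all
// keys remain readable.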
504TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
505 if (mem_env_ != nullptr) {
506 ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
507 return;
508 }
509 std::shared_ptr<ErrorHandlerFSListener> listener(
510 new ErrorHandlerFSListener());
511 Options options = GetDefaultOptions();
512 options.env = fault_env_.get();
513 options.create_if_missing = true;
514 options.level0_file_num_compaction_trigger = 2;
515 options.listeners.emplace_back(listener);
516 Status s;
517 std::string old_manifest;
518 std::string new_manifest;
519 std::atomic<bool> fail_manifest(false);
520 DestroyAndReopen(options);
521 old_manifest = GetManifestNameFromLiveFiles();
522
523 ASSERT_OK(Put(Key(0), "val"));
524 ASSERT_OK(Put(Key(2), "val"));
525 s = Flush();
526 ASSERT_EQ(s, Status::OK());
527
528 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
529 // Wait for flush of 2nd L0 file before starting compaction
530 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
531 "BackgroundCallCompaction:0"},
532 // Wait for compaction to detect manifest write error
533 {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
534 // Make compaction thread wait for error to be cleared
535 {"CompactionManifestWriteError:1",
536 "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
537 // Wait for DB instance to clear bg_error before calling
538 // TEST_WaitForCompact
539 {"SstFileManagerImpl::ErrorCleared", "CompactionManifestWriteError:2"}});
540 // trigger manifest write failure in compaction thread
541 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
542 "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
543 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
544 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
545 if (fail_manifest.load()) {
546 fault_fs_->SetFilesystemActive(false,
547 IOStatus::NoSpace("Out of space"));
548 }
549 });
550 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
551
552 ASSERT_OK(Put(Key(1), "val"));
553 // This Flush will trigger a compaction, which will fail when appending to
554 // the manifest
555 s = Flush();
556 ASSERT_EQ(s, Status::OK());
557
558 TEST_SYNC_POINT("CompactionManifestWriteError:0");
559 // Clear all errors so when the compaction is retried, it will succeed
560 fault_fs_->SetFilesystemActive(true);
561 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
562 TEST_SYNC_POINT("CompactionManifestWriteError:1");
563 TEST_SYNC_POINT("CompactionManifestWriteError:2");
564
565 s = dbfull()->TEST_WaitForCompact();
566 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
567 ASSERT_EQ(s, Status::OK());
568
569 new_manifest = GetManifestNameFromLiveFiles();
570 ASSERT_NE(new_manifest, old_manifest);
571 Reopen(options);
572 ASSERT_EQ("val", Get(Key(0)));
573 ASSERT_EQ("val", Get(Key(1)));
574 ASSERT_EQ("val", Get(Key(2)));
575 Close();
576}
577
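// Same as CompactionManifestWriteError, but with a retryable IOError, auto
// recovery disabled, and a manual Resume().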
578TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
579 std::shared_ptr<ErrorHandlerFSListener> listener(
580 new ErrorHandlerFSListener());
581 Options options = GetDefaultOptions();
582 options.env = fault_env_.get();
583 options.create_if_missing = true;
584 options.level0_file_num_compaction_trigger = 2;
585 options.listeners.emplace_back(listener);
586 options.max_bgerror_resume_count = 0;
587 Status s;
588 std::string old_manifest;
589 std::string new_manifest;
590 std::atomic<bool> fail_manifest(false);
591 DestroyAndReopen(options);
592 old_manifest = GetManifestNameFromLiveFiles();
593
594 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
595 error_msg.SetRetryable(true);
596
597 ASSERT_OK(Put(Key(0), "val"));
598 ASSERT_OK(Put(Key(2), "val"));
599 s = Flush();
600 ASSERT_EQ(s, Status::OK());
601
602 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
603 listener->EnableAutoRecovery(false);
604 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
605 // Wait for flush of 2nd L0 file before starting compaction
606 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
607 "BackgroundCallCompaction:0"},
608 // Wait for compaction to detect manifest write error
609 {"BackgroundCallCompaction:1", "CompactionManifestWriteError:0"},
610 // Make compaction thread wait for error to be cleared
611 {"CompactionManifestWriteError:1",
612 "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"}});
613 // trigger manifest write failure in compaction thread
614 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
615 "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
616 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
617 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
618 if (fail_manifest.load()) {
619 fault_fs_->SetFilesystemActive(false, error_msg);
620 }
621 });
622 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
623
624 ASSERT_OK(Put(Key(1), "val"));
625 s = Flush();
626 ASSERT_EQ(s, Status::OK());
627
628 TEST_SYNC_POINT("CompactionManifestWriteError:0");
629 TEST_SYNC_POINT("CompactionManifestWriteError:1");
630
631 s = dbfull()->TEST_WaitForCompact();
632 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
633
634 fault_fs_->SetFilesystemActive(true);
635 SyncPoint::GetInstance()->ClearAllCallBacks();
636 SyncPoint::GetInstance()->DisableProcessing();
637 s = dbfull()->Resume();
638 ASSERT_EQ(s, Status::OK());
639
640 new_manifest = GetManifestNameFromLiveFiles();
641 ASSERT_NE(new_manifest, old_manifest);
642
643 Reopen(options);
644 ASSERT_EQ("val", Get(Key(0)));
645 ASSERT_EQ("val", Get(Key(1)));
646 ASSERT_EQ("val", Get(Key(2)));
647 Close();
648}
649
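// Fail compaction writes with a NoSpace error that the listener overrides to
// a hard error, then recover with Resume().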
650TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
651 std::shared_ptr<ErrorHandlerFSListener> listener(
652 new ErrorHandlerFSListener());
653 Options options = GetDefaultOptions();
654 options.env = fault_env_.get();
655 options.create_if_missing = true;
656 options.level0_file_num_compaction_trigger = 2;
657 options.listeners.emplace_back(listener);
658 Status s;
659 DestroyAndReopen(options);
660
661 ASSERT_OK(Put(Key(0), "va;"));
662 ASSERT_OK(Put(Key(2), "va;"));
663 s = Flush();
664 ASSERT_EQ(s, Status::OK());
665
666 listener->OverrideBGError(
667 Status(Status::NoSpace(), Status::Severity::kHardError));
668 listener->EnableAutoRecovery(false);
669 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
670 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
671 "BackgroundCallCompaction:0"}});
672 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
673 "BackgroundCallCompaction:0", [&](void*) {
674 fault_fs_->SetFilesystemActive(false,
675 IOStatus::NoSpace("Out of space"));
676 });
677 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
678
679 ASSERT_OK(Put(Key(1), "val"));
680 s = Flush();
681 ASSERT_EQ(s, Status::OK());
682
683 s = dbfull()->TEST_WaitForCompact();
684 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
685
686 fault_fs_->SetFilesystemActive(true);
687 s = dbfull()->Resume();
688 ASSERT_EQ(s, Status::OK());
689 Destroy(options);
690}
691
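// Inject a retryable IOError when the compaction output file is opened; the
// error surfaces as a soft error and a manual Resume() clears it.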
692TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
693 std::shared_ptr<ErrorHandlerFSListener> listener(
694 new ErrorHandlerFSListener());
695 Options options = GetDefaultOptions();
696 options.env = fault_env_.get();
697 options.create_if_missing = true;
698 options.level0_file_num_compaction_trigger = 2;
699 options.listeners.emplace_back(listener);
700 options.max_bgerror_resume_count = 0;
701 Status s;
702 DestroyAndReopen(options);
703
704 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
705 error_msg.SetRetryable(true);
706
707 ASSERT_OK(Put(Key(0), "va;"));
708 ASSERT_OK(Put(Key(2), "va;"));
709 s = Flush();
710 ASSERT_EQ(s, Status::OK());
711
712 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
713 listener->EnableAutoRecovery(false);
714 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
715 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
716 "BackgroundCallCompaction:0"}});
717 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
718 "CompactionJob::OpenCompactionOutputFile",
719 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
720 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
721
722 ASSERT_OK(Put(Key(1), "val"));
723 s = Flush();
724 ASSERT_EQ(s, Status::OK());
725
726 s = dbfull()->TEST_WaitForCompact();
727 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
728
729 fault_fs_->SetFilesystemActive(true);
730 SyncPoint::GetInstance()->ClearAllCallBacks();
731 SyncPoint::GetInstance()->DisableProcessing();
732 s = dbfull()->Resume();
733 ASSERT_EQ(s, Status::OK());
734 Destroy(options);
735}
736
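// A corruption error during compaction is treated as unrecoverable, so
// Resume() must fail.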
737TEST_F(DBErrorHandlingFSTest, CorruptionError) {
738 Options options = GetDefaultOptions();
739 options.env = fault_env_.get();
740 options.create_if_missing = true;
741 options.level0_file_num_compaction_trigger = 2;
742 Status s;
743 DestroyAndReopen(options);
744
745 ASSERT_OK(Put(Key(0), "va;"));
746 ASSERT_OK(Put(Key(2), "va;"));
747 s = Flush();
748 ASSERT_EQ(s, Status::OK());
749
750 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
751 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
752 "BackgroundCallCompaction:0"}});
753 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
754 "BackgroundCallCompaction:0", [&](void*) {
755 fault_fs_->SetFilesystemActive(false,
756 IOStatus::Corruption("Corruption"));
757 });
758 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
759
760 ASSERT_OK(Put(Key(1), "val"));
761 s = Flush();
762 ASSERT_EQ(s, Status::OK());
763
764 s = dbfull()->TEST_WaitForCompact();
765 ASSERT_EQ(s.severity(),
766 ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
767
768 fault_fs_->SetFilesystemActive(true);
769 s = dbfull()->Resume();
770 ASSERT_NE(s, Status::OK());
771 Destroy(options);
772}
773
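// With auto recovery enabled, a NoSpace error during flush is recovered
// automatically once the filesystem becomes writable again.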
774TEST_F(DBErrorHandlingFSTest, AutoRecoverFlushError) {
775 if (mem_env_ != nullptr) {
776 ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
777 return;
778 }
779 std::shared_ptr<ErrorHandlerFSListener> listener(
780 new ErrorHandlerFSListener());
781 Options options = GetDefaultOptions();
782 options.env = fault_env_.get();
783 options.create_if_missing = true;
784 options.listeners.emplace_back(listener);
785 Status s;
786
787 listener->EnableAutoRecovery();
788 DestroyAndReopen(options);
789
790 ASSERT_OK(Put(Key(0), "val"));
791 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
792 fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
793 });
794 SyncPoint::GetInstance()->EnableProcessing();
795 s = Flush();
796 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
797 SyncPoint::GetInstance()->DisableProcessing();
798 fault_fs_->SetFilesystemActive(true);
799 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
800
801 s = Put(Key(1), "val");
802 ASSERT_EQ(s, Status::OK());
803
804 Reopen(options);
805 ASSERT_EQ("val", Get(Key(0)));
806 ASSERT_EQ("val", Get(Key(1)));
807 Destroy(options);
808}
809
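// Verify the DB can be closed and destroyed while auto recovery from a flush
// error is still in progress.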
810TEST_F(DBErrorHandlingFSTest, FailRecoverFlushError) {
811 std::shared_ptr<ErrorHandlerFSListener> listener(
812 new ErrorHandlerFSListener());
813 Options options = GetDefaultOptions();
814 options.env = fault_env_.get();
815 options.create_if_missing = true;
816 options.listeners.emplace_back(listener);
817 Status s;
818
819 listener->EnableAutoRecovery();
820 DestroyAndReopen(options);
821
822 ASSERT_OK(Put(Key(0), "val"));
823 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
824 fault_fs_->SetFilesystemActive(false, IOStatus::NoSpace("Out of space"));
825 });
826 SyncPoint::GetInstance()->EnableProcessing();
827 s = Flush();
828 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
829 // We should be able to shut down the database while auto recovery is going
830 // on in the background.
831 Close();
832 DestroyDB(dbname_, options);
833}
834
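// Fail a synced WAL write partway through a batch. The failed batch is not
// applied, and auto recovery restores the DB to a consistent state.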
835TEST_F(DBErrorHandlingFSTest, WALWriteError) {
836 if (mem_env_ != nullptr) {
837 ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
838 return;
839 }
840 std::shared_ptr<ErrorHandlerFSListener> listener(
841 new ErrorHandlerFSListener());
842 Options options = GetDefaultOptions();
843 options.env = fault_env_.get();
844 options.create_if_missing = true;
845 options.writable_file_max_buffer_size = 32768;
846 options.listeners.emplace_back(listener);
847 Status s;
848 Random rnd(301);
849
850 listener->EnableAutoRecovery();
851 DestroyAndReopen(options);
852
853 {
854 WriteBatch batch;
855
856 for (auto i = 0; i < 100; ++i) {
857 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
858 }
859
860 WriteOptions wopts;
861 wopts.sync = true;
862 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
863 };
864
865 {
866 WriteBatch batch;
867 int write_error = 0;
868
869 for (auto i = 100; i < 199; ++i) {
870 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
871 }
872
873 SyncPoint::GetInstance()->SetCallBack(
874 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
875 write_error++;
876 if (write_error > 2) {
877 fault_fs_->SetFilesystemActive(false,
878 IOStatus::NoSpace("Out of space"));
879 }
880 });
881 SyncPoint::GetInstance()->EnableProcessing();
882 WriteOptions wopts;
883 wopts.sync = true;
884 s = dbfull()->Write(wopts, &batch);
885 ASSERT_EQ(s, s.NoSpace());
886 }
887 SyncPoint::GetInstance()->DisableProcessing();
888 fault_fs_->SetFilesystemActive(true);
889 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
890 for (auto i = 0; i < 199; ++i) {
891 if (i < 100) {
892 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
893 } else {
894 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
895 }
896 }
897 Reopen(options);
898 for (auto i = 0; i < 199; ++i) {
899 if (i < 100) {
900 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
901 } else {
902 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
903 }
904 }
905 Close();
906}
907
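// A retryable IOError on a synced WAL write leaves the failed batch
// unapplied; after a manual Resume(), new writes are durable again.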
908TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
909 std::shared_ptr<ErrorHandlerFSListener> listener(
910 new ErrorHandlerFSListener());
911 Options options = GetDefaultOptions();
912 options.env = fault_env_.get();
913 options.create_if_missing = true;
914 options.writable_file_max_buffer_size = 32768;
915 options.listeners.emplace_back(listener);
916 options.paranoid_checks = true;
917 options.max_bgerror_resume_count = 0;
918 Status s;
919 Random rnd(301);
920
921 DestroyAndReopen(options);
922
923 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
924 error_msg.SetRetryable(true);
925
926 // For the first batch, the write is successful and requires a sync.
927 {
928 WriteBatch batch;
929
930 for (auto i = 0; i < 100; ++i) {
931 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
932 }
933
934 WriteOptions wopts;
935 wopts.sync = true;
936 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
937 };
938
939 // For the second batch, the first 2 file Appends are successful, then the
940 // following Append fails due to a retryable file system IOError.
941 {
942 WriteBatch batch;
943 int write_error = 0;
944
945 for (auto i = 100; i < 200; ++i) {
946 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
947 }
948
949 SyncPoint::GetInstance()->SetCallBack(
950 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
951 write_error++;
952 if (write_error > 2) {
953 fault_fs_->SetFilesystemActive(false, error_msg);
954 }
955 });
956 SyncPoint::GetInstance()->EnableProcessing();
957 WriteOptions wopts;
958 wopts.sync = true;
959 s = dbfull()->Write(wopts, &batch);
960 ASSERT_EQ(true, s.IsIOError());
961 }
962 fault_fs_->SetFilesystemActive(true);
963 SyncPoint::GetInstance()->ClearAllCallBacks();
964 SyncPoint::GetInstance()->DisableProcessing();
965
966 // Data in the corrupted WAL is not stored
967 for (auto i = 0; i < 199; ++i) {
968 if (i < 100) {
969 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
970 } else {
971 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
972 }
973 }
974
975 // Resume and write a new batch, which should be persisted in the WAL
976 s = dbfull()->Resume();
977 ASSERT_EQ(s, Status::OK());
978 {
979 WriteBatch batch;
980
981 for (auto i = 200; i < 300; ++i) {
982 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
983 }
984
985 WriteOptions wopts;
986 wopts.sync = true;
987 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
988 };
989
990 Reopen(options);
991 for (auto i = 0; i < 300; ++i) {
992 if (i < 100 || i >= 200) {
993 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
994 } else {
995 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
996 }
997 }
998 Close();
999}
1000
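// Same WAL write failure scenario as WALWriteError, but across multiple
// column families; recovery flushes every column family.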
1001TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
1002 if (mem_env_ != nullptr) {
1003 ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1004 return;
1005 }
1006 std::shared_ptr<ErrorHandlerFSListener> listener(
1007 new ErrorHandlerFSListener());
1008 Options options = GetDefaultOptions();
1009 options.env = fault_env_.get();
1010 options.create_if_missing = true;
1011 options.writable_file_max_buffer_size = 32768;
1012 options.listeners.emplace_back(listener);
1013 Status s;
1014 Random rnd(301);
1015
1016 listener->EnableAutoRecovery();
1017 CreateAndReopenWithCF({"one", "two", "three"}, options);
1018
1019 {
1020 WriteBatch batch;
1021
1022 for (auto i = 1; i < 4; ++i) {
1023 for (auto j = 0; j < 100; ++j) {
1024 ASSERT_OK(batch.Put(handles_[i], Key(j), rnd.RandomString(1024)));
1025 }
1026 }
1027
1028 WriteOptions wopts;
1029 wopts.sync = true;
1030 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
1031 };
1032
1033 {
1034 WriteBatch batch;
1035 int write_error = 0;
1036
1037 // Write to one CF
1038 for (auto i = 100; i < 199; ++i) {
1039 ASSERT_OK(batch.Put(handles_[2], Key(i), rnd.RandomString(1024)));
1040 }
1041
1042 SyncPoint::GetInstance()->SetCallBack(
1043 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
1044 write_error++;
1045 if (write_error > 2) {
1046 fault_fs_->SetFilesystemActive(false,
1047 IOStatus::NoSpace("Out of space"));
1048 }
1049 });
1050 SyncPoint::GetInstance()->EnableProcessing();
1051 WriteOptions wopts;
1052 wopts.sync = true;
1053 s = dbfull()->Write(wopts, &batch);
1054 ASSERT_EQ(s, s.NoSpace());
1055 }
1056 SyncPoint::GetInstance()->DisableProcessing();
1057 fault_fs_->SetFilesystemActive(true);
1058 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1059
1060 for (auto i = 1; i < 4; ++i) {
1061 // Every CF should have been flushed
1062 ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
1063 }
1064
1065 for (auto i = 1; i < 4; ++i) {
1066 for (auto j = 0; j < 199; ++j) {
1067 if (j < 100) {
1068 ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
1069 } else {
1070 ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
1071 }
1072 }
1073 }
1074 ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
1075 for (auto i = 1; i < 4; ++i) {
1076 for (auto j = 0; j < 199; ++j) {
1077 if (j < 100) {
1078 ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
1079 } else {
1080 ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
1081 }
1082 }
1083 }
1084 Close();
1085}
1086
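// Run several DB instances sharing one SstFileManager, fail the compaction
// output file creation in each, and verify they all recover automatically
// once the filesystem has space again.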
1087TEST_F(DBErrorHandlingFSTest, MultiDBCompactionError) {
1088 if (mem_env_ != nullptr) {
1089 ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1090 return;
1091 }
1092 FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
1093 std::vector<std::unique_ptr<Env>> fault_envs;
1094 std::vector<FaultInjectionTestFS*> fault_fs;
1095 std::vector<Options> options;
1096 std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
1097 std::vector<DB*> db;
1098 std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
1099 int kNumDbInstances = 3;
1100 Random rnd(301);
1101
1102 for (auto i = 0; i < kNumDbInstances; ++i) {
1103 listener.emplace_back(new ErrorHandlerFSListener());
1104 options.emplace_back(GetDefaultOptions());
1105 fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
1106 std::shared_ptr<FileSystem> fs(fault_fs.back());
1107 fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
1108 options[i].env = fault_envs.back().get();
1109 options[i].create_if_missing = true;
1110 options[i].level0_file_num_compaction_trigger = 2;
1111 options[i].writable_file_max_buffer_size = 32768;
1112 options[i].listeners.emplace_back(listener[i]);
1113 options[i].sst_file_manager = sfm;
1114 DB* dbptr;
1115 char buf[16];
1116
1117 listener[i]->EnableAutoRecovery();
1118 // Set up returning an error for the 3rd SST, which would be at level 1
1119 listener[i]->InjectFileCreationError(fault_fs[i], 3,
1120 IOStatus::NoSpace("Out of space"));
1121 snprintf(buf, sizeof(buf), "_%d", i);
1122 DestroyDB(dbname_ + std::string(buf), options[i]);
1123 ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
1124 Status::OK());
1125 db.emplace_back(dbptr);
1126 }
1127
1128 for (auto i = 0; i < kNumDbInstances; ++i) {
1129 WriteBatch batch;
1130
1131 for (auto j = 0; j <= 100; ++j) {
1132 ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1133 }
1134
1135 WriteOptions wopts;
1136 wopts.sync = true;
1137 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1138 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1139 }
1140
1141 def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1142 for (auto i = 0; i < kNumDbInstances; ++i) {
1143 WriteBatch batch;
1144
1145 // Write a second batch of keys
1146 for (auto j = 100; j < 199; ++j) {
1147 ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1148 }
1149
1150 WriteOptions wopts;
1151 wopts.sync = true;
1152 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1153 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1154 }
1155
1156 for (auto i = 0; i < kNumDbInstances; ++i) {
1157 Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1158 ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1159 fault_fs[i]->SetFilesystemActive(true);
1160 }
1161
1162 def_env->SetFilesystemActive(true);
1163 for (auto i = 0; i < kNumDbInstances; ++i) {
1164 std::string prop;
1165 ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1166 ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
1167 Status::OK());
1168 EXPECT_TRUE(db[i]->GetProperty(
1169 "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1170 EXPECT_EQ(atoi(prop.c_str()), 0);
1171 EXPECT_TRUE(db[i]->GetProperty(
1172 "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1173 EXPECT_EQ(atoi(prop.c_str()), 1);
1174 }
1175
1176 for (auto i = 0; i < kNumDbInstances; ++i) {
1177 char buf[16];
1178 snprintf(buf, sizeof(buf), "_%d", i);
1179 delete db[i];
1180 fault_fs[i]->SetFilesystemActive(true);
1181 if (getenv("KEEP_DB")) {
1182 printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1183 } else {
1184 Status s = DestroyDB(dbname_ + std::string(buf), options[i]);
1185 }
1186 }
1187 options.clear();
1188 sfm.reset();
1189 delete def_env;
1190}
1191
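// Like MultiDBCompactionError, but each DB instance hits a different error
// severity (soft error, hard error, or none) and all end up fully compacted.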
1192TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) {
1193 if (mem_env_ != nullptr) {
1194 ROCKSDB_GTEST_SKIP("Test requires non-mock environment");
1195 return;
1196 }
1197 FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(env_);
1198 std::vector<std::unique_ptr<Env>> fault_envs;
1199 std::vector<FaultInjectionTestFS*> fault_fs;
1200 std::vector<Options> options;
1201 std::vector<std::shared_ptr<ErrorHandlerFSListener>> listener;
1202 std::vector<DB*> db;
1203 std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
1204 int kNumDbInstances = 3;
1205 Random rnd(301);
1206
1207 for (auto i = 0; i < kNumDbInstances; ++i) {
1208 listener.emplace_back(new ErrorHandlerFSListener());
1209 options.emplace_back(GetDefaultOptions());
1210 fault_fs.emplace_back(new FaultInjectionTestFS(env_->GetFileSystem()));
1211 std::shared_ptr<FileSystem> fs(fault_fs.back());
1212 fault_envs.emplace_back(new CompositeEnvWrapper(def_env, fs));
1213 options[i].env = fault_envs.back().get();
1214 options[i].create_if_missing = true;
1215 options[i].level0_file_num_compaction_trigger = 2;
1216 options[i].writable_file_max_buffer_size = 32768;
1217 options[i].listeners.emplace_back(listener[i]);
1218 options[i].sst_file_manager = sfm;
1219 DB* dbptr;
1220 char buf[16];
1221
1222 listener[i]->EnableAutoRecovery();
1223 switch (i) {
1224 case 0:
1225 // Set up returning an error for the 3rd SST, which would be at level 1
1226 listener[i]->InjectFileCreationError(fault_fs[i], 3,
1227 IOStatus::NoSpace("Out of space"));
1228 break;
1229 case 1:
1230 // Set up returning an error after the 1st SST, which would result
1231 // in a hard error
1232 listener[i]->InjectFileCreationError(fault_fs[i], 2,
1233 IOStatus::NoSpace("Out of space"));
1234 break;
1235 default:
1236 break;
1237 }
1238 snprintf(buf, sizeof(buf), "_%d", i);
1239 DestroyDB(dbname_ + std::string(buf), options[i]);
1240 ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
1241 Status::OK());
1242 db.emplace_back(dbptr);
1243 }
1244
1245 for (auto i = 0; i < kNumDbInstances; ++i) {
1246 WriteBatch batch;
1247
1248 for (auto j = 0; j <= 100; ++j) {
1249 ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1250 }
1251
1252 WriteOptions wopts;
1253 wopts.sync = true;
1254 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1255 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1256 }
1257
1258 def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
1259 for (auto i = 0; i < kNumDbInstances; ++i) {
1260 WriteBatch batch;
1261
1262 // Write a second batch of keys
1263 for (auto j = 100; j < 199; ++j) {
1264 ASSERT_OK(batch.Put(Key(j), rnd.RandomString(1024)));
1265 }
1266
1267 WriteOptions wopts;
1268 wopts.sync = true;
1269 ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
1270 if (i != 1) {
1271 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
1272 } else {
1273 ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
1274 }
1275 }
1276
1277 for (auto i = 0; i < kNumDbInstances; ++i) {
1278 Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
1279 switch (i) {
1280 case 0:
1281 ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
1282 break;
1283 case 1:
1284 ASSERT_EQ(s.severity(), Status::Severity::kHardError);
1285 break;
1286 case 2:
1287 ASSERT_EQ(s, Status::OK());
1288 break;
1289 }
1290 fault_fs[i]->SetFilesystemActive(true);
1291 }
1292
1293 def_env->SetFilesystemActive(true);
1294 for (auto i = 0; i < kNumDbInstances; ++i) {
1295 std::string prop;
1296 if (i < 2) {
1297 ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
1298 }
1299 if (i == 1) {
1300 ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
1301 Status::OK());
1302 }
1303 EXPECT_TRUE(db[i]->GetProperty(
1304 "rocksdb.num-files-at-level" + NumberToString(0), &prop));
1305 EXPECT_EQ(atoi(prop.c_str()), 0);
1306 EXPECT_TRUE(db[i]->GetProperty(
1307 "rocksdb.num-files-at-level" + NumberToString(1), &prop));
1308 EXPECT_EQ(atoi(prop.c_str()), 1);
1309 }
1310
1311 for (auto i = 0; i < kNumDbInstances; ++i) {
1312 char buf[16];
1313 snprintf(buf, sizeof(buf), "_%d", i);
1314 fault_fs[i]->SetFilesystemActive(true);
1315 delete db[i];
1316 if (getenv("KEEP_DB")) {
1317 printf("DB is still at %s%s\n", dbname_.c_str(), buf);
1318 } else {
1319 DestroyDB(dbname_ + std::string(buf), options[i]);
1320 }
1321 }
1322 options.clear();
1323 delete def_env;
1324}
1325
1326// When the KV pair is Put, the write option is set to disable the WAL.
1327// If a retryable error happens in this condition, the bg error is mapped to a
1328// soft error and auto resume is triggered. During auto resume, SwitchMemtable
1329// is disabled to avoid small SST files. Writes can still be applied before
1330// the bg error is cleared unless the memtable is full.
1331TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover1) {
1332 // Activate the FS before the first resume
1333 std::shared_ptr<ErrorHandlerFSListener> listener(
1334 new ErrorHandlerFSListener());
1335 Options options = GetDefaultOptions();
1336 options.env = fault_env_.get();
1337 options.create_if_missing = true;
1338 options.listeners.emplace_back(listener);
1339 options.max_bgerror_resume_count = 2;
1340 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1341 Status s;
1342
1343 listener->EnableAutoRecovery(false);
1344 DestroyAndReopen(options);
1345
1346 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1347 error_msg.SetRetryable(true);
1348
1349 WriteOptions wo = WriteOptions();
1350 wo.disableWAL = true;
1351 ASSERT_OK(Put(Key(1), "val1", wo));
1352 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1353 {{"RecoverFromRetryableBGIOError:LoopOut",
1354 "FLushWritNoWALRetryableeErrorAutoRecover1:1"}});
1355 SyncPoint::GetInstance()->SetCallBack(
1356 "BuildTable:BeforeFinishBuildTable",
1357 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1358
1359 SyncPoint::GetInstance()->EnableProcessing();
1360 s = Flush();
1361 ASSERT_EQ("val1", Get(Key(1)));
1362 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1363 TEST_SYNC_POINT("FLushWritNoWALRetryableeErrorAutoRecover1:1");
1364 ASSERT_EQ("val1", Get(Key(1)));
1365 ASSERT_EQ("val1", Get(Key(1)));
1366 SyncPoint::GetInstance()->DisableProcessing();
1367 fault_fs_->SetFilesystemActive(true);
1368 ASSERT_OK(Put(Key(2), "val2", wo));
1369 s = Flush();
1370 // Since auto resume fails, the bg error is not cleaned, and the flush will
1371 // return the bg_error set before.
1372 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1373 ASSERT_EQ("val2", Get(Key(2)));
1374
1375 // Call Resume() manually
1376 s = dbfull()->Resume();
1377 ASSERT_EQ(s, Status::OK());
1378 ASSERT_OK(Put(Key(3), "val3", wo));
1379 s = Flush();
1380 // After resume is successful, the flush should be ok.
1381 ASSERT_EQ(s, Status::OK());
1382 ASSERT_EQ("val3", Get(Key(3)));
1383 Destroy(options);
1384}
1385
1386TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableeErrorAutoRecover2) {
1387 // Activate the FS before the first resume
1388 std::shared_ptr<ErrorHandlerFSListener> listener(
1389 new ErrorHandlerFSListener());
1390 Options options = GetDefaultOptions();
1391 options.env = fault_env_.get();
1392 options.create_if_missing = true;
1393 options.listeners.emplace_back(listener);
1394 options.max_bgerror_resume_count = 2;
1395 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1396 Status s;
1397
1398 listener->EnableAutoRecovery(false);
1399 DestroyAndReopen(options);
1400
1401 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1402 error_msg.SetRetryable(true);
1403
1404 WriteOptions wo = WriteOptions();
1405 wo.disableWAL = true;
1406 ASSERT_OK(Put(Key(1), "val1", wo));
1407 SyncPoint::GetInstance()->SetCallBack(
1408 "BuildTable:BeforeFinishBuildTable",
1409 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1410
1411 SyncPoint::GetInstance()->EnableProcessing();
1412 s = Flush();
1413 ASSERT_EQ("val1", Get(Key(1)));
1414 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1415 SyncPoint::GetInstance()->DisableProcessing();
1416 fault_fs_->SetFilesystemActive(true);
1417 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1418 ASSERT_EQ("val1", Get(Key(1)));
1419 ASSERT_OK(Put(Key(2), "val2", wo));
1420 s = Flush();
1421 // Since auto resume is successful, the bg error is cleared and the flush
1422 // will succeed.
1423 ASSERT_OK(s);
1424 ASSERT_EQ("val2", Get(Key(2)));
1425 Destroy(options);
1426}
1427
1428TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover1) {
1429 // Fail the first resume and make the second resume successful
1430 std::shared_ptr<ErrorHandlerFSListener> listener(
1431 new ErrorHandlerFSListener());
1432 Options options = GetDefaultOptions();
1433 options.env = fault_env_.get();
1434 options.create_if_missing = true;
1435 options.listeners.emplace_back(listener);
1436 options.max_bgerror_resume_count = 2;
1437 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1438 Status s;
1439
1440 listener->EnableAutoRecovery(false);
1441 DestroyAndReopen(options);
1442
1443 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1444 error_msg.SetRetryable(true);
1445
1446 ASSERT_OK(Put(Key(1), "val1"));
1447 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1448 {{"RecoverFromRetryableBGIOError:BeforeWait0",
1449 "FLushWritRetryableeErrorAutoRecover1:0"},
1450 {"FLushWritRetryableeErrorAutoRecover1:1",
1451 "RecoverFromRetryableBGIOError:BeforeWait1"},
1452 {"RecoverFromRetryableBGIOError:RecoverSuccess",
1453 "FLushWritRetryableeErrorAutoRecover1:2"}});
1454 SyncPoint::GetInstance()->SetCallBack(
1455 "BuildTable:BeforeFinishBuildTable",
1456 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1457 SyncPoint::GetInstance()->EnableProcessing();
1458 s = Flush();
1459 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1460 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:0");
1461 fault_fs_->SetFilesystemActive(true);
1462 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1463 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:1");
1464 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:2");
1465 SyncPoint::GetInstance()->DisableProcessing();
1466
1467 ASSERT_EQ("val1", Get(Key(1)));
1468 Reopen(options);
1469 ASSERT_EQ("val1", Get(Key(1)));
1470 ASSERT_OK(Put(Key(2), "val2"));
1471 s = Flush();
1472 ASSERT_EQ(s, Status::OK());
1473 ASSERT_EQ("val2", Get(Key(2)));
1474
1475 Destroy(options);
1476}
1477
1478TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover2) {
1479 // Activate the FS before the first resume
1480 std::shared_ptr<ErrorHandlerFSListener> listener(
1481 new ErrorHandlerFSListener());
1482 Options options = GetDefaultOptions();
1483 options.env = fault_env_.get();
1484 options.create_if_missing = true;
1485 options.listeners.emplace_back(listener);
1486 options.max_bgerror_resume_count = 2;
1487 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1488 Status s;
1489
1490 listener->EnableAutoRecovery(false);
1491 DestroyAndReopen(options);
1492
1493 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1494 error_msg.SetRetryable(true);
1495
1496 ASSERT_OK(Put(Key(1), "val1"));
1497 SyncPoint::GetInstance()->SetCallBack(
1498 "BuildTable:BeforeFinishBuildTable",
1499 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1500
1501 SyncPoint::GetInstance()->EnableProcessing();
1502 s = Flush();
1503 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1504 SyncPoint::GetInstance()->DisableProcessing();
1505 fault_fs_->SetFilesystemActive(true);
1506 ASSERT_EQ(listener->WaitForRecovery(5000000), true);
1507
1508 ASSERT_EQ("val1", Get(Key(1)));
1509 Reopen(options);
1510 ASSERT_EQ("val1", Get(Key(1)));
1511 ASSERT_OK(Put(Key(2), "val2"));
1512 s = Flush();
1513 ASSERT_EQ(s, Status::OK());
1514 ASSERT_EQ("val2", Get(Key(2)));
1515
1516 Destroy(options);
1517}
1518
1519TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover3) {
1520 // Fail all the resume attempts and let the user resume manually
1521 std::shared_ptr<ErrorHandlerFSListener> listener(
1522 new ErrorHandlerFSListener());
1523 Options options = GetDefaultOptions();
1524 options.env = fault_env_.get();
1525 options.create_if_missing = true;
1526 options.listeners.emplace_back(listener);
1527 options.max_bgerror_resume_count = 2;
1528 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1529 Status s;
1530
1531 listener->EnableAutoRecovery(false);
1532 DestroyAndReopen(options);
1533
1534 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1535 error_msg.SetRetryable(true);
1536
1537 ASSERT_OK(Put(Key(1), "val1"));
1538 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1539 {{"FLushWritRetryableeErrorAutoRecover3:0",
1540 "RecoverFromRetryableBGIOError:BeforeStart"},
1541 {"RecoverFromRetryableBGIOError:LoopOut",
1542 "FLushWritRetryableeErrorAutoRecover3:1"}});
1543 SyncPoint::GetInstance()->SetCallBack(
1544 "BuildTable:BeforeFinishBuildTable",
1545 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1546 SyncPoint::GetInstance()->EnableProcessing();
1547 s = Flush();
1548 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1549 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:0");
1550 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:1");
1551 fault_fs_->SetFilesystemActive(true);
1552 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1553 SyncPoint::GetInstance()->DisableProcessing();
1554
1555 ASSERT_EQ("val1", Get(Key(1)));
1556 // Auto resume fails because the FS does not recover during resume. The user
1557 // calls resume manually here.
1558 s = dbfull()->Resume();
1559 ASSERT_EQ("val1", Get(Key(1)));
1560 ASSERT_EQ(s, Status::OK());
1561 ASSERT_OK(Put(Key(2), "val2"));
1562 s = Flush();
1563 ASSERT_EQ(s, Status::OK());
1564 ASSERT_EQ("val2", Get(Key(2)));
1565
1566 Destroy(options);
1567}
1568
1569TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover4) {
1570 // Fail the first resume and do not resume a second time because
1571 // the IO error severity is Fatal Error and not Retryable.
1572 std::shared_ptr<ErrorHandlerFSListener> listener(
1573 new ErrorHandlerFSListener());
1574 Options options = GetDefaultOptions();
1575 options.env = fault_env_.get();
1576 options.create_if_missing = true;
1577 options.listeners.emplace_back(listener);
1578 options.max_bgerror_resume_count = 2;
1579 options.bgerror_resume_retry_interval = 10; // 10 microseconds
1580 Status s;
1581
1582 listener->EnableAutoRecovery(false);
1583 DestroyAndReopen(options);
1584
1585 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1586 error_msg.SetRetryable(true);
1587 IOStatus nr_msg = IOStatus::IOError("No Retryable Fatal IO Error");
1588 nr_msg.SetRetryable(false);
1589
1590 ASSERT_OK(Put(Key(1), "val1"));
1591 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1592 {{"RecoverFromRetryableBGIOError:BeforeStart",
1593 "FLushWritRetryableeErrorAutoRecover4:0"},
1594 {"FLushWritRetryableeErrorAutoRecover4:2",
1595 "RecoverFromRetryableBGIOError:RecoverFail0"}});
1596 SyncPoint::GetInstance()->SetCallBack(
1597 "BuildTable:BeforeFinishBuildTable",
1598 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1599 SyncPoint::GetInstance()->SetCallBack(
1600 "RecoverFromRetryableBGIOError:BeforeResume1",
1601 [&](void*) { fault_fs_->SetFilesystemActive(false, nr_msg); });
1602
1603 SyncPoint::GetInstance()->EnableProcessing();
1604 s = Flush();
1605 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1606 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:0");
1607 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:2");
1608 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1609 SyncPoint::GetInstance()->DisableProcessing();
1610 fault_fs_->SetFilesystemActive(true);
1611 // Even though the FS is recovered, due to the Fatal Error in bg_error_ the
1612 // resume and flush will both fail.
1613 ASSERT_EQ("val1", Get(Key(1)));
1614 s = dbfull()->Resume();
1615 ASSERT_NE(s, Status::OK());
1616 ASSERT_EQ("val1", Get(Key(1)));
1617 ASSERT_OK(Put(Key(2), "val2"));
1618 s = Flush();
1619 ASSERT_NE(s, Status::OK());
1620 ASSERT_EQ("NOT_FOUND", Get(Key(2)));
1621
1622 Reopen(options);
1623 ASSERT_EQ("val1", Get(Key(1)));
1624 ASSERT_OK(Put(Key(2), "val2"));
1625 s = Flush();
1626 ASSERT_EQ(s, Status::OK());
1627 ASSERT_EQ("val2", Get(Key(2)));
1628
1629 Destroy(options);
1630}
1631
1632TEST_F(DBErrorHandlingFSTest, DISABLED_FLushWritRetryableeErrorAutoRecover5) {
1633 // During the resume, call DB->Close() and make sure the resume thread exits
1634 // before close continues. Due to the shutdown, the resume is not successful
1635 // and the FS does not become active, so the close status is still an IO error.
1636 std::shared_ptr<ErrorHandlerFSListener> listener(
1637 new ErrorHandlerFSListener());
1638 Options options = GetDefaultOptions();
1639 options.env = fault_env_.get();
1640 options.create_if_missing = true;
1641 options.listeners.emplace_back(listener);
1642 options.max_bgerror_resume_count = 2;
1643 options.bgerror_resume_retry_interval = 10; // 10 microseconds
1644 Status s;
1645
1646 listener->EnableAutoRecovery(false);
1647 DestroyAndReopen(options);
1648
1649 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1650 error_msg.SetRetryable(true);
1651
1652 ASSERT_OK(Put(Key(1), "val1"));
1653 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1654 {{"RecoverFromRetryableBGIOError:BeforeStart",
1655 "FLushWritRetryableeErrorAutoRecover5:0"}});
1656 SyncPoint::GetInstance()->SetCallBack(
1657 "BuildTable:BeforeFinishBuildTable",
1658 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1659 SyncPoint::GetInstance()->EnableProcessing();
1660 s = Flush();
1661 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1662 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover5:0");
1663 // The first resume will set recovery_error, whose severity is Fatal, so the
1664 // Close() below returns a non-OK status.
1665 s = dbfull()->Close();
1666 ASSERT_NE(s, Status::OK());
1667 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1668 SyncPoint::GetInstance()->DisableProcessing();
1669 fault_fs_->SetFilesystemActive(true);
1670
1671 Reopen(options);
1672 ASSERT_NE("val1", Get(Key(1)));
1673 ASSERT_OK(Put(Key(2), "val2"));
1674 s = Flush();
1675 ASSERT_EQ(s, Status::OK());
1676 ASSERT_EQ("val2", Get(Key(2)));
1677
1678 Destroy(options);
1679}
1680
1681TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover6) {
1682 // During the resume, call DB->Close() and make sure the resume thread exits
1683 // before close continues. Here the FS is reactivated while the resume thread
1684 // is waiting, so the resume succeeds and Close() returns OK.
1685 std::shared_ptr<ErrorHandlerFSListener> listener(
1686 new ErrorHandlerFSListener());
1687 Options options = GetDefaultOptions();
1688 options.env = fault_env_.get();
1689 options.create_if_missing = true;
1690 options.listeners.emplace_back(listener);
1691 options.max_bgerror_resume_count = 2;
1692 options.bgerror_resume_retry_interval = 10; // 10 microseconds
1693 Status s;
1694
1695 listener->EnableAutoRecovery(false);
1696 DestroyAndReopen(options);
1697
1698 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1699 error_msg.SetRetryable(true);
1700
1701 ASSERT_OK(Put(Key(1), "val1"));
1702 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1703 {{"FLushWritRetryableeErrorAutoRecover6:0",
1704 "RecoverFromRetryableBGIOError:BeforeStart"},
1705 {"RecoverFromRetryableBGIOError:BeforeWait0",
1706 "FLushWritRetryableeErrorAutoRecover6:1"},
1707 {"FLushWritRetryableeErrorAutoRecover6:2",
1708 "RecoverFromRetryableBGIOError:BeforeWait1"},
1709 {"RecoverFromRetryableBGIOError:AfterWait0",
1710 "FLushWritRetryableeErrorAutoRecover6:3"}});
1711 SyncPoint::GetInstance()->SetCallBack(
1712 "BuildTable:BeforeFinishBuildTable",
1713 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1714 SyncPoint::GetInstance()->EnableProcessing();
1715 s = Flush();
1716 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1717 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:0");
1718 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:1");
1719 fault_fs_->SetFilesystemActive(true);
1720 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1721 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:2");
1722 TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:3");
1723 // By this point the FS has been reactivated and the retried resume is
1724 // expected to have succeeded, so Close() returns OK.
1725 s = dbfull()->Close();
1726 ASSERT_EQ(s, Status::OK());
1727 SyncPoint::GetInstance()->DisableProcessing();
1728
1729 Reopen(options);
1730 ASSERT_EQ("val1", Get(Key(1)));
1731 ASSERT_OK(Put(Key(2), "val2"));
1732 s = Flush();
1733 ASSERT_EQ(s, Status::OK());
1734 ASSERT_EQ("val2", Get(Key(2)));
1735
1736 Destroy(options);
1737}
1738
1739TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) {
1740 // Fail the first resume and let the second resume be successful
1741 std::shared_ptr<ErrorHandlerFSListener> listener(
1742 new ErrorHandlerFSListener());
1743 Options options = GetDefaultOptions();
1744 options.env = fault_env_.get();
1745 options.create_if_missing = true;
1746 options.listeners.emplace_back(listener);
1747 options.max_bgerror_resume_count = 2;
1748 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1749 Status s;
1750 std::string old_manifest;
1751 std::string new_manifest;
1752
1753 listener->EnableAutoRecovery(false);
1754 DestroyAndReopen(options);
1755 old_manifest = GetManifestNameFromLiveFiles();
1756
1757 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1758 error_msg.SetRetryable(true);
1759
1760 ASSERT_OK(Put(Key(0), "val"));
1761 ASSERT_OK(Flush());
1762 ASSERT_OK(Put(Key(1), "val"));
1763 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1764 {{"RecoverFromRetryableBGIOError:BeforeStart",
1765 "ManifestWriteRetryableErrorAutoRecover:0"},
1766 {"ManifestWriteRetryableErrorAutoRecover:1",
1767 "RecoverFromRetryableBGIOError:BeforeWait1"},
1768 {"RecoverFromRetryableBGIOError:RecoverSuccess",
1769 "ManifestWriteRetryableErrorAutoRecover:2"}});
1770 SyncPoint::GetInstance()->SetCallBack(
1771 "VersionSet::LogAndApply:WriteManifest",
1772 [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
1773 SyncPoint::GetInstance()->EnableProcessing();
1774 s = Flush();
1775 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1776 TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0");
1777 fault_fs_->SetFilesystemActive(true);
1778 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
1779 TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1");
1780 TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2");
1781 SyncPoint::GetInstance()->DisableProcessing();
1782
1783 new_manifest = GetManifestNameFromLiveFiles();
1784 ASSERT_NE(new_manifest, old_manifest);
1785
1786 Reopen(options);
1787 ASSERT_EQ("val", Get(Key(0)));
1788 ASSERT_EQ("val", Get(Key(1)));
1789 Close();
1790}
1791
1792TEST_F(DBErrorHandlingFSTest,
1793 CompactionManifestWriteRetryableErrorAutoRecover) {
1794 std::shared_ptr<ErrorHandlerFSListener> listener(
1795 new ErrorHandlerFSListener());
1796 Options options = GetDefaultOptions();
1797 options.env = fault_env_.get();
1798 options.create_if_missing = true;
1799 options.level0_file_num_compaction_trigger = 2;
1800 options.listeners.emplace_back(listener);
1801 options.max_bgerror_resume_count = 2;
1802 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1803 Status s;
1804 std::string old_manifest;
1805 std::string new_manifest;
1806 std::atomic<bool> fail_manifest(false);
1807 DestroyAndReopen(options);
1808 old_manifest = GetManifestNameFromLiveFiles();
1809
1810 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1811 error_msg.SetRetryable(true);
1812
1813 ASSERT_OK(Put(Key(0), "val"));
1814 ASSERT_OK(Put(Key(2), "val"));
1815 s = Flush();
1816 ASSERT_EQ(s, Status::OK());
1817
1818 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
1819 listener->EnableAutoRecovery(false);
1820 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1821 // Wait for flush of 2nd L0 file before starting compaction
1822 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
1823 "BackgroundCallCompaction:0"},
1824 // Wait for compaction to detect manifest write error
1825 {"BackgroundCallCompaction:1", "CompactionManifestWriteErrorAR:0"},
1826 // Make compaction thread wait for error to be cleared
1827 {"CompactionManifestWriteErrorAR:1",
1828 "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"},
1829 {"CompactionManifestWriteErrorAR:2",
1830 "RecoverFromRetryableBGIOError:BeforeStart"},
1831 // Fail the first resume, before the wait in resume
1832 {"RecoverFromRetryableBGIOError:BeforeResume0",
1833 "CompactionManifestWriteErrorAR:3"},
1834 // Activate the FS before the second resume
1835 {"CompactionManifestWriteErrorAR:4",
1836 "RecoverFromRetryableBGIOError:BeforeResume1"},
1837 // Wait for the auto resume to be successful
1838 {"RecoverFromRetryableBGIOError:RecoverSuccess",
1839 "CompactionManifestWriteErrorAR:5"}});
1840 // trigger manifest write failure in compaction thread
1841 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1842 "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); });
1843 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1844 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
1845 if (fail_manifest.load()) {
1846 fault_fs_->SetFilesystemActive(false, error_msg);
1847 }
1848 });
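  // The manifest failure is armed only after the compaction has started
  // (fail_manifest flips at BackgroundCallCompaction:0), so the flush of the
  // second L0 file below still writes its manifest update successfully; only
  // the compaction's manifest write fails.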
1849 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1850
1851 ASSERT_OK(Put(Key(1), "val"));
1852 s = Flush();
1853 ASSERT_EQ(s, Status::OK());
1854
1855 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:0");
1856 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:1");
1857
1858 s = dbfull()->TEST_WaitForCompact();
1859 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
1860 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:2");
1861 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:3");
1862 fault_fs_->SetFilesystemActive(true);
1863 SyncPoint::GetInstance()->ClearAllCallBacks();
1864 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:4");
1865 TEST_SYNC_POINT("CompactionManifestWriteErrorAR:5");
1866 SyncPoint::GetInstance()->DisableProcessing();
1867
1868 new_manifest = GetManifestNameFromLiveFiles();
1869 ASSERT_NE(new_manifest, old_manifest);
1870
1871 Reopen(options);
1872 ASSERT_EQ("val", Get(Key(0)));
1873 ASSERT_EQ("val", Get(Key(1)));
1874 ASSERT_EQ("val", Get(Key(2)));
1875 Close();
1876}
1877
1878TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) {
1879 // In this test, the FS is set to return an error during the first round of
1880 // compaction, so the first compaction fails with a retryable IO error that
1881 // is mapped to a soft error. The compaction is then rescheduled; in the
1882 // second round the FS is set back to active and the compaction succeeds, so
1883 // the test hits the CompactionJob::FinishCompactionOutputFile1 sync
1884 // point.
1885 std::shared_ptr<ErrorHandlerFSListener> listener(
1886 new ErrorHandlerFSListener());
1887 Options options = GetDefaultOptions();
1888 options.env = fault_env_.get();
1889 options.create_if_missing = true;
1890 options.level0_file_num_compaction_trigger = 2;
1891 options.listeners.emplace_back(listener);
1892 Status s;
1893 std::atomic<bool> fail_first(false);
1894 std::atomic<bool> fail_second(true);
1895 DestroyAndReopen(options);
1896
1897 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1898 error_msg.SetRetryable(true);
1899
1900 ASSERT_OK(Put(Key(0), "val"));
1901 ASSERT_OK(Put(Key(2), "val"));
1902 s = Flush();
1903 ASSERT_EQ(s, Status::OK());
1904
1905 listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
1906 listener->EnableAutoRecovery(false);
1907 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1908 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
1909 "BackgroundCallCompaction:0"},
1910 {"CompactionJob::FinishCompactionOutputFile1",
1911 "CompactionWriteRetryableErrorAutoRecover0"}});
1912 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1913 "DBImpl::BackgroundCompaction:Start",
1914 [&](void*) { fault_fs_->SetFilesystemActive(true); });
1915 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1916 "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
1917 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1918 "CompactionJob::OpenCompactionOutputFile", [&](void*) {
1919 if (fail_first.load() && fail_second.load()) {
1920 fault_fs_->SetFilesystemActive(false, error_msg);
1921 fail_second.store(false);
1922 }
1923 });
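  // fail_second ensures the FS is deactivated only for the first compaction
  // attempt; the rescheduled compaction re-activates the FS at
  // DBImpl::BackgroundCompaction:Start and is expected to succeed.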
1924 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1925
1926 ASSERT_OK(Put(Key(1), "val"));
1927 s = Flush();
1928 ASSERT_EQ(s, Status::OK());
1929
1930 s = dbfull()->TEST_WaitForCompact();
1931 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
1932
1933 TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
1934 SyncPoint::GetInstance()->ClearAllCallBacks();
1935 SyncPoint::GetInstance()->DisableProcessing();
1936 Destroy(options);
1937}
1938
1939TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) {
1940 std::shared_ptr<ErrorHandlerFSListener> listener(
1941 new ErrorHandlerFSListener());
1942 Options options = GetDefaultOptions();
1943 options.env = fault_env_.get();
1944 options.create_if_missing = true;
1945 options.writable_file_max_buffer_size = 32768;
1946 options.listeners.emplace_back(listener);
1947 options.paranoid_checks = true;
1948 options.max_bgerror_resume_count = 2;
1949 options.bgerror_resume_retry_interval = 100000; // 0.1 second
1950 Status s;
1951 Random rnd(301);
1952
1953 DestroyAndReopen(options);
1954
1955 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
1956 error_msg.SetRetryable(true);
1957
1958 // For the first batch, the write is successful and requires sync.
1959 {
1960 WriteBatch batch;
1961
1962 for (auto i = 0; i < 100; ++i) {
1963 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1964 }
1965
1966 WriteOptions wopts;
1967 wopts.sync = true;
1968 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
1969 };
1970
1971 // For the second batch, the first 2 file Appends are successful; the
1972 // following Appends fail due to a retryable file system IOError.
1973 {
1974 WriteBatch batch;
1975 int write_error = 0;
1976
1977 for (auto i = 100; i < 200; ++i) {
1978 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
1979 }
1980 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
1981 {{"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"},
1982 {"WALWriteError1:1", "RecoverFromRetryableBGIOError:BeforeResume1"},
1983 {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError1:2"}});
1984
1985 SyncPoint::GetInstance()->SetCallBack(
1986 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
1987 write_error++;
1988 if (write_error > 2) {
1989 fault_fs_->SetFilesystemActive(false, error_msg);
1990 }
1991 });
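    // Only the first two WAL Appends succeed; every Append after that fails
    // with the injected retryable error, so none of this batch's keys should
    // be readable afterwards.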
1992 SyncPoint::GetInstance()->EnableProcessing();
1993 WriteOptions wopts;
1994 wopts.sync = true;
1995 s = dbfull()->Write(wopts, &batch);
1996 ASSERT_EQ(true, s.IsIOError());
1997
1998 TEST_SYNC_POINT("WALWriteError1:0");
1999 fault_fs_->SetFilesystemActive(true);
2000 SyncPoint::GetInstance()->ClearAllCallBacks();
2001 TEST_SYNC_POINT("WALWriteError1:1");
2002 TEST_SYNC_POINT("WALWriteError1:2");
2003 }
2004 SyncPoint::GetInstance()->DisableProcessing();
2005
2006 // Data in the corrupted WAL is not stored
2007 for (auto i = 0; i < 199; ++i) {
2008 if (i < 100) {
2009 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2010 } else {
2011 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2012 }
2013 }
2014
2015 // After resume, write a new batch; it should be persisted in the WAL
2016 {
2017 WriteBatch batch;
2018
2019 for (auto i = 200; i < 300; ++i) {
2020 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2021 }
2022
2023 WriteOptions wopts;
2024 wopts.sync = true;
2025 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
2026 };
2027
2028 Reopen(options);
2029 for (auto i = 0; i < 300; ++i) {
2030 if (i < 100 || i >= 200) {
2031 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2032 } else {
2033 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2034 }
2035 }
2036 Close();
2037}
2038
2039TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) {
2040 // Fail the first recover and try second time.
2041 std::shared_ptr<ErrorHandlerFSListener> listener(
2042 new ErrorHandlerFSListener());
2043 Options options = GetDefaultOptions();
2044 options.env = fault_env_.get();
2045 options.create_if_missing = true;
2046 options.writable_file_max_buffer_size = 32768;
2047 options.listeners.emplace_back(listener);
2048 options.paranoid_checks = true;
2049 options.max_bgerror_resume_count = 2;
2050 options.bgerror_resume_retry_interval = 100000; // 0.1 second
2051 Status s;
2052 Random rnd(301);
2053
2054 DestroyAndReopen(options);
2055
2056 IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
2057 error_msg.SetRetryable(true);
2058
2059 // For the first batch, the write is successful and requires sync.
2060 {
2061 WriteBatch batch;
2062
2063 for (auto i = 0; i < 100; ++i) {
2064 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2065 }
2066
2067 WriteOptions wopts;
2068 wopts.sync = true;
2069 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
2070 };
2071
2072 // For the second batch, the first 2 file Appends are successful; the
2073 // following Appends fail due to a retryable file system IOError.
2074 {
2075 WriteBatch batch;
2076 int write_error = 0;
2077
2078 for (auto i = 100; i < 200; ++i) {
2079 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2080 }
2081 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2082 {{"RecoverFromRetryableBGIOError:BeforeWait0", "WALWriteError2:0"},
2083 {"WALWriteError2:1", "RecoverFromRetryableBGIOError:BeforeWait1"},
2084 {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError2:2"}});
2085
2086 SyncPoint::GetInstance()->SetCallBack(
2087 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2088 write_error++;
2089 if (write_error > 2) {
2090 fault_fs_->SetFilesystemActive(false, error_msg);
2091 }
2092 });
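    // Same injection as the previous test, but here the dependencies gate on
    // BeforeWait0/BeforeWait1: the first recovery attempt fails and the second
    // one is expected to succeed (RecoverSuccess releases :2).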
2093 SyncPoint::GetInstance()->EnableProcessing();
2094 WriteOptions wopts;
2095 wopts.sync = true;
2096 s = dbfull()->Write(wopts, &batch);
2097 ASSERT_EQ(true, s.IsIOError());
2098
2099 TEST_SYNC_POINT("WALWriteError2:0");
2100 fault_fs_->SetFilesystemActive(true);
2101 SyncPoint::GetInstance()->ClearAllCallBacks();
2102 TEST_SYNC_POINT("WALWriteError2:1");
2103 TEST_SYNC_POINT("WALWriteError2:2");
2104 }
2105 SyncPoint::GetInstance()->DisableProcessing();
2106
2107 // Data in the corrupted WAL is not stored
2108 for (auto i = 0; i < 199; ++i) {
2109 if (i < 100) {
2110 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2111 } else {
2112 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2113 }
2114 }
2115
2116 // After resume, write a new batch; it should be persisted in the WAL
2117 {
2118 WriteBatch batch;
2119
2120 for (auto i = 200; i < 300; ++i) {
2121 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2122 }
2123
2124 WriteOptions wopts;
2125 wopts.sync = true;
2126 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
2127 };
2128
2129 Reopen(options);
2130 for (auto i = 0; i < 300; ++i) {
2131 if (i < 100 || i >= 200) {
2132 ASSERT_NE(Get(Key(i)), "NOT_FOUND");
2133 } else {
2134 ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
2135 }
2136 }
2137 Close();
2138}
2139
2140class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
2141 public testing::WithParamInterface<bool> {};
2142
2143TEST_P(DBErrorHandlingFencingTest, FLushWriteFenced) {
2144 std::shared_ptr<ErrorHandlerFSListener> listener(
2145 new ErrorHandlerFSListener());
2146 Options options = GetDefaultOptions();
2147 options.env = fault_env_.get();
2148 options.create_if_missing = true;
2149 options.listeners.emplace_back(listener);
2150 options.paranoid_checks = GetParam();
2151 Status s;
2152
2153 listener->EnableAutoRecovery(true);
2154 DestroyAndReopen(options);
2155
2156 ASSERT_OK(Put(Key(0), "val"));
2157 SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
2158 fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2159 });
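  // A fenced IO error is escalated to a fatal error; even after the FS is
  // reactivated, Resume() below still returns IsIOFenced().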
2160 SyncPoint::GetInstance()->EnableProcessing();
2161 s = Flush();
2162 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2163 ASSERT_TRUE(s.IsIOFenced());
2164 SyncPoint::GetInstance()->DisableProcessing();
2165 fault_fs_->SetFilesystemActive(true);
2166 s = dbfull()->Resume();
2167 ASSERT_TRUE(s.IsIOFenced());
2168 Destroy(options);
2169}
2170
2171TEST_P(DBErrorHandlingFencingTest, ManifestWriteFenced) {
2172 std::shared_ptr<ErrorHandlerFSListener> listener(
2173 new ErrorHandlerFSListener());
2174 Options options = GetDefaultOptions();
2175 options.env = fault_env_.get();
2176 options.create_if_missing = true;
2177 options.listeners.emplace_back(listener);
2178 options.paranoid_checks = GetParam();
2179 Status s;
2180 std::string old_manifest;
2181 std::string new_manifest;
2182
2183 listener->EnableAutoRecovery(true);
2184 DestroyAndReopen(options);
2185 old_manifest = GetManifestNameFromLiveFiles();
2186
2187 ASSERT_OK(Put(Key(0), "val"));
2188 ASSERT_OK(Flush());
2189 ASSERT_OK(Put(Key(1), "val"));
2190 SyncPoint::GetInstance()->SetCallBack(
2191 "VersionSet::LogAndApply:WriteManifest", [&](void*) {
2192 fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2193 });
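  // As with the flush case, a fenced manifest write is escalated to a fatal
  // error that Resume() does not clear.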
2194 SyncPoint::GetInstance()->EnableProcessing();
2195 s = Flush();
2196 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2197 ASSERT_TRUE(s.IsIOFenced());
2198 SyncPoint::GetInstance()->ClearAllCallBacks();
2199 SyncPoint::GetInstance()->DisableProcessing();
2200 fault_fs_->SetFilesystemActive(true);
2201 s = dbfull()->Resume();
2202 ASSERT_TRUE(s.IsIOFenced());
2203 Close();
2204}
2205
2206TEST_P(DBErrorHandlingFencingTest, CompactionWriteFenced) {
2207 std::shared_ptr<ErrorHandlerFSListener> listener(
2208 new ErrorHandlerFSListener());
2209 Options options = GetDefaultOptions();
2210 options.env = fault_env_.get();
2211 options.create_if_missing = true;
2212 options.level0_file_num_compaction_trigger = 2;
2213 options.listeners.emplace_back(listener);
2214 options.paranoid_checks = GetParam();
2215 Status s;
2216 DestroyAndReopen(options);
2217
2218 ASSERT_OK(Put(Key(0), "val"));
2219 ASSERT_OK(Put(Key(2), "val"));
2220 s = Flush();
2221 ASSERT_EQ(s, Status::OK());
2222
2223 listener->EnableAutoRecovery(true);
2224 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
2225 {{"DBImpl::FlushMemTable:FlushMemTableFinished",
2226 "BackgroundCallCompaction:0"}});
2227 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
2228 "BackgroundCallCompaction:0", [&](void*) {
2229 fault_fs_->SetFilesystemActive(false, IOStatus::IOFenced("IO fenced"));
2230 });
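  // The fenced error is injected as soon as the background compaction starts;
  // the resulting severity is fatal and Resume() still reports IsIOFenced().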
2231 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
2232
2233 ASSERT_OK(Put(Key(1), "val"));
2234 s = Flush();
2235 ASSERT_EQ(s, Status::OK());
2236
2237 s = dbfull()->TEST_WaitForCompact();
2238 ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
2239 ASSERT_TRUE(s.IsIOFenced());
2240
2241 fault_fs_->SetFilesystemActive(true);
2242 s = dbfull()->Resume();
2243 ASSERT_TRUE(s.IsIOFenced());
2244 Destroy(options);
2245}
2246
2247TEST_P(DBErrorHandlingFencingTest, WALWriteFenced) {
2248 std::shared_ptr<ErrorHandlerFSListener> listener(
2249 new ErrorHandlerFSListener());
2250 Options options = GetDefaultOptions();
2251 options.env = fault_env_.get();
2252 options.create_if_missing = true;
2253 options.writable_file_max_buffer_size = 32768;
2254 options.listeners.emplace_back(listener);
2255 options.paranoid_checks = GetParam();
2256 Status s;
2257 Random rnd(301);
2258
2259 listener->EnableAutoRecovery(true);
2260 DestroyAndReopen(options);
2261
2262 {
2263 WriteBatch batch;
2264
2265 for (auto i = 0; i < 100; ++i) {
2266 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2267 }
2268
2269 WriteOptions wopts;
2270 wopts.sync = true;
2271 ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
2272 };
2273
2274 {
2275 WriteBatch batch;
2276 int write_error = 0;
2277
2278 for (auto i = 100; i < 199; ++i) {
2279 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2280 }
2281
2282 SyncPoint::GetInstance()->SetCallBack(
2283 "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
2284 write_error++;
2285 if (write_error > 2) {
2286 fault_fs_->SetFilesystemActive(false,
2287 IOStatus::IOFenced("IO fenced"));
2288 }
2289 });
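    // Once fenced, both this write and any later write fail with IsIOFenced(),
    // even after the FS is reactivated.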
2290 SyncPoint::GetInstance()->EnableProcessing();
2291 WriteOptions wopts;
2292 wopts.sync = true;
2293 s = dbfull()->Write(wopts, &batch);
2294 ASSERT_TRUE(s.IsIOFenced());
2295 }
2296 SyncPoint::GetInstance()->DisableProcessing();
2297 fault_fs_->SetFilesystemActive(true);
2298 {
2299 WriteBatch batch;
2300
2301 for (auto i = 0; i < 100; ++i) {
2302 ASSERT_OK(batch.Put(Key(i), rnd.RandomString(1024)));
2303 }
2304
2305 WriteOptions wopts;
2306 wopts.sync = true;
2307 s = dbfull()->Write(wopts, &batch);
2308 ASSERT_TRUE(s.IsIOFenced());
2309 }
2310 Close();
2311}
2312
2313INSTANTIATE_TEST_CASE_P(DBErrorHandlingFSTest, DBErrorHandlingFencingTest,
2314 ::testing::Bool());
2315
2316} // namespace ROCKSDB_NAMESPACE
2317
2318int main(int argc, char** argv) {
2319 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2320 ::testing::InitGoogleTest(&argc, argv);
2321 return RUN_ALL_TESTS();
2322}
2323
2324#else
2325#include <stdio.h>
2326
2327int main(int /*argc*/, char** /*argv*/) {
2328 fprintf(stderr, "SKIPPED as error_handler_fs_test is not supported in ROCKSDB_LITE\n");
2329 return 0;
2330}
2331
2332#endif // ROCKSDB_LITE