]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/corruption_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / corruption_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef ROCKSDB_LITE
11
12 #include "rocksdb/db.h"
13
14 #include <errno.h>
15 #include <fcntl.h>
16 #include <inttypes.h>
17 #include <sys/stat.h>
18 #include <sys/types.h>
19 #include "db/db_impl.h"
20 #include "db/log_format.h"
21 #include "db/version_set.h"
22 #include "rocksdb/cache.h"
23 #include "rocksdb/convenience.h"
24 #include "rocksdb/env.h"
25 #include "rocksdb/table.h"
26 #include "rocksdb/write_batch.h"
27 #include "util/filename.h"
28 #include "util/string_util.h"
29 #include "util/testharness.h"
30 #include "util/testutil.h"
31
32 namespace rocksdb {
33
// Length, in bytes, of every value produced by CorruptionTest::Value().
static constexpr int kValueSize = 1000;
35
36 class CorruptionTest : public testing::Test {
37 public:
38 test::ErrorEnv env_;
39 std::string dbname_;
40 shared_ptr<Cache> tiny_cache_;
41 Options options_;
42 DB* db_;
43
44 CorruptionTest() {
45 // If LRU cache shard bit is smaller than 2 (or -1 which will automatically
46 // set it to 0), test SequenceNumberRecovery will fail, likely because of a
47 // bug in recovery code. Keep it 4 for now to make the test passes.
48 tiny_cache_ = NewLRUCache(100, 4);
49 options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
50 options_.env = &env_;
51 dbname_ = test::PerThreadDBPath("corruption_test");
52 DestroyDB(dbname_, options_);
53
54 db_ = nullptr;
55 options_.create_if_missing = true;
56 BlockBasedTableOptions table_options;
57 table_options.block_size_deviation = 0; // make unit test pass for now
58 options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
59 Reopen();
60 options_.create_if_missing = false;
61 }
62
63 ~CorruptionTest() {
64 delete db_;
65 DestroyDB(dbname_, Options());
66 }
67
68 void CloseDb() {
69 delete db_;
70 db_ = nullptr;
71 }
72
73 Status TryReopen(Options* options = nullptr) {
74 delete db_;
75 db_ = nullptr;
76 Options opt = (options ? *options : options_);
77 opt.env = &env_;
78 opt.arena_block_size = 4096;
79 BlockBasedTableOptions table_options;
80 table_options.block_cache = tiny_cache_;
81 table_options.block_size_deviation = 0;
82 opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
83 return DB::Open(opt, dbname_, &db_);
84 }
85
86 void Reopen(Options* options = nullptr) {
87 ASSERT_OK(TryReopen(options));
88 }
89
90 void RepairDB() {
91 delete db_;
92 db_ = nullptr;
93 ASSERT_OK(::rocksdb::RepairDB(dbname_, options_));
94 }
95
96 void Build(int n, int flush_every = 0) {
97 std::string key_space, value_space;
98 WriteBatch batch;
99 for (int i = 0; i < n; i++) {
100 if (flush_every != 0 && i != 0 && i % flush_every == 0) {
101 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
102 dbi->TEST_FlushMemTable();
103 }
104 //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
105 Slice key = Key(i, &key_space);
106 batch.Clear();
107 batch.Put(key, Value(i, &value_space));
108 ASSERT_OK(db_->Write(WriteOptions(), &batch));
109 }
110 }
111
112 void Check(int min_expected, int max_expected) {
113 uint64_t next_expected = 0;
114 uint64_t missed = 0;
115 int bad_keys = 0;
116 int bad_values = 0;
117 int correct = 0;
118 std::string value_space;
119 // Do not verify checksums. If we verify checksums then the
120 // db itself will raise errors because data is corrupted.
121 // Instead, we want the reads to be successful and this test
122 // will detect whether the appropriate corruptions have
123 // occurred.
124 Iterator* iter = db_->NewIterator(ReadOptions(false, true));
125 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
126 uint64_t key;
127 Slice in(iter->key());
128 if (!ConsumeDecimalNumber(&in, &key) ||
129 !in.empty() ||
130 key < next_expected) {
131 bad_keys++;
132 continue;
133 }
134 missed += (key - next_expected);
135 next_expected = key + 1;
136 if (iter->value() != Value(static_cast<int>(key), &value_space)) {
137 bad_values++;
138 } else {
139 correct++;
140 }
141 }
142 delete iter;
143
144 fprintf(stderr,
145 "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
146 min_expected, max_expected, correct, bad_keys, bad_values,
147 static_cast<unsigned long long>(missed));
148 ASSERT_LE(min_expected, correct);
149 ASSERT_GE(max_expected, correct);
150 }
151
152 void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) {
153 struct stat sbuf;
154 if (stat(fname.c_str(), &sbuf) != 0) {
155 const char* msg = strerror(errno);
156 FAIL() << fname << ": " << msg;
157 }
158
159 if (offset < 0) {
160 // Relative to end of file; make it absolute
161 if (-offset > sbuf.st_size) {
162 offset = 0;
163 } else {
164 offset = static_cast<int>(sbuf.st_size + offset);
165 }
166 }
167 if (offset > sbuf.st_size) {
168 offset = static_cast<int>(sbuf.st_size);
169 }
170 if (offset + bytes_to_corrupt > sbuf.st_size) {
171 bytes_to_corrupt = static_cast<int>(sbuf.st_size - offset);
172 }
173
174 // Do it
175 std::string contents;
176 Status s = ReadFileToString(Env::Default(), fname, &contents);
177 ASSERT_TRUE(s.ok()) << s.ToString();
178 for (int i = 0; i < bytes_to_corrupt; i++) {
179 contents[i + offset] ^= 0x80;
180 }
181 s = WriteStringToFile(Env::Default(), contents, fname);
182 ASSERT_TRUE(s.ok()) << s.ToString();
183 Options options;
184 EnvOptions env_options;
185 ASSERT_NOK(VerifySstFileChecksum(options, env_options, fname));
186 }
187
188 void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
189 // Pick file to corrupt
190 std::vector<std::string> filenames;
191 ASSERT_OK(env_.GetChildren(dbname_, &filenames));
192 uint64_t number;
193 FileType type;
194 std::string fname;
195 int picked_number = -1;
196 for (size_t i = 0; i < filenames.size(); i++) {
197 if (ParseFileName(filenames[i], &number, &type) &&
198 type == filetype &&
199 static_cast<int>(number) > picked_number) { // Pick latest file
200 fname = dbname_ + "/" + filenames[i];
201 picked_number = static_cast<int>(number);
202 }
203 }
204 ASSERT_TRUE(!fname.empty()) << filetype;
205
206 CorruptFile(fname, offset, bytes_to_corrupt);
207 }
208
209 // corrupts exactly one file at level `level`. if no file found at level,
210 // asserts
211 void CorruptTableFileAtLevel(int level, int offset, int bytes_to_corrupt) {
212 std::vector<LiveFileMetaData> metadata;
213 db_->GetLiveFilesMetaData(&metadata);
214 for (const auto& m : metadata) {
215 if (m.level == level) {
216 CorruptFile(dbname_ + "/" + m.name, offset, bytes_to_corrupt);
217 return;
218 }
219 }
220 FAIL() << "no file found at level";
221 }
222
223
224 int Property(const std::string& name) {
225 std::string property;
226 int result;
227 if (db_->GetProperty(name, &property) &&
228 sscanf(property.c_str(), "%d", &result) == 1) {
229 return result;
230 } else {
231 return -1;
232 }
233 }
234
235 // Return the ith key
236 Slice Key(int i, std::string* storage) {
237 char buf[100];
238 snprintf(buf, sizeof(buf), "%016d", i);
239 storage->assign(buf, strlen(buf));
240 return Slice(*storage);
241 }
242
243 // Return the value to associate with the specified key
244 Slice Value(int k, std::string* storage) {
245 if (k == 0) {
246 // Ugh. Random seed of 0 used to produce no entropy. This code
247 // preserves the implementation that was in place when all of the
248 // magic values in this file were picked.
249 *storage = std::string(kValueSize, ' ');
250 return Slice(*storage);
251 } else {
252 Random r(k);
253 return test::RandomString(&r, kValueSize, storage);
254 }
255 }
256 };
257
258 TEST_F(CorruptionTest, Recovery) {
259 Build(100);
260 Check(100, 100);
261 #ifdef OS_WIN
262 // On Wndows OS Disk cache does not behave properly
263 // We do not call FlushBuffers on every Flush. If we do not close
264 // the log file prior to the corruption we end up with the first
265 // block not corrupted but only the second. However, under the debugger
266 // things work just fine but never pass when running normally
267 // For that reason people may want to run with unbuffered I/O. That option
268 // is not available for WAL though.
269 CloseDb();
270 #endif
271 Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record
272 Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block
273 ASSERT_TRUE(!TryReopen().ok());
274 options_.paranoid_checks = false;
275 Reopen(&options_);
276
277 // The 64 records in the first two log blocks are completely lost.
278 Check(36, 36);
279 }
280
281 TEST_F(CorruptionTest, RecoverWriteError) {
282 env_.writable_file_error_ = true;
283 Status s = TryReopen();
284 ASSERT_TRUE(!s.ok());
285 }
286
287 TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
288 // Do enough writing to force minor compaction
289 env_.writable_file_error_ = true;
290 const int num =
291 static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
292 std::string value_storage;
293 Status s;
294 bool failed = false;
295 for (int i = 0; i < num; i++) {
296 WriteBatch batch;
297 batch.Put("a", Value(100, &value_storage));
298 s = db_->Write(WriteOptions(), &batch);
299 if (!s.ok()) {
300 failed = true;
301 }
302 ASSERT_TRUE(!failed || !s.ok());
303 }
304 ASSERT_TRUE(!s.ok());
305 ASSERT_GE(env_.num_writable_file_errors_, 1);
306 env_.writable_file_error_ = false;
307 Reopen();
308 }
309
310 TEST_F(CorruptionTest, TableFile) {
311 Build(100);
312 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
313 dbi->TEST_FlushMemTable();
314 dbi->TEST_CompactRange(0, nullptr, nullptr);
315 dbi->TEST_CompactRange(1, nullptr, nullptr);
316
317 Corrupt(kTableFile, 100, 1);
318 Check(99, 99);
319 ASSERT_NOK(dbi->VerifyChecksum());
320 }
321
322 TEST_F(CorruptionTest, TableFileIndexData) {
323 Options options;
324 // very big, we'll trigger flushes manually
325 options.write_buffer_size = 100 * 1024 * 1024;
326 Reopen(&options);
327 // build 2 tables, flush at 5000
328 Build(10000, 5000);
329 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
330 dbi->TEST_FlushMemTable();
331
332 // corrupt an index block of an entire file
333 Corrupt(kTableFile, -2000, 500);
334 Reopen();
335 dbi = reinterpret_cast<DBImpl*>(db_);
336 // one full file may be readable, since only one was corrupted
337 // the other file should be fully non-readable, since index was corrupted
338 Check(0, 5000);
339 ASSERT_NOK(dbi->VerifyChecksum());
340 }
341
342 TEST_F(CorruptionTest, MissingDescriptor) {
343 Build(1000);
344 RepairDB();
345 Reopen();
346 Check(1000, 1000);
347 }
348
349 TEST_F(CorruptionTest, SequenceNumberRecovery) {
350 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
351 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
352 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3"));
353 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4"));
354 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5"));
355 RepairDB();
356 Reopen();
357 std::string v;
358 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
359 ASSERT_EQ("v5", v);
360 // Write something. If sequence number was not recovered properly,
361 // it will be hidden by an earlier write.
362 ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
363 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
364 ASSERT_EQ("v6", v);
365 Reopen();
366 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
367 ASSERT_EQ("v6", v);
368 }
369
370 TEST_F(CorruptionTest, CorruptedDescriptor) {
371 ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
372 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
373 dbi->TEST_FlushMemTable();
374 dbi->TEST_CompactRange(0, nullptr, nullptr);
375
376 Corrupt(kDescriptorFile, 0, 1000);
377 Status s = TryReopen();
378 ASSERT_TRUE(!s.ok());
379
380 RepairDB();
381 Reopen();
382 std::string v;
383 ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
384 ASSERT_EQ("hello", v);
385 }
386
387 TEST_F(CorruptionTest, CompactionInputError) {
388 Options options;
389 Reopen(&options);
390 Build(10);
391 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
392 dbi->TEST_FlushMemTable();
393 dbi->TEST_CompactRange(0, nullptr, nullptr);
394 dbi->TEST_CompactRange(1, nullptr, nullptr);
395 ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));
396
397 Corrupt(kTableFile, 100, 1);
398 Check(9, 9);
399 ASSERT_NOK(dbi->VerifyChecksum());
400
401 // Force compactions by writing lots of values
402 Build(10000);
403 Check(10000, 10000);
404 ASSERT_NOK(dbi->VerifyChecksum());
405 }
406
407 TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
408 Options options;
409 options.paranoid_checks = true;
410 options.write_buffer_size = 131072;
411 options.max_write_buffer_number = 2;
412 Reopen(&options);
413 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
414
415 // Fill levels >= 1
416 for (int level = 1; level < dbi->NumberLevels(); level++) {
417 dbi->Put(WriteOptions(), "", "begin");
418 dbi->Put(WriteOptions(), "~", "end");
419 dbi->TEST_FlushMemTable();
420 for (int comp_level = 0; comp_level < dbi->NumberLevels() - level;
421 ++comp_level) {
422 dbi->TEST_CompactRange(comp_level, nullptr, nullptr);
423 }
424 }
425
426 Reopen(&options);
427
428 dbi = reinterpret_cast<DBImpl*>(db_);
429 Build(10);
430 dbi->TEST_FlushMemTable();
431 dbi->TEST_WaitForCompact();
432 ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));
433
434 CorruptTableFileAtLevel(0, 100, 1);
435 Check(9, 9);
436 ASSERT_NOK(dbi->VerifyChecksum());
437
438 // Write must eventually fail because of corrupted table
439 Status s;
440 std::string tmp1, tmp2;
441 bool failed = false;
442 for (int i = 0; i < 10000; i++) {
443 s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
444 if (!s.ok()) {
445 failed = true;
446 }
447 // if one write failed, every subsequent write must fail, too
448 ASSERT_TRUE(!failed || !s.ok()) << "write did not fail in a corrupted db";
449 }
450 ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
451 }
452
453 TEST_F(CorruptionTest, UnrelatedKeys) {
454 Build(10);
455 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
456 dbi->TEST_FlushMemTable();
457 Corrupt(kTableFile, 100, 1);
458 ASSERT_NOK(dbi->VerifyChecksum());
459
460 std::string tmp1, tmp2;
461 ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
462 std::string v;
463 ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
464 ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
465 dbi->TEST_FlushMemTable();
466 ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
467 ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
468 }
469
470 TEST_F(CorruptionTest, FileSystemStateCorrupted) {
471 for (int iter = 0; iter < 2; ++iter) {
472 Options options;
473 options.paranoid_checks = true;
474 options.create_if_missing = true;
475 Reopen(&options);
476 Build(10);
477 ASSERT_OK(db_->Flush(FlushOptions()));
478 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
479 std::vector<LiveFileMetaData> metadata;
480 dbi->GetLiveFilesMetaData(&metadata);
481 ASSERT_GT(metadata.size(), size_t(0));
482 std::string filename = dbname_ + metadata[0].name;
483
484 delete db_;
485 db_ = nullptr;
486
487 if (iter == 0) { // corrupt file size
488 unique_ptr<WritableFile> file;
489 env_.NewWritableFile(filename, &file, EnvOptions());
490 file->Append(Slice("corrupted sst"));
491 file.reset();
492 } else { // delete the file
493 env_.DeleteFile(filename);
494 }
495
496 Status x = TryReopen(&options);
497 ASSERT_TRUE(x.IsCorruption());
498 DestroyDB(dbname_, options_);
499 Reopen(&options);
500 }
501 }
502
503 } // namespace rocksdb
504
505 int main(int argc, char** argv) {
506 ::testing::InitGoogleTest(&argc, argv);
507 return RUN_ALL_TESTS();
508 }
509
510 #else
511 #include <stdio.h>
512
// Entry point for ROCKSDB_LITE builds: prints a notice that the tests are
// skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as RepairDB() is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
517
518 #endif // !ROCKSDB_LITE