// Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_index.h"
#include "db/db_test_util.h"
#include "rocksdb/rocksdb_namespace.h"

namespace ROCKSDB_NAMESPACE {

enum class WriteBatchOpType {
  kPut = 0,
  kDelete,
  kSingleDelete,
  kMerge,
  kPutEntity,
  kDeleteRange,
  kNum,
};

// Integer addition is needed for `::testing::Range()` to take the enum type.
WriteBatchOpType operator+(WriteBatchOpType lhs, const int rhs) {
  using T = std::underlying_type<WriteBatchOpType>::type;
  return static_cast<WriteBatchOpType>(static_cast<T>(lhs) + rhs);
}

enum class WriteMode {
  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key = 0`
  // and `WriteOptions::protection_bytes_per_key = 0`
  kWriteUnprotectedBatch = 0,
  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key > 0`.
  kWriteProtectedBatch,
  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key == 0`.
  // Protection is enabled via `WriteOptions::protection_bytes_per_key > 0`.
  kWriteOptionProtectedBatch,
  // TODO(ajkr): add a mode that uses `Write()` wrappers, e.g., `Put()`.
  kNum,
};

// Integer addition is needed for `::testing::Range()` to take the enum type.
WriteMode operator+(WriteMode lhs, const int rhs) {
  using T = std::underlying_type<WriteMode>::type;
  return static_cast<WriteMode>(static_cast<T>(lhs) + rhs);
}
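
// With these operators, `::testing::Range(begin, end)` can step through the
// enum values one at a time, visiting each value and stopping before `kNum`;
// see the test instantiations below.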

std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
                                            size_t protection_bytes_per_key,
                                            WriteBatchOpType op_type) {
  Status s;
  WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */,
                protection_bytes_per_key, 0 /* default_cf_ts_sz */);
  switch (op_type) {
    case WriteBatchOpType::kPut:
      s = wb.Put(cf_handle, "key", "val");
      break;
    case WriteBatchOpType::kDelete:
      s = wb.Delete(cf_handle, "key");
      break;
    case WriteBatchOpType::kSingleDelete:
      s = wb.SingleDelete(cf_handle, "key");
      break;
    case WriteBatchOpType::kDeleteRange:
      s = wb.DeleteRange(cf_handle, "begin", "end");
      break;
    case WriteBatchOpType::kMerge:
      s = wb.Merge(cf_handle, "key", "val");
      break;
    case WriteBatchOpType::kPutEntity:
      s = wb.PutEntity(cf_handle, "key",
                       {{"attr_name1", "foo"}, {"attr_name2", "bar"}});
      break;
    case WriteBatchOpType::kNum:
      assert(false);
  }
  return {std::move(wb), std::move(s)};
}
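
// For reference, a protected batch built by the helper above can be verified
// directly without touching a DB; a minimal sketch mirroring the
// NoCorruptionCase tests below:
//
//   auto batch_and_status = GetWriteBatch(
//       nullptr /* cf_handle */, 8 /* protection_bytes_per_key */,
//       WriteBatchOpType::kPut);
//   assert(batch_and_status.second.ok());
//   assert(batch_and_status.first.VerifyChecksum().ok());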

class DbKvChecksumTestBase : public DBTestBase {
 public:
  DbKvChecksumTestBase(const std::string& path, bool env_do_fsync)
      : DBTestBase(path, env_do_fsync) {}

  ColumnFamilyHandle* GetCFHandleToUse(ColumnFamilyHandle* column_family,
                                       WriteBatchOpType op_type) const {
    // Note: PutEntity cannot be called without a column family
    if (op_type == WriteBatchOpType::kPutEntity && !column_family) {
      return db_->DefaultColumnFamily();
    }

    return column_family;
  }
};

class DbKvChecksumTest
    : public DbKvChecksumTestBase,
      public ::testing::WithParamInterface<
          std::tuple<WriteBatchOpType, char, WriteMode,
                     uint32_t /* memtable_protection_bytes_per_key */>> {
 public:
  DbKvChecksumTest()
      : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
    op_type_ = std::get<0>(GetParam());
    corrupt_byte_addend_ = std::get<1>(GetParam());
    write_mode_ = std::get<2>(GetParam());
    memtable_protection_bytes_per_key_ = std::get<3>(GetParam());
  }

  Status ExecuteWrite(ColumnFamilyHandle* cf_handle) {
    switch (write_mode_) {
      case WriteMode::kWriteUnprotectedBatch: {
        auto batch_and_status =
            GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
                          0 /* protection_bytes_per_key */, op_type_);
        assert(batch_and_status.second.ok());
        // The default write option has protection_bytes_per_key = 0
        return db_->Write(WriteOptions(), &batch_and_status.first);
      }
      case WriteMode::kWriteProtectedBatch: {
        auto batch_and_status =
            GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
                          8 /* protection_bytes_per_key */, op_type_);
        assert(batch_and_status.second.ok());
        return db_->Write(WriteOptions(), &batch_and_status.first);
      }
      case WriteMode::kWriteOptionProtectedBatch: {
        auto batch_and_status =
            GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
                          0 /* protection_bytes_per_key */, op_type_);
        assert(batch_and_status.second.ok());
        WriteOptions write_opts;
        write_opts.protection_bytes_per_key = 8;
        return db_->Write(write_opts, &batch_and_status.first);
      }
      case WriteMode::kNum:
        assert(false);
    }
    return Status::NotSupported("WriteMode " +
                                std::to_string(static_cast<int>(write_mode_)));
  }

  void CorruptNextByteCallBack(void* arg) {
    Slice encoded = *static_cast<Slice*>(arg);
    if (entry_len_ == std::numeric_limits<size_t>::max()) {
      // We learn the entry size on the first attempt
      entry_len_ = encoded.size();
    }
    char* buf = const_cast<char*>(encoded.data());
    buf[corrupt_byte_offset_] += corrupt_byte_addend_;
    ++corrupt_byte_offset_;
  }

  bool MoreBytesToCorrupt() { return corrupt_byte_offset_ < entry_len_; }

 protected:
  WriteBatchOpType op_type_;
  char corrupt_byte_addend_;
  WriteMode write_mode_;
  uint32_t memtable_protection_bytes_per_key_;
  size_t corrupt_byte_offset_ = 0;
  size_t entry_len_ = std::numeric_limits<size_t>::max();
};
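
// The corruption callback above is meant to be driven in a loop of the shape
// used by the tests below (sketch):
//
//   while (MoreBytesToCorrupt()) {
//     Reopen(options);  // a detected corruption leaves the DB read-only
//     SyncPoint::GetInstance()->EnableProcessing();
//     ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
//     SyncPoint::GetInstance()->DisableProcessing();
//   }
//
// Each iteration corrupts the next byte of the encoded entry, so every byte
// is visited exactly once before the loop exits.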

std::string GetOpTypeString(const WriteBatchOpType& op_type) {
  switch (op_type) {
    case WriteBatchOpType::kPut:
      return "Put";
    case WriteBatchOpType::kDelete:
      return "Delete";
    case WriteBatchOpType::kSingleDelete:
      return "SingleDelete";
    case WriteBatchOpType::kDeleteRange:
      return "DeleteRange";
    case WriteBatchOpType::kMerge:
      return "Merge";
    case WriteBatchOpType::kPutEntity:
      return "PutEntity";
    case WriteBatchOpType::kNum:
      assert(false);
  }
  assert(false);
  return "";
}

std::string GetWriteModeString(const WriteMode& mode) {
  switch (mode) {
    case WriteMode::kWriteUnprotectedBatch:
      return "WriteUnprotectedBatch";
    case WriteMode::kWriteProtectedBatch:
      return "WriteProtectedBatch";
    case WriteMode::kWriteOptionProtectedBatch:
      return "WriteOptionProtectedBatch";
    case WriteMode::kNum:
      assert(false);
  }
  return "";
}

INSTANTIATE_TEST_CASE_P(
    DbKvChecksumTest, DbKvChecksumTest,
    ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kNum),
                       ::testing::Values(2, 103, 251),
                       ::testing::Range(WriteMode::kWriteProtectedBatch,
                                        WriteMode::kNum),
                       ::testing::Values(0)),
    [](const testing::TestParamInfo<
        std::tuple<WriteBatchOpType, char, WriteMode, uint32_t>>& args) {
      std::ostringstream oss;
      oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
          << static_cast<int>(
                 static_cast<unsigned char>(std::get<1>(args.param)))
          << GetWriteModeString(std::get<2>(args.param))
          << static_cast<uint32_t>(std::get<3>(args.param));
      return oss.str();
    });
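
// The name generator above yields test names such as
// "PutAdd2WriteProtectedBatch0": the op type, "Add" plus the corrupting byte
// addend, the write mode, and the memtable protection size (always 0 in this
// instantiation).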

// TODO(ajkr): add a test that corrupts the `WriteBatch` contents. Such
// corruptions should only be detectable in `WriteMode::kWriteProtectedBatch`.

TEST_P(DbKvChecksumTest, MemTableAddCorrupted) {
  // This test repeatedly attempts to write `WriteBatch`es containing a single
  // entry of type `op_type_`. Each attempt has one byte corrupted in its
  // memtable entry by adding `corrupt_byte_addend_` to its original value. The
  // test repeats until an attempt has been made on each byte in the encoded
  // memtable entry. All attempts are expected to fail with
  // `Status::Corruption`.
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:Encoded",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));

  while (MoreBytesToCorrupt()) {
    // A failed memtable insert always leads to read-only mode, so we have to
    // reopen for every attempt.
    Options options = CurrentOptions();
    if (op_type_ == WriteBatchOpType::kMerge) {
      options.merge_operator = MergeOperators::CreateStringAppendOperator();
    }
    Reopen(options);

    SyncPoint::GetInstance()->EnableProcessing();
    ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
    SyncPoint::GetInstance()->DisableProcessing();

    // In case the above callback is not invoked, this test would run
    // numeric_limits<size_t>::max() times before reporting an error (or would
    // exhaust disk space first). This assert reports the error early.
    ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  }
}

TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
  // This test repeatedly attempts to write `WriteBatch`es containing a single
  // entry of type `op_type_` to a non-default column family. Each attempt has
  // one byte corrupted in its memtable entry by adding `corrupt_byte_addend_`
  // to its original value. The test repeats until an attempt has been made on
  // each byte in the encoded memtable entry. All attempts are expected to fail
  // with `Status::Corruption`.
  Options options = CurrentOptions();
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:Encoded",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));

  while (MoreBytesToCorrupt()) {
    // A failed memtable insert always leads to read-only mode, so we have to
    // reopen for every attempt.
    ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);

    SyncPoint::GetInstance()->EnableProcessing();
    ASSERT_TRUE(ExecuteWrite(handles_[1]).IsCorruption());
    SyncPoint::GetInstance()->DisableProcessing();

    // In case the above callback is not invoked, this test would run
    // numeric_limits<size_t>::max() times before reporting an error (or would
    // exhaust disk space first). This assert reports the error early.
    ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  }
}

TEST_P(DbKvChecksumTest, NoCorruptionCase) {
  // If this test fails, we may have found a piece of malfunctioning hardware
  auto batch_and_status =
      GetWriteBatch(GetCFHandleToUse(nullptr, op_type_),
                    8 /* protection_bytes_per_key */, op_type_);
  ASSERT_OK(batch_and_status.second);
  ASSERT_OK(batch_and_status.first.VerifyChecksum());
}
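
// A serialized `WriteBatch` begins with a 12-byte header: an 8-byte sequence
// number followed by a 4-byte entry count. The per-key protection info does
// not cover the sequence number, which is why the WAL corruption tests below
// start corrupting at offset 8.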

TEST_P(DbKvChecksumTest, WriteToWALCorrupted) {
  // This test repeatedly attempts to write `WriteBatch`es containing a single
  // entry of type `op_type_`. Each attempt has one byte corrupted by adding
  // `corrupt_byte_addend_` to its original value. The test repeats until an
  // attempt has been made on each byte in the encoded write batch. All
  // attempts are expected to fail with `Status::Corruption`.
  Options options = CurrentOptions();
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::WriteToWAL:log_entry",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));
  // The first 8 bytes are the sequence number, which is not protected by the
  // write batch checksum.
  corrupt_byte_offset_ = 8;

  while (MoreBytesToCorrupt()) {
    // A corrupted write batch leads to read-only mode, so we have to
    // reopen for every attempt.
    Reopen(options);
    auto log_size_pre_write = dbfull()->TEST_total_log_size();

    SyncPoint::GetInstance()->EnableProcessing();
    ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
    // Confirm that nothing was written to the WAL
    ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
    ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
    SyncPoint::GetInstance()->DisableProcessing();

    // In case the above callback is not invoked, this test would run
    // numeric_limits<size_t>::max() times before reporting an error (or would
    // exhaust disk space first). This assert reports the error early.
    ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  }
}

TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) {
  // This test repeatedly attempts to write `WriteBatch`es containing a single
  // entry of type `op_type_`. Each attempt has one byte corrupted by adding
  // `corrupt_byte_addend_` to its original value. The test repeats until an
  // attempt has been made on each byte in the encoded write batch. All
  // attempts are expected to fail with `Status::Corruption`.
  Options options = CurrentOptions();
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }
  CreateAndReopenWithCF({"pikachu"}, options);
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::WriteToWAL:log_entry",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));
  // The first 8 bytes are the sequence number, which is not protected by the
  // write batch checksum.
  corrupt_byte_offset_ = 8;

  while (MoreBytesToCorrupt()) {
    // A corrupted write batch leads to read-only mode, so we have to
    // reopen for every attempt.
    ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options);
    auto log_size_pre_write = dbfull()->TEST_total_log_size();

    SyncPoint::GetInstance()->EnableProcessing();
    ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption());
    // Confirm that nothing was written to the WAL
    ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
    ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
    SyncPoint::GetInstance()->DisableProcessing();

    // In case the above callback is not invoked, this test would run
    // numeric_limits<size_t>::max() times before reporting an error (or would
    // exhaust disk space first). This assert reports the error early.
    ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max());
  }
}

class DbKvChecksumTestMergedBatch
    : public DbKvChecksumTestBase,
      public ::testing::WithParamInterface<
          std::tuple<WriteBatchOpType, WriteBatchOpType, char>> {
 public:
  DbKvChecksumTestMergedBatch()
      : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
    op_type1_ = std::get<0>(GetParam());
    op_type2_ = std::get<1>(GetParam());
    corrupt_byte_addend_ = std::get<2>(GetParam());
  }

 protected:
  WriteBatchOpType op_type1_;
  WriteBatchOpType op_type2_;
  char corrupt_byte_addend_;
};

void CorruptWriteBatch(Slice* content, size_t offset,
                       char corrupt_byte_addend) {
  ASSERT_TRUE(offset < content->size());
  char* buf = const_cast<char*>(content->data());
  buf[offset] += corrupt_byte_addend;
}
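
// The two WAL tests below exercise the group commit path: the first writer
// becomes the write group leader and is held at the
// "WriteThread::JoinBatchGroup:Wait2" sync point, a second writer joins as a
// follower at "WriteThread::JoinBatchGroup:Wait", and the leader then merges
// both batches into one WAL entry. One byte of either pre-merged batch is
// flipped via CorruptWriteBatch(), and both writers are expected to see
// `Status::Corruption` with nothing appended to the WAL.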

TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) {
  // Verify the write batch checksum after a write batch append
  auto batch1 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
                              8 /* protection_bytes_per_key */, op_type1_);
  ASSERT_OK(batch1.second);
  auto batch2 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
                              8 /* protection_bytes_per_key */, op_type2_);
  ASSERT_OK(batch2.second);
  ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first));
  ASSERT_OK(batch1.first.VerifyChecksum());
}

TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
  // This test has two writers repeatedly attempt to write `WriteBatch`es
  // containing a single entry of type op_type1_ and op_type2_ respectively.
  // The leader of the write group writes the batch containing the entry of
  // type op_type1_. One byte of the pre-merged write batches is corrupted by
  // adding `corrupt_byte_addend_` to the byte's original value during each
  // attempt. The test repeats until an attempt has been made on each byte in
  // both pre-merged write batches. All attempts are expected to fail with
  // `Status::Corruption`.
  Options options = CurrentOptions();
  if (op_type1_ == WriteBatchOpType::kMerge ||
      op_type2_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }

  auto leader_batch_and_status =
      GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
                    8 /* protection_bytes_per_key */, op_type1_);
  ASSERT_OK(leader_batch_and_status.second);
  auto follower_batch_and_status =
      GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
                    8 /* protection_bytes_per_key */, op_type2_);
  size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
  size_t total_bytes =
      leader_batch_size + follower_batch_and_status.first.GetDataSize();
  // The first 8 bytes are the sequence number, which is not protected by the
  // write batch checksum.
  size_t corrupt_byte_offset = 8;

  std::atomic<bool> follower_joined{false};
  std::atomic<int> leader_count{0};
  port::Thread follower_thread;
  // This callback should only be called by the leader thread
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) {
        auto* leader = reinterpret_cast<WriteThread::Writer*>(arg_leader);
        ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER);

        // This callback should only be called by the follower thread
        SyncPoint::GetInstance()->SetCallBack(
            "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) {
              auto* follower =
                  reinterpret_cast<WriteThread::Writer*>(arg_follower);
              // The leader thread will wait on this bool and hence wait until
              // this writer joins the write group
              ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER);
              if (corrupt_byte_offset >= leader_batch_size) {
                Slice batch_content = follower->batch->Data();
                CorruptWriteBatch(&batch_content,
                                  corrupt_byte_offset - leader_batch_size,
                                  corrupt_byte_addend_);
              }
              // Leader busy waits on this flag
              follower_joined = true;
              // So the follower does not enter the outer callback at
              // WriteThread::JoinBatchGroup:Wait2
              SyncPoint::GetInstance()->DisableProcessing();
            });

        // Start the other writer thread which will join the write group as
        // follower
        follower_thread = port::Thread([&]() {
          follower_batch_and_status =
              GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
                            8 /* protection_bytes_per_key */, op_type2_);
          ASSERT_OK(follower_batch_and_status.second);
          ASSERT_TRUE(
              db_->Write(WriteOptions(), &follower_batch_and_status.first)
                  .IsCorruption());
        });

        ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size);
        if (corrupt_byte_offset < leader_batch_size) {
          Slice batch_content = leader->batch->Data();
          CorruptWriteBatch(&batch_content, corrupt_byte_offset,
                            corrupt_byte_addend_);
        }
        leader_count++;
        while (!follower_joined) {
          // busy waiting
        }
      });
  while (corrupt_byte_offset < total_bytes) {
    // Reopen the DB since a failed WAL write puts it in read-only mode
    Reopen(options);
    SyncPoint::GetInstance()->EnableProcessing();
    auto log_size_pre_write = dbfull()->TEST_total_log_size();
    leader_batch_and_status =
        GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
                      8 /* protection_bytes_per_key */, op_type1_);
    ASSERT_OK(leader_batch_and_status.second);
    ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
                    .IsCorruption());
    follower_thread.join();
    // Prevent the leader thread from entering this callback
    SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait");
    ASSERT_EQ(1, leader_count);
    // Nothing should have been written to the WAL
    ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
    ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());

    corrupt_byte_offset++;
    if (corrupt_byte_offset == leader_batch_size) {
      // Skip over the sequence number part of the follower's write batch
      corrupt_byte_offset += 8;
    }
    follower_joined = false;
    leader_count = 0;
  }
  SyncPoint::GetInstance()->DisableProcessing();
}

TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
  // This test has two writers repeatedly attempt to write `WriteBatch`es
  // containing a single entry of type op_type1_ and op_type2_ respectively.
  // The leader of the write group writes the batch containing the entry of
  // type op_type1_. One byte of the pre-merged write batches is corrupted by
  // adding `corrupt_byte_addend_` to the byte's original value during each
  // attempt. The test repeats until an attempt has been made on each byte in
  // both pre-merged write batches. All attempts are expected to fail with
  // `Status::Corruption`.
  Options options = CurrentOptions();
  if (op_type1_ == WriteBatchOpType::kMerge ||
      op_type2_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }
  CreateAndReopenWithCF({"ramen"}, options);

  auto leader_batch_and_status =
      GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_),
                    8 /* protection_bytes_per_key */, op_type1_);
  ASSERT_OK(leader_batch_and_status.second);
  auto follower_batch_and_status =
      GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_),
                    8 /* protection_bytes_per_key */, op_type2_);
  size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
  size_t total_bytes =
      leader_batch_size + follower_batch_and_status.first.GetDataSize();
  // The first 8 bytes are the sequence number, which is not protected by the
  // write batch checksum.
  size_t corrupt_byte_offset = 8;

  std::atomic<bool> follower_joined{false};
  std::atomic<int> leader_count{0};
  port::Thread follower_thread;
  // This callback should only be called by the leader thread
  SyncPoint::GetInstance()->SetCallBack(
      "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) {
        auto* leader = reinterpret_cast<WriteThread::Writer*>(arg_leader);
        ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER);

        // This callback should only be called by the follower thread
        SyncPoint::GetInstance()->SetCallBack(
            "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) {
              auto* follower =
                  reinterpret_cast<WriteThread::Writer*>(arg_follower);
              // The leader thread will wait on this bool and hence wait until
              // this writer joins the write group
              ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER);
              if (corrupt_byte_offset >= leader_batch_size) {
                Slice batch_content =
                    WriteBatchInternal::Contents(follower->batch);
                CorruptWriteBatch(&batch_content,
                                  corrupt_byte_offset - leader_batch_size,
                                  corrupt_byte_addend_);
              }
              follower_joined = true;
              // So the follower does not enter the outer callback at
              // WriteThread::JoinBatchGroup:Wait2
              SyncPoint::GetInstance()->DisableProcessing();
            });

        // Start the other writer thread which will join the write group as
        // follower
        follower_thread = port::Thread([&]() {
          follower_batch_and_status =
              GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_),
                            8 /* protection_bytes_per_key */, op_type2_);
          ASSERT_OK(follower_batch_and_status.second);
          ASSERT_TRUE(
              db_->Write(WriteOptions(), &follower_batch_and_status.first)
                  .IsCorruption());
        });

        ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size);
        if (corrupt_byte_offset < leader_batch_size) {
          Slice batch_content = WriteBatchInternal::Contents(leader->batch);
          CorruptWriteBatch(&batch_content, corrupt_byte_offset,
                            corrupt_byte_addend_);
        }
        leader_count++;
        while (!follower_joined) {
          // busy waiting
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  while (corrupt_byte_offset < total_bytes) {
    // Reopen the DB since a failed WAL write puts it in read-only mode
    ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options);
    SyncPoint::GetInstance()->EnableProcessing();
    auto log_size_pre_write = dbfull()->TEST_total_log_size();
    leader_batch_and_status =
        GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_),
                      8 /* protection_bytes_per_key */, op_type1_);
    ASSERT_OK(leader_batch_and_status.second);
    ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
                    .IsCorruption());
    follower_thread.join();
    // Prevent the leader thread from entering this callback
    SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait");

    ASSERT_EQ(1, leader_count);
    // Nothing should have been written to the WAL
    ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size());
    ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());

    corrupt_byte_offset++;
    if (corrupt_byte_offset == leader_batch_size) {
      // Skip over the sequence number part of the follower's write batch
      corrupt_byte_offset += 8;
    }
    follower_joined = false;
    leader_count = 0;
  }
  SyncPoint::GetInstance()->DisableProcessing();
}

INSTANTIATE_TEST_CASE_P(
    DbKvChecksumTestMergedBatch, DbKvChecksumTestMergedBatch,
    ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kNum),
                       ::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kNum),
                       ::testing::Values(2, 103, 251)),
    [](const testing::TestParamInfo<
        std::tuple<WriteBatchOpType, WriteBatchOpType, char>>& args) {
      std::ostringstream oss;
      oss << GetOpTypeString(std::get<0>(args.param))
          << GetOpTypeString(std::get<1>(args.param)) << "Add"
          << static_cast<int>(
                 static_cast<unsigned char>(std::get<2>(args.param)));
      return oss.str();
    });

// TODO: add test for transactions
// TODO: add test for corrupted write batch with WAL disabled

class DbKVChecksumWALToWriteBatchTest : public DBTestBase {
 public:
  DbKVChecksumWALToWriteBatchTest()
      : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {}
};
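
// The test below covers the checksum handoff from WAL recovery into the write
// batch: during DBImpl::RecoverLogFiles(), a whole-batch XXH3-64 checksum over
// the recovered batch contents is handed to the batch before its per-key
// protection info is populated. The sync points expose both the batch contents
// and that checksum; the test corrupts the batch after the checksum is taken
// and expects recovery to fail with Status::Corruption.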

TEST_F(DbKVChecksumWALToWriteBatchTest, WriteBatchChecksumHandoff) {
  Options options = CurrentOptions();
  Reopen(options);
  ASSERT_OK(db_->Put(WriteOptions(), "key", "val"));
  std::string content = "";
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
      [&](void* batch_ptr) {
        WriteBatch* batch = reinterpret_cast<WriteBatch*>(batch_ptr);
        content.assign(batch->Data().data(), batch->GetDataSize());
        Slice batch_content = batch->Data();
        // Corrupt the first byte
        CorruptWriteBatch(&batch_content, 0, 1);
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
      [&](void* checksum_ptr) {
        // Verify that the checksum is computed over the batch content
        uint64_t checksum = *reinterpret_cast<uint64_t*>(checksum_ptr);
        ASSERT_EQ(checksum, XXH3_64bits(content.data(), content.size()));
      });
  SyncPoint::GetInstance()->EnableProcessing();
  ASSERT_TRUE(TryReopen(options).IsCorruption());
  SyncPoint::GetInstance()->DisableProcessing();
}

// TODO (cbi): add DeleteRange coverage once it is implemented
class DbMemtableKVChecksumTest : public DbKvChecksumTest {
 public:
  DbMemtableKVChecksumTest() : DbKvChecksumTest() {}

 protected:
  // Indices in the memtable entry that we will not corrupt.
  // For the memtable entry format, see the comments in MemTable::Add().
  // We do not corrupt the key length and value length fields in this test
  // case since doing so causes a segfault and ASAN will complain.
  // For this test case, keys and values are all of length 3, so the
  // key length field is at index 0 and the value length field is at index 12.
  const std::set<size_t> index_not_to_corrupt{0, 12};

  void SkipNotToCorruptEntry() {
    if (index_not_to_corrupt.find(corrupt_byte_offset_) !=
        index_not_to_corrupt.end()) {
      corrupt_byte_offset_++;
    }
  }
};
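
// For orientation, MemTable::Add() encodes an entry roughly as
//
//   varint32 internal_key_size | user key | 8-byte packed (seqno, type) |
//   varint32 value_size | value
//
// With the 3-byte keys and values used here, the internal key is 11 bytes, so
// the key length varint sits at offset 0 and the value length varint at
// offset 12, which are exactly the two offsets excluded from corruption above.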

INSTANTIATE_TEST_CASE_P(
    DbMemtableKVChecksumTest, DbMemtableKVChecksumTest,
    ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kDeleteRange),
                       ::testing::Values(2, 103, 251),
                       ::testing::Range(static_cast<WriteMode>(0),
                                        WriteMode::kWriteOptionProtectedBatch),
                       // Skip the 1-byte checksum as it makes the test flaky
                       ::testing::Values(2, 4, 8)),
    [](const testing::TestParamInfo<
        std::tuple<WriteBatchOpType, char, WriteMode, uint32_t>>& args) {
      std::ostringstream oss;
      oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
          << static_cast<int>(
                 static_cast<unsigned char>(std::get<1>(args.param)))
          << GetWriteModeString(std::get<2>(args.param))
          << static_cast<uint32_t>(std::get<3>(args.param));
      return oss.str();
    });

TEST_P(DbMemtableKVChecksumTest, GetWithCorruptAfterMemtableInsert) {
  // Record the memtable entry size. We do not corrupt the memtable entry here
  // since that can segfault or fail asserts inside the memtablerep
  // implementation, e.g., when key_len is corrupted.
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:BeforeReturn:Encoded", [&](void* arg) {
        Slice encoded = *static_cast<Slice*>(arg);
        entry_len_ = encoded.size();
      });

  SyncPoint::GetInstance()->SetCallBack(
      "Memtable::SaveValue:Begin:entry", [&](void* entry) {
        char* buf = *static_cast<char**>(entry);
        buf[corrupt_byte_offset_] += corrupt_byte_addend_;
        ++corrupt_byte_offset_;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Options options = CurrentOptions();
  options.memtable_protection_bytes_per_key =
      memtable_protection_bytes_per_key_;
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }

  SkipNotToCorruptEntry();
  while (MoreBytesToCorrupt()) {
    Reopen(options);
    ASSERT_OK(ExecuteWrite(nullptr));
    std::string val;
    ASSERT_TRUE(db_->Get(ReadOptions(), "key", &val).IsCorruption());
    Destroy(options);
    SkipNotToCorruptEntry();
  }
}

TEST_P(DbMemtableKVChecksumTest,
       GetWithColumnFamilyCorruptAfterMemtableInsert) {
  // Record the memtable entry size. We do not corrupt the memtable entry here
  // since that can segfault or fail asserts inside the memtablerep
  // implementation, e.g., when key_len is corrupted.
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:BeforeReturn:Encoded", [&](void* arg) {
        Slice encoded = *static_cast<Slice*>(arg);
        entry_len_ = encoded.size();
      });

  SyncPoint::GetInstance()->SetCallBack(
      "Memtable::SaveValue:Begin:entry", [&](void* entry) {
        char* buf = *static_cast<char**>(entry);
        buf[corrupt_byte_offset_] += corrupt_byte_addend_;
        ++corrupt_byte_offset_;
      });
  SyncPoint::GetInstance()->EnableProcessing();
  Options options = CurrentOptions();
  options.memtable_protection_bytes_per_key =
      memtable_protection_bytes_per_key_;
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }

  SkipNotToCorruptEntry();
  while (MoreBytesToCorrupt()) {
    Reopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(ExecuteWrite(handles_[1]));
    std::string val;
    ASSERT_TRUE(
        db_->Get(ReadOptions(), handles_[1], "key", &val).IsCorruption());
    Destroy(options);
    SkipNotToCorruptEntry();
  }
}

TEST_P(DbMemtableKVChecksumTest, IteratorWithCorruptAfterMemtableInsert) {
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:BeforeReturn:Encoded",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));
  SyncPoint::GetInstance()->EnableProcessing();
  Options options = CurrentOptions();
  options.memtable_protection_bytes_per_key =
      memtable_protection_bytes_per_key_;
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }

  SkipNotToCorruptEntry();
  while (MoreBytesToCorrupt()) {
    Reopen(options);
    ASSERT_OK(ExecuteWrite(nullptr));
    Iterator* it = db_->NewIterator(ReadOptions());
    it->SeekToFirst();
    ASSERT_FALSE(it->Valid());
    ASSERT_TRUE(it->status().IsCorruption());
    delete it;
    Destroy(options);
    SkipNotToCorruptEntry();
  }
}

TEST_P(DbMemtableKVChecksumTest,
       IteratorWithColumnFamilyCorruptAfterMemtableInsert) {
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:BeforeReturn:Encoded",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));
  SyncPoint::GetInstance()->EnableProcessing();
  Options options = CurrentOptions();
  options.memtable_protection_bytes_per_key =
      memtable_protection_bytes_per_key_;
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }

  SkipNotToCorruptEntry();
  while (MoreBytesToCorrupt()) {
    Reopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(ExecuteWrite(handles_[1]));
    Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
    it->SeekToFirst();
    ASSERT_FALSE(it->Valid());
    ASSERT_TRUE(it->status().IsCorruption());
    delete it;
    Destroy(options);
    SkipNotToCorruptEntry();
  }
}

TEST_P(DbMemtableKVChecksumTest, FlushWithCorruptAfterMemtableInsert) {
  SyncPoint::GetInstance()->SetCallBack(
      "MemTable::Add:BeforeReturn:Encoded",
      std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this,
                std::placeholders::_1));
  SyncPoint::GetInstance()->EnableProcessing();
  Options options = CurrentOptions();
  options.memtable_protection_bytes_per_key =
      memtable_protection_bytes_per_key_;
  if (op_type_ == WriteBatchOpType::kMerge) {
    options.merge_operator = MergeOperators::CreateStringAppendOperator();
  }

  SkipNotToCorruptEntry();
  // Unlike the other tests, we do not corrupt every byte since Flush() is
  // relatively slow.
  Reopen(options);
  ASSERT_OK(ExecuteWrite(nullptr));
  ASSERT_TRUE(Flush().IsCorruption());
  // The DB enters read-only mode when flush reads corrupted data
  ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption());
  Destroy(options);
}

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}