]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // Copyright (c) 2020-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #include "db/blob/blob_index.h" | |
7 | #include "db/db_test_util.h" | |
8 | #include "rocksdb/rocksdb_namespace.h" | |
9 | ||
10 | namespace ROCKSDB_NAMESPACE { | |
11 | ||
// Kinds of single-entry `WriteBatch` operations exercised by these tests.
// `kNum` is an end-of-range sentinel, not a real operation.
enum class WriteBatchOpType {
  kPut = 0,
  kDelete,
  kSingleDelete,
  kMerge,
  kPutEntity,
  kDeleteRange,
  kNum,
};

// `::testing::Range()` advances its cursor by adding an integer step, so the
// enum needs an integer addition operator to be usable as a range endpoint.
WriteBatchOpType operator+(WriteBatchOpType lhs, const int rhs) {
  using Underlying = std::underlying_type_t<WriteBatchOpType>;
  const Underlying advanced = static_cast<Underlying>(lhs) + rhs;
  return static_cast<WriteBatchOpType>(advanced);
}
27 | ||
// How a test submits its `WriteBatch`, i.e. where per-key protection (if any)
// is enabled. `kNum` is an end-of-range sentinel.
enum class WriteMode {
  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key = 0`
  // and `WriteOptions::protection_bytes_per_key = 0`.
  kWriteUnprotectedBatch = 0,
  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key > 0`.
  kWriteProtectedBatch,
  // `Write()` a `WriteBatch` constructed with `protection_bytes_per_key == 0`;
  // protection is instead enabled via
  // `WriteOptions::protection_bytes_per_key > 0`.
  kWriteOptionProtectedBatch,
  // TODO(ajkr): add a mode that uses `Write()` wrappers, e.g., `Put()`.
  kNum,
};

// `::testing::Range()` advances its cursor by adding an integer step, so the
// enum needs an integer addition operator to be usable as a range endpoint.
WriteMode operator+(WriteMode lhs, const int rhs) {
  using Underlying = std::underlying_type_t<WriteMode>;
  const Underlying advanced = static_cast<Underlying>(lhs) + rhs;
  return static_cast<WriteMode>(advanced);
}
46 | ||
47 | std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle, | |
48 | size_t protection_bytes_per_key, | |
49 | WriteBatchOpType op_type) { | |
50 | Status s; | |
51 | WriteBatch wb(0 /* reserved_bytes */, 0 /* max_bytes */, | |
52 | protection_bytes_per_key, 0 /* default_cf_ts_sz */); | |
53 | switch (op_type) { | |
54 | case WriteBatchOpType::kPut: | |
55 | s = wb.Put(cf_handle, "key", "val"); | |
56 | break; | |
57 | case WriteBatchOpType::kDelete: | |
58 | s = wb.Delete(cf_handle, "key"); | |
59 | break; | |
60 | case WriteBatchOpType::kSingleDelete: | |
61 | s = wb.SingleDelete(cf_handle, "key"); | |
62 | break; | |
63 | case WriteBatchOpType::kDeleteRange: | |
64 | s = wb.DeleteRange(cf_handle, "begin", "end"); | |
65 | break; | |
66 | case WriteBatchOpType::kMerge: | |
67 | s = wb.Merge(cf_handle, "key", "val"); | |
68 | break; | |
69 | case WriteBatchOpType::kPutEntity: | |
70 | s = wb.PutEntity(cf_handle, "key", | |
71 | {{"attr_name1", "foo"}, {"attr_name2", "bar"}}); | |
72 | break; | |
73 | case WriteBatchOpType::kNum: | |
74 | assert(false); | |
75 | } | |
76 | return {std::move(wb), std::move(s)}; | |
77 | } | |
78 | ||
79 | class DbKvChecksumTestBase : public DBTestBase { | |
80 | public: | |
81 | DbKvChecksumTestBase(const std::string& path, bool env_do_fsync) | |
82 | : DBTestBase(path, env_do_fsync) {} | |
83 | ||
84 | ColumnFamilyHandle* GetCFHandleToUse(ColumnFamilyHandle* column_family, | |
85 | WriteBatchOpType op_type) const { | |
86 | // Note: PutEntity cannot be called without column family | |
87 | if (op_type == WriteBatchOpType::kPutEntity && !column_family) { | |
88 | return db_->DefaultColumnFamily(); | |
89 | } | |
90 | ||
91 | return column_family; | |
92 | } | |
93 | }; | |
94 | ||
95 | class DbKvChecksumTest | |
96 | : public DbKvChecksumTestBase, | |
97 | public ::testing::WithParamInterface< | |
98 | std::tuple<WriteBatchOpType, char, WriteMode, | |
99 | uint32_t /* memtable_protection_bytes_per_key */>> { | |
100 | public: | |
101 | DbKvChecksumTest() | |
102 | : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { | |
103 | op_type_ = std::get<0>(GetParam()); | |
104 | corrupt_byte_addend_ = std::get<1>(GetParam()); | |
105 | write_mode_ = std::get<2>(GetParam()); | |
106 | memtable_protection_bytes_per_key_ = std::get<3>(GetParam()); | |
107 | } | |
108 | ||
109 | Status ExecuteWrite(ColumnFamilyHandle* cf_handle) { | |
110 | switch (write_mode_) { | |
111 | case WriteMode::kWriteUnprotectedBatch: { | |
112 | auto batch_and_status = | |
113 | GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_), | |
114 | 0 /* protection_bytes_per_key */, op_type_); | |
115 | assert(batch_and_status.second.ok()); | |
116 | // Default write option has protection_bytes_per_key = 0 | |
117 | return db_->Write(WriteOptions(), &batch_and_status.first); | |
118 | } | |
119 | case WriteMode::kWriteProtectedBatch: { | |
120 | auto batch_and_status = | |
121 | GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_), | |
122 | 8 /* protection_bytes_per_key */, op_type_); | |
123 | assert(batch_and_status.second.ok()); | |
124 | return db_->Write(WriteOptions(), &batch_and_status.first); | |
125 | } | |
126 | case WriteMode::kWriteOptionProtectedBatch: { | |
127 | auto batch_and_status = | |
128 | GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_), | |
129 | 0 /* protection_bytes_per_key */, op_type_); | |
130 | assert(batch_and_status.second.ok()); | |
131 | WriteOptions write_opts; | |
132 | write_opts.protection_bytes_per_key = 8; | |
133 | return db_->Write(write_opts, &batch_and_status.first); | |
134 | } | |
135 | case WriteMode::kNum: | |
136 | assert(false); | |
137 | } | |
138 | return Status::NotSupported("WriteMode " + | |
139 | std::to_string(static_cast<int>(write_mode_))); | |
140 | } | |
141 | ||
142 | void CorruptNextByteCallBack(void* arg) { | |
143 | Slice encoded = *static_cast<Slice*>(arg); | |
144 | if (entry_len_ == std::numeric_limits<size_t>::max()) { | |
145 | // We learn the entry size on the first attempt | |
146 | entry_len_ = encoded.size(); | |
147 | } | |
148 | char* buf = const_cast<char*>(encoded.data()); | |
149 | buf[corrupt_byte_offset_] += corrupt_byte_addend_; | |
150 | ++corrupt_byte_offset_; | |
151 | } | |
152 | ||
153 | bool MoreBytesToCorrupt() { return corrupt_byte_offset_ < entry_len_; } | |
154 | ||
155 | protected: | |
156 | WriteBatchOpType op_type_; | |
157 | char corrupt_byte_addend_; | |
158 | WriteMode write_mode_; | |
159 | uint32_t memtable_protection_bytes_per_key_; | |
160 | size_t corrupt_byte_offset_ = 0; | |
161 | size_t entry_len_ = std::numeric_limits<size_t>::max(); | |
162 | }; | |
163 | ||
164 | std::string GetOpTypeString(const WriteBatchOpType& op_type) { | |
165 | switch (op_type) { | |
166 | case WriteBatchOpType::kPut: | |
167 | return "Put"; | |
168 | case WriteBatchOpType::kDelete: | |
169 | return "Delete"; | |
170 | case WriteBatchOpType::kSingleDelete: | |
171 | return "SingleDelete"; | |
172 | case WriteBatchOpType::kDeleteRange: | |
173 | return "DeleteRange"; | |
174 | case WriteBatchOpType::kMerge: | |
175 | return "Merge"; | |
176 | case WriteBatchOpType::kPutEntity: | |
177 | return "PutEntity"; | |
178 | case WriteBatchOpType::kNum: | |
179 | assert(false); | |
180 | } | |
181 | assert(false); | |
182 | return ""; | |
183 | } | |
184 | ||
185 | std::string GetWriteModeString(const WriteMode& mode) { | |
186 | switch (mode) { | |
187 | case WriteMode::kWriteUnprotectedBatch: | |
188 | return "WriteUnprotectedBatch"; | |
189 | case WriteMode::kWriteProtectedBatch: | |
190 | return "WriteProtectedBatch"; | |
191 | case WriteMode::kWriteOptionProtectedBatch: | |
192 | return "kWriteOptionProtectedBatch"; | |
193 | case WriteMode::kNum: | |
194 | assert(false); | |
195 | } | |
196 | return ""; | |
197 | } | |
198 | ||
// Instantiates every `DbKvChecksumTest` case over the following parameters:
// - op types in [kPut, kNum);
// - corruption addends 2, 103 and 251;
// - write modes in [kWriteProtectedBatch, kNum). `kWriteUnprotectedBatch` is
//   excluded, presumably because the corruption tests here expect
//   `Status::Corruption`, which an unprotected write would not report;
// - memtable protection bytes per key of 0 only.
INSTANTIATE_TEST_CASE_P(
    DbKvChecksumTest, DbKvChecksumTest,
    ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kNum),
                       ::testing::Values(2, 103, 251),
                       ::testing::Range(WriteMode::kWriteProtectedBatch,
                                        WriteMode::kNum),
                       ::testing::Values(0)),
    // Test name: <OpType>Add<addend><WriteMode><memtable bytes per key>. The
    // addend goes through `unsigned char` so that values stored in a
    // (possibly signed) `char`, such as 251, print as non-negative numbers.
    [](const testing::TestParamInfo<
        std::tuple<WriteBatchOpType, char, WriteMode, uint32_t>>& args) {
      std::ostringstream oss;
      oss << GetOpTypeString(std::get<0>(args.param)) << "Add"
          << static_cast<int>(
                 static_cast<unsigned char>(std::get<1>(args.param)))
          << GetWriteModeString(std::get<2>(args.param))
          << static_cast<uint32_t>(std::get<3>(args.param));
      return oss.str();
    });
217 | ||
218 | // TODO(ajkr): add a test that corrupts the `WriteBatch` contents. Such | |
219 | // corruptions should only be detectable in `WriteMode::kWriteProtectedBatch`. | |
220 | ||
221 | TEST_P(DbKvChecksumTest, MemTableAddCorrupted) { | |
222 | // This test repeatedly attempts to write `WriteBatch`es containing a single | |
223 | // entry of type `op_type_`. Each attempt has one byte corrupted in its | |
224 | // memtable entry by adding `corrupt_byte_addend_` to its original value. The | |
225 | // test repeats until an attempt has been made on each byte in the encoded | |
226 | // memtable entry. All attempts are expected to fail with `Status::Corruption` | |
227 | SyncPoint::GetInstance()->SetCallBack( | |
228 | "MemTable::Add:Encoded", | |
229 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
230 | std::placeholders::_1)); | |
231 | ||
232 | while (MoreBytesToCorrupt()) { | |
233 | // Failed memtable insert always leads to read-only mode, so we have to | |
234 | // reopen for every attempt. | |
235 | Options options = CurrentOptions(); | |
236 | if (op_type_ == WriteBatchOpType::kMerge) { | |
237 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
238 | } | |
239 | Reopen(options); | |
240 | ||
241 | SyncPoint::GetInstance()->EnableProcessing(); | |
242 | ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption()); | |
243 | SyncPoint::GetInstance()->DisableProcessing(); | |
244 | ||
245 | // In case the above callback is not invoked, this test will run | |
246 | // numeric_limits<size_t>::max() times until it reports an error (or will | |
247 | // exhaust disk space). Added this assert to report error early. | |
248 | ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max()); | |
249 | } | |
250 | } | |
251 | ||
252 | TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) { | |
253 | // This test repeatedly attempts to write `WriteBatch`es containing a single | |
254 | // entry of type `op_type_` to a non-default column family. Each attempt has | |
255 | // one byte corrupted in its memtable entry by adding `corrupt_byte_addend_` | |
256 | // to its original value. The test repeats until an attempt has been made on | |
257 | // each byte in the encoded memtable entry. All attempts are expected to fail | |
258 | // with `Status::Corruption`. | |
259 | Options options = CurrentOptions(); | |
260 | if (op_type_ == WriteBatchOpType::kMerge) { | |
261 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
262 | } | |
263 | CreateAndReopenWithCF({"pikachu"}, options); | |
264 | SyncPoint::GetInstance()->SetCallBack( | |
265 | "MemTable::Add:Encoded", | |
266 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
267 | std::placeholders::_1)); | |
268 | ||
269 | while (MoreBytesToCorrupt()) { | |
270 | // Failed memtable insert always leads to read-only mode, so we have to | |
271 | // reopen for every attempt. | |
272 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); | |
273 | ||
274 | SyncPoint::GetInstance()->EnableProcessing(); | |
275 | ASSERT_TRUE(ExecuteWrite(handles_[1]).IsCorruption()); | |
276 | SyncPoint::GetInstance()->DisableProcessing(); | |
277 | ||
278 | // In case the above callback is not invoked, this test will run | |
279 | // numeric_limits<size_t>::max() times until it reports an error (or will | |
280 | // exhaust disk space). Added this assert to report error early. | |
281 | ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max()); | |
282 | } | |
283 | } | |
284 | ||
285 | TEST_P(DbKvChecksumTest, NoCorruptionCase) { | |
286 | // If this test fails, we may have found a piece of malfunctioned hardware | |
287 | auto batch_and_status = | |
288 | GetWriteBatch(GetCFHandleToUse(nullptr, op_type_), | |
289 | 8 /* protection_bytes_per_key */, op_type_); | |
290 | ASSERT_OK(batch_and_status.second); | |
291 | ASSERT_OK(batch_and_status.first.VerifyChecksum()); | |
292 | } | |
293 | ||
294 | TEST_P(DbKvChecksumTest, WriteToWALCorrupted) { | |
295 | // This test repeatedly attempts to write `WriteBatch`es containing a single | |
296 | // entry of type `op_type_`. Each attempt has one byte corrupted by adding | |
297 | // `corrupt_byte_addend_` to its original value. The test repeats until an | |
298 | // attempt has been made on each byte in the encoded write batch. All attempts | |
299 | // are expected to fail with `Status::Corruption` | |
300 | Options options = CurrentOptions(); | |
301 | if (op_type_ == WriteBatchOpType::kMerge) { | |
302 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
303 | } | |
304 | SyncPoint::GetInstance()->SetCallBack( | |
305 | "DBImpl::WriteToWAL:log_entry", | |
306 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
307 | std::placeholders::_1)); | |
308 | // First 8 bytes are for sequence number which is not protected in write batch | |
309 | corrupt_byte_offset_ = 8; | |
310 | ||
311 | while (MoreBytesToCorrupt()) { | |
312 | // Corrupted write batch leads to read-only mode, so we have to | |
313 | // reopen for every attempt. | |
314 | Reopen(options); | |
315 | auto log_size_pre_write = dbfull()->TEST_total_log_size(); | |
316 | ||
317 | SyncPoint::GetInstance()->EnableProcessing(); | |
318 | ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption()); | |
319 | // Confirm that nothing was written to WAL | |
320 | ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); | |
321 | ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); | |
322 | SyncPoint::GetInstance()->DisableProcessing(); | |
323 | ||
324 | // In case the above callback is not invoked, this test will run | |
325 | // numeric_limits<size_t>::max() times until it reports an error (or will | |
326 | // exhaust disk space). Added this assert to report error early. | |
327 | ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max()); | |
328 | } | |
329 | } | |
330 | ||
331 | TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) { | |
332 | // This test repeatedly attempts to write `WriteBatch`es containing a single | |
333 | // entry of type `op_type_`. Each attempt has one byte corrupted by adding | |
334 | // `corrupt_byte_addend_` to its original value. The test repeats until an | |
335 | // attempt has been made on each byte in the encoded write batch. All attempts | |
336 | // are expected to fail with `Status::Corruption` | |
337 | Options options = CurrentOptions(); | |
338 | if (op_type_ == WriteBatchOpType::kMerge) { | |
339 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
340 | } | |
341 | CreateAndReopenWithCF({"pikachu"}, options); | |
342 | SyncPoint::GetInstance()->SetCallBack( | |
343 | "DBImpl::WriteToWAL:log_entry", | |
344 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
345 | std::placeholders::_1)); | |
346 | // First 8 bytes are for sequence number which is not protected in write batch | |
347 | corrupt_byte_offset_ = 8; | |
348 | ||
349 | while (MoreBytesToCorrupt()) { | |
350 | // Corrupted write batch leads to read-only mode, so we have to | |
351 | // reopen for every attempt. | |
352 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); | |
353 | auto log_size_pre_write = dbfull()->TEST_total_log_size(); | |
354 | ||
355 | SyncPoint::GetInstance()->EnableProcessing(); | |
356 | ASSERT_TRUE(ExecuteWrite(nullptr /* cf_handle */).IsCorruption()); | |
357 | // Confirm that nothing was written to WAL | |
358 | ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); | |
359 | ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); | |
360 | SyncPoint::GetInstance()->DisableProcessing(); | |
361 | ||
362 | // In case the above callback is not invoked, this test will run | |
363 | // numeric_limits<size_t>::max() times until it reports an error (or will | |
364 | // exhaust disk space). Added this assert to report error early. | |
365 | ASSERT_TRUE(entry_len_ < std::numeric_limits<size_t>::max()); | |
366 | } | |
367 | } | |
368 | ||
369 | class DbKvChecksumTestMergedBatch | |
370 | : public DbKvChecksumTestBase, | |
371 | public ::testing::WithParamInterface< | |
372 | std::tuple<WriteBatchOpType, WriteBatchOpType, char>> { | |
373 | public: | |
374 | DbKvChecksumTestMergedBatch() | |
375 | : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { | |
376 | op_type1_ = std::get<0>(GetParam()); | |
377 | op_type2_ = std::get<1>(GetParam()); | |
378 | corrupt_byte_addend_ = std::get<2>(GetParam()); | |
379 | } | |
380 | ||
381 | protected: | |
382 | WriteBatchOpType op_type1_; | |
383 | WriteBatchOpType op_type2_; | |
384 | char corrupt_byte_addend_; | |
385 | }; | |
386 | ||
387 | void CorruptWriteBatch(Slice* content, size_t offset, | |
388 | char corrupt_byte_addend) { | |
389 | ASSERT_TRUE(offset < content->size()); | |
390 | char* buf = const_cast<char*>(content->data()); | |
391 | buf[offset] += corrupt_byte_addend; | |
392 | } | |
393 | ||
394 | TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) { | |
395 | // Veirfy write batch checksum after write batch append | |
396 | auto batch1 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_), | |
397 | 8 /* protection_bytes_per_key */, op_type1_); | |
398 | ASSERT_OK(batch1.second); | |
399 | auto batch2 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_), | |
400 | 8 /* protection_bytes_per_key */, op_type2_); | |
401 | ASSERT_OK(batch2.second); | |
402 | ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first)); | |
403 | ASSERT_OK(batch1.first.VerifyChecksum()); | |
404 | } | |
405 | ||
406 | TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { | |
407 | // This test has two writers repeatedly attempt to write `WriteBatch`es | |
408 | // containing a single entry of type op_type1_ and op_type2_ respectively. The | |
409 | // leader of the write group writes the batch containinng the entry of type | |
410 | // op_type1_. One byte of the pre-merged write batches is corrupted by adding | |
411 | // `corrupt_byte_addend_` to the batch's original value during each attempt. | |
412 | // The test repeats until an attempt has been made on each byte in both | |
413 | // pre-merged write batches. All attempts are expected to fail with | |
414 | // `Status::Corruption`. | |
415 | Options options = CurrentOptions(); | |
416 | if (op_type1_ == WriteBatchOpType::kMerge || | |
417 | op_type2_ == WriteBatchOpType::kMerge) { | |
418 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
419 | } | |
420 | ||
421 | auto leader_batch_and_status = | |
422 | GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_), | |
423 | 8 /* protection_bytes_per_key */, op_type1_); | |
424 | ASSERT_OK(leader_batch_and_status.second); | |
425 | auto follower_batch_and_status = | |
426 | GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_), | |
427 | 8 /* protection_bytes_per_key */, op_type2_); | |
428 | size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); | |
429 | size_t total_bytes = | |
430 | leader_batch_size + follower_batch_and_status.first.GetDataSize(); | |
431 | // First 8 bytes are for sequence number which is not protected in write batch | |
432 | size_t corrupt_byte_offset = 8; | |
433 | ||
434 | std::atomic<bool> follower_joined{false}; | |
435 | std::atomic<int> leader_count{0}; | |
436 | port::Thread follower_thread; | |
437 | // This callback should only be called by the leader thread | |
438 | SyncPoint::GetInstance()->SetCallBack( | |
439 | "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) { | |
440 | auto* leader = reinterpret_cast<WriteThread::Writer*>(arg_leader); | |
441 | ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER); | |
442 | ||
443 | // This callback should only be called by the follower thread | |
444 | SyncPoint::GetInstance()->SetCallBack( | |
445 | "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) { | |
446 | auto* follower = | |
447 | reinterpret_cast<WriteThread::Writer*>(arg_follower); | |
448 | // The leader thread will wait on this bool and hence wait until | |
449 | // this writer joins the write group | |
450 | ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER); | |
451 | if (corrupt_byte_offset >= leader_batch_size) { | |
452 | Slice batch_content = follower->batch->Data(); | |
453 | CorruptWriteBatch(&batch_content, | |
454 | corrupt_byte_offset - leader_batch_size, | |
455 | corrupt_byte_addend_); | |
456 | } | |
457 | // Leader busy waits on this flag | |
458 | follower_joined = true; | |
459 | // So the follower does not enter the outer callback at | |
460 | // WriteThread::JoinBatchGroup:Wait2 | |
461 | SyncPoint::GetInstance()->DisableProcessing(); | |
462 | }); | |
463 | ||
464 | // Start the other writer thread which will join the write group as | |
465 | // follower | |
466 | follower_thread = port::Thread([&]() { | |
467 | follower_batch_and_status = | |
468 | GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_), | |
469 | 8 /* protection_bytes_per_key */, op_type2_); | |
470 | ASSERT_OK(follower_batch_and_status.second); | |
471 | ASSERT_TRUE( | |
472 | db_->Write(WriteOptions(), &follower_batch_and_status.first) | |
473 | .IsCorruption()); | |
474 | }); | |
475 | ||
476 | ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size); | |
477 | if (corrupt_byte_offset < leader_batch_size) { | |
478 | Slice batch_content = leader->batch->Data(); | |
479 | CorruptWriteBatch(&batch_content, corrupt_byte_offset, | |
480 | corrupt_byte_addend_); | |
481 | } | |
482 | leader_count++; | |
483 | while (!follower_joined) { | |
484 | // busy waiting | |
485 | } | |
486 | }); | |
487 | while (corrupt_byte_offset < total_bytes) { | |
488 | // Reopen DB since it failed WAL write which lead to read-only mode | |
489 | Reopen(options); | |
490 | SyncPoint::GetInstance()->EnableProcessing(); | |
491 | auto log_size_pre_write = dbfull()->TEST_total_log_size(); | |
492 | leader_batch_and_status = | |
493 | GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_), | |
494 | 8 /* protection_bytes_per_key */, op_type1_); | |
495 | ASSERT_OK(leader_batch_and_status.second); | |
496 | ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) | |
497 | .IsCorruption()); | |
498 | follower_thread.join(); | |
499 | // Prevent leader thread from entering this callback | |
500 | SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait"); | |
501 | ASSERT_EQ(1, leader_count); | |
502 | // Nothing should have been written to WAL | |
503 | ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); | |
504 | ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); | |
505 | ||
506 | corrupt_byte_offset++; | |
507 | if (corrupt_byte_offset == leader_batch_size) { | |
508 | // skip over the sequence number part of follower's write batch | |
509 | corrupt_byte_offset += 8; | |
510 | } | |
511 | follower_joined = false; | |
512 | leader_count = 0; | |
513 | } | |
514 | SyncPoint::GetInstance()->DisableProcessing(); | |
515 | } | |
516 | ||
517 | TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { | |
518 | // This test has two writers repeatedly attempt to write `WriteBatch`es | |
519 | // containing a single entry of type op_type1_ and op_type2_ respectively. The | |
520 | // leader of the write group writes the batch containinng the entry of type | |
521 | // op_type1_. One byte of the pre-merged write batches is corrupted by adding | |
522 | // `corrupt_byte_addend_` to the batch's original value during each attempt. | |
523 | // The test repeats until an attempt has been made on each byte in both | |
524 | // pre-merged write batches. All attempts are expected to fail with | |
525 | // `Status::Corruption`. | |
526 | Options options = CurrentOptions(); | |
527 | if (op_type1_ == WriteBatchOpType::kMerge || | |
528 | op_type2_ == WriteBatchOpType::kMerge) { | |
529 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
530 | } | |
531 | CreateAndReopenWithCF({"ramen"}, options); | |
532 | ||
533 | auto leader_batch_and_status = | |
534 | GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_), | |
535 | 8 /* protection_bytes_per_key */, op_type1_); | |
536 | ASSERT_OK(leader_batch_and_status.second); | |
537 | auto follower_batch_and_status = | |
538 | GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_), | |
539 | 8 /* protection_bytes_per_key */, op_type2_); | |
540 | size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); | |
541 | size_t total_bytes = | |
542 | leader_batch_size + follower_batch_and_status.first.GetDataSize(); | |
543 | // First 8 bytes are for sequence number which is not protected in write batch | |
544 | size_t corrupt_byte_offset = 8; | |
545 | ||
546 | std::atomic<bool> follower_joined{false}; | |
547 | std::atomic<int> leader_count{0}; | |
548 | port::Thread follower_thread; | |
549 | // This callback should only be called by the leader thread | |
550 | SyncPoint::GetInstance()->SetCallBack( | |
551 | "WriteThread::JoinBatchGroup:Wait2", [&](void* arg_leader) { | |
552 | auto* leader = reinterpret_cast<WriteThread::Writer*>(arg_leader); | |
553 | ASSERT_EQ(leader->state, WriteThread::STATE_GROUP_LEADER); | |
554 | ||
555 | // This callback should only be called by the follower thread | |
556 | SyncPoint::GetInstance()->SetCallBack( | |
557 | "WriteThread::JoinBatchGroup:Wait", [&](void* arg_follower) { | |
558 | auto* follower = | |
559 | reinterpret_cast<WriteThread::Writer*>(arg_follower); | |
560 | // The leader thread will wait on this bool and hence wait until | |
561 | // this writer joins the write group | |
562 | ASSERT_NE(follower->state, WriteThread::STATE_GROUP_LEADER); | |
563 | if (corrupt_byte_offset >= leader_batch_size) { | |
564 | Slice batch_content = | |
565 | WriteBatchInternal::Contents(follower->batch); | |
566 | CorruptWriteBatch(&batch_content, | |
567 | corrupt_byte_offset - leader_batch_size, | |
568 | corrupt_byte_addend_); | |
569 | } | |
570 | follower_joined = true; | |
571 | // So the follower does not enter the outer callback at | |
572 | // WriteThread::JoinBatchGroup:Wait2 | |
573 | SyncPoint::GetInstance()->DisableProcessing(); | |
574 | }); | |
575 | ||
576 | // Start the other writer thread which will join the write group as | |
577 | // follower | |
578 | follower_thread = port::Thread([&]() { | |
579 | follower_batch_and_status = | |
580 | GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_), | |
581 | 8 /* protection_bytes_per_key */, op_type2_); | |
582 | ASSERT_OK(follower_batch_and_status.second); | |
583 | ASSERT_TRUE( | |
584 | db_->Write(WriteOptions(), &follower_batch_and_status.first) | |
585 | .IsCorruption()); | |
586 | }); | |
587 | ||
588 | ASSERT_EQ(leader->batch->GetDataSize(), leader_batch_size); | |
589 | if (corrupt_byte_offset < leader_batch_size) { | |
590 | Slice batch_content = WriteBatchInternal::Contents(leader->batch); | |
591 | CorruptWriteBatch(&batch_content, corrupt_byte_offset, | |
592 | corrupt_byte_addend_); | |
593 | } | |
594 | leader_count++; | |
595 | while (!follower_joined) { | |
596 | // busy waiting | |
597 | } | |
598 | }); | |
599 | SyncPoint::GetInstance()->EnableProcessing(); | |
600 | while (corrupt_byte_offset < total_bytes) { | |
601 | // Reopen DB since it failed WAL write which lead to read-only mode | |
602 | ReopenWithColumnFamilies({kDefaultColumnFamilyName, "ramen"}, options); | |
603 | SyncPoint::GetInstance()->EnableProcessing(); | |
604 | auto log_size_pre_write = dbfull()->TEST_total_log_size(); | |
605 | leader_batch_and_status = | |
606 | GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_), | |
607 | 8 /* protection_bytes_per_key */, op_type1_); | |
608 | ASSERT_OK(leader_batch_and_status.second); | |
609 | ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) | |
610 | .IsCorruption()); | |
611 | follower_thread.join(); | |
612 | // Prevent leader thread from entering this callback | |
613 | SyncPoint::GetInstance()->ClearCallBack("WriteThread::JoinBatchGroup:Wait"); | |
614 | ||
615 | ASSERT_EQ(1, leader_count); | |
616 | // Nothing should have been written to WAL | |
617 | ASSERT_EQ(log_size_pre_write, dbfull()->TEST_total_log_size()); | |
618 | ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); | |
619 | ||
620 | corrupt_byte_offset++; | |
621 | if (corrupt_byte_offset == leader_batch_size) { | |
622 | // skip over the sequence number part of follower's write batch | |
623 | corrupt_byte_offset += 8; | |
624 | } | |
625 | follower_joined = false; | |
626 | leader_count = 0; | |
627 | } | |
628 | SyncPoint::GetInstance()->DisableProcessing(); | |
629 | } | |
630 | ||
// Instantiates every `DbKvChecksumTestMergedBatch` case over the following
// parameters:
// - every ordered pair of op types (first, second) in [kPut, kNum);
// - corruption addends 2, 103 and 251.
INSTANTIATE_TEST_CASE_P(
    DbKvChecksumTestMergedBatch, DbKvChecksumTestMergedBatch,
    ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kNum),
                       ::testing::Range(static_cast<WriteBatchOpType>(0),
                                        WriteBatchOpType::kNum),
                       ::testing::Values(2, 103, 251)),
    // Test name: <OpType1><OpType2>Add<addend>. The addend goes through
    // `unsigned char` so that values stored in a (possibly signed) `char`,
    // such as 251, print as non-negative numbers.
    [](const testing::TestParamInfo<
        std::tuple<WriteBatchOpType, WriteBatchOpType, char>>& args) {
      std::ostringstream oss;
      oss << GetOpTypeString(std::get<0>(args.param))
          << GetOpTypeString(std::get<1>(args.param)) << "Add"
          << static_cast<int>(
                 static_cast<unsigned char>(std::get<2>(args.param)));
      return oss.str();
    });
647 | ||
648 | // TODO: add test for transactions | |
649 | // TODO: add test for corrupted write batch with WAL disabled | |
650 | ||
651 | class DbKVChecksumWALToWriteBatchTest : public DBTestBase { | |
652 | public: | |
653 | DbKVChecksumWALToWriteBatchTest() | |
654 | : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {} | |
655 | }; | |
656 | ||
657 | TEST_F(DbKVChecksumWALToWriteBatchTest, WriteBatchChecksumHandoff) { | |
658 | Options options = CurrentOptions(); | |
659 | Reopen(options); | |
660 | ASSERT_OK(db_->Put(WriteOptions(), "key", "val")); | |
661 | std::string content = ""; | |
662 | SyncPoint::GetInstance()->SetCallBack( | |
663 | "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", | |
664 | [&](void* batch_ptr) { | |
665 | WriteBatch* batch = reinterpret_cast<WriteBatch*>(batch_ptr); | |
666 | content.assign(batch->Data().data(), batch->GetDataSize()); | |
667 | Slice batch_content = batch->Data(); | |
668 | // Corrupt first bit | |
669 | CorruptWriteBatch(&batch_content, 0, 1); | |
670 | }); | |
671 | SyncPoint::GetInstance()->SetCallBack( | |
672 | "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum", | |
673 | [&](void* checksum_ptr) { | |
674 | // Verify that checksum is produced on the batch content | |
675 | uint64_t checksum = *reinterpret_cast<uint64_t*>(checksum_ptr); | |
676 | ASSERT_EQ(checksum, XXH3_64bits(content.data(), content.size())); | |
677 | }); | |
678 | SyncPoint::GetInstance()->EnableProcessing(); | |
679 | ASSERT_TRUE(TryReopen(options).IsCorruption()); | |
680 | SyncPoint::GetInstance()->DisableProcessing(); | |
681 | }; | |
682 | ||
683 | // TODO (cbi): add DeleteRange coverage once it is implemented | |
684 | class DbMemtableKVChecksumTest : public DbKvChecksumTest { | |
685 | public: | |
686 | DbMemtableKVChecksumTest() : DbKvChecksumTest() {} | |
687 | ||
688 | protected: | |
689 | // Indices in the memtable entry that we will not corrupt. | |
690 | // For memtable entry format, see comments in MemTable::Add(). | |
691 | // We do not corrupt key length and value length fields in this test | |
692 | // case since it causes segfault and ASAN will complain. | |
693 | // For this test case, key and value are all of length 3, so | |
694 | // key length field is at index 0 and value length field is at index 12. | |
695 | const std::set<size_t> index_not_to_corrupt{0, 12}; | |
696 | ||
697 | void SkipNotToCorruptEntry() { | |
698 | if (index_not_to_corrupt.find(corrupt_byte_offset_) != | |
699 | index_not_to_corrupt.end()) { | |
700 | corrupt_byte_offset_++; | |
701 | } | |
702 | } | |
703 | }; | |
704 | ||
705 | INSTANTIATE_TEST_CASE_P( | |
706 | DbMemtableKVChecksumTest, DbMemtableKVChecksumTest, | |
707 | ::testing::Combine(::testing::Range(static_cast<WriteBatchOpType>(0), | |
708 | WriteBatchOpType::kDeleteRange), | |
709 | ::testing::Values(2, 103, 251), | |
710 | ::testing::Range(static_cast<WriteMode>(0), | |
711 | WriteMode::kWriteOptionProtectedBatch), | |
712 | // skip 1 byte checksum as it makes test flaky | |
713 | ::testing::Values(2, 4, 8)), | |
714 | [](const testing::TestParamInfo< | |
715 | std::tuple<WriteBatchOpType, char, WriteMode, uint32_t>>& args) { | |
716 | std::ostringstream oss; | |
717 | oss << GetOpTypeString(std::get<0>(args.param)) << "Add" | |
718 | << static_cast<int>( | |
719 | static_cast<unsigned char>(std::get<1>(args.param))) | |
720 | << GetWriteModeString(std::get<2>(args.param)) | |
721 | << static_cast<uint32_t>(std::get<3>(args.param)); | |
722 | return oss.str(); | |
723 | }); | |
724 | ||
725 | TEST_P(DbMemtableKVChecksumTest, GetWithCorruptAfterMemtableInsert) { | |
726 | // Record memtable entry size. | |
727 | // Not corrupting memtable entry here since it will segfault | |
728 | // or fail some asserts inside memtablerep implementation | |
729 | // e.g., when key_len is corrupted. | |
730 | SyncPoint::GetInstance()->SetCallBack( | |
731 | "MemTable::Add:BeforeReturn:Encoded", [&](void* arg) { | |
732 | Slice encoded = *static_cast<Slice*>(arg); | |
733 | entry_len_ = encoded.size(); | |
734 | }); | |
735 | ||
736 | SyncPoint::GetInstance()->SetCallBack( | |
737 | "Memtable::SaveValue:Begin:entry", [&](void* entry) { | |
738 | char* buf = *static_cast<char**>(entry); | |
739 | buf[corrupt_byte_offset_] += corrupt_byte_addend_; | |
740 | ++corrupt_byte_offset_; | |
741 | }); | |
742 | SyncPoint::GetInstance()->EnableProcessing(); | |
743 | Options options = CurrentOptions(); | |
744 | options.memtable_protection_bytes_per_key = | |
745 | memtable_protection_bytes_per_key_; | |
746 | if (op_type_ == WriteBatchOpType::kMerge) { | |
747 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
748 | } | |
749 | ||
750 | SkipNotToCorruptEntry(); | |
751 | while (MoreBytesToCorrupt()) { | |
752 | Reopen(options); | |
753 | ASSERT_OK(ExecuteWrite(nullptr)); | |
754 | std::string val; | |
755 | ASSERT_TRUE(db_->Get(ReadOptions(), "key", &val).IsCorruption()); | |
756 | Destroy(options); | |
757 | SkipNotToCorruptEntry(); | |
758 | } | |
759 | } | |
760 | ||
761 | TEST_P(DbMemtableKVChecksumTest, | |
762 | GetWithColumnFamilyCorruptAfterMemtableInsert) { | |
763 | // Record memtable entry size. | |
764 | // Not corrupting memtable entry here since it will segfault | |
765 | // or fail some asserts inside memtablerep implementation | |
766 | // e.g., when key_len is corrupted. | |
767 | SyncPoint::GetInstance()->SetCallBack( | |
768 | "MemTable::Add:BeforeReturn:Encoded", [&](void* arg) { | |
769 | Slice encoded = *static_cast<Slice*>(arg); | |
770 | entry_len_ = encoded.size(); | |
771 | }); | |
772 | ||
773 | SyncPoint::GetInstance()->SetCallBack( | |
774 | "Memtable::SaveValue:Begin:entry", [&](void* entry) { | |
775 | char* buf = *static_cast<char**>(entry); | |
776 | buf[corrupt_byte_offset_] += corrupt_byte_addend_; | |
777 | ++corrupt_byte_offset_; | |
778 | }); | |
779 | SyncPoint::GetInstance()->EnableProcessing(); | |
780 | Options options = CurrentOptions(); | |
781 | options.memtable_protection_bytes_per_key = | |
782 | memtable_protection_bytes_per_key_; | |
783 | if (op_type_ == WriteBatchOpType::kMerge) { | |
784 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
785 | } | |
786 | ||
787 | SkipNotToCorruptEntry(); | |
788 | while (MoreBytesToCorrupt()) { | |
789 | Reopen(options); | |
790 | CreateAndReopenWithCF({"pikachu"}, options); | |
791 | ASSERT_OK(ExecuteWrite(handles_[1])); | |
792 | std::string val; | |
793 | ASSERT_TRUE( | |
794 | db_->Get(ReadOptions(), handles_[1], "key", &val).IsCorruption()); | |
795 | Destroy(options); | |
796 | SkipNotToCorruptEntry(); | |
797 | } | |
798 | } | |
799 | ||
800 | TEST_P(DbMemtableKVChecksumTest, IteratorWithCorruptAfterMemtableInsert) { | |
801 | SyncPoint::GetInstance()->SetCallBack( | |
802 | "MemTable::Add:BeforeReturn:Encoded", | |
803 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
804 | std::placeholders::_1)); | |
805 | SyncPoint::GetInstance()->EnableProcessing(); | |
806 | Options options = CurrentOptions(); | |
807 | options.memtable_protection_bytes_per_key = | |
808 | memtable_protection_bytes_per_key_; | |
809 | if (op_type_ == WriteBatchOpType::kMerge) { | |
810 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
811 | } | |
812 | ||
813 | SkipNotToCorruptEntry(); | |
814 | while (MoreBytesToCorrupt()) { | |
815 | Reopen(options); | |
816 | ASSERT_OK(ExecuteWrite(nullptr)); | |
817 | Iterator* it = db_->NewIterator(ReadOptions()); | |
818 | it->SeekToFirst(); | |
819 | ASSERT_FALSE(it->Valid()); | |
820 | ASSERT_TRUE(it->status().IsCorruption()); | |
821 | delete it; | |
822 | Destroy(options); | |
823 | SkipNotToCorruptEntry(); | |
824 | } | |
825 | } | |
826 | ||
827 | TEST_P(DbMemtableKVChecksumTest, | |
828 | IteratorWithColumnFamilyCorruptAfterMemtableInsert) { | |
829 | SyncPoint::GetInstance()->SetCallBack( | |
830 | "MemTable::Add:BeforeReturn:Encoded", | |
831 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
832 | std::placeholders::_1)); | |
833 | SyncPoint::GetInstance()->EnableProcessing(); | |
834 | Options options = CurrentOptions(); | |
835 | options.memtable_protection_bytes_per_key = | |
836 | memtable_protection_bytes_per_key_; | |
837 | if (op_type_ == WriteBatchOpType::kMerge) { | |
838 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
839 | } | |
840 | ||
841 | SkipNotToCorruptEntry(); | |
842 | while (MoreBytesToCorrupt()) { | |
843 | Reopen(options); | |
844 | CreateAndReopenWithCF({"pikachu"}, options); | |
845 | ASSERT_OK(ExecuteWrite(handles_[1])); | |
846 | Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]); | |
847 | it->SeekToFirst(); | |
848 | ASSERT_FALSE(it->Valid()); | |
849 | ASSERT_TRUE(it->status().IsCorruption()); | |
850 | delete it; | |
851 | Destroy(options); | |
852 | SkipNotToCorruptEntry(); | |
853 | } | |
854 | } | |
855 | ||
856 | TEST_P(DbMemtableKVChecksumTest, FlushWithCorruptAfterMemtableInsert) { | |
857 | SyncPoint::GetInstance()->SetCallBack( | |
858 | "MemTable::Add:BeforeReturn:Encoded", | |
859 | std::bind(&DbKvChecksumTest::CorruptNextByteCallBack, this, | |
860 | std::placeholders::_1)); | |
861 | SyncPoint::GetInstance()->EnableProcessing(); | |
862 | Options options = CurrentOptions(); | |
863 | options.memtable_protection_bytes_per_key = | |
864 | memtable_protection_bytes_per_key_; | |
865 | if (op_type_ == WriteBatchOpType::kMerge) { | |
866 | options.merge_operator = MergeOperators::CreateStringAppendOperator(); | |
867 | } | |
868 | ||
869 | SkipNotToCorruptEntry(); | |
870 | // Not corruping each byte like other tests since Flush() is relatively slow. | |
871 | Reopen(options); | |
872 | ASSERT_OK(ExecuteWrite(nullptr)); | |
873 | ASSERT_TRUE(Flush().IsCorruption()); | |
874 | // DB enters read-only state when flush reads corrupted data | |
875 | ASSERT_TRUE(dbfull()->TEST_GetBGError().IsCorruption()); | |
876 | Destroy(options); | |
877 | } | |
878 | ||
879 | } // namespace ROCKSDB_NAMESPACE | |
880 | ||
881 | int main(int argc, char** argv) { | |
882 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); | |
883 | ::testing::InitGoogleTest(&argc, argv); | |
884 | return RUN_ALL_TESTS(); | |
885 | } |