// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
11 #include "db_stress_tool/db_stress_common.h"
12 #include "file/file_util.h"
14 namespace ROCKSDB_NAMESPACE
{
15 class CfConsistencyStressTest
: public StressTest
{
17 CfConsistencyStressTest() : batch_id_(0) {}
19 ~CfConsistencyStressTest() override
{}
21 Status
TestPut(ThreadState
* thread
, WriteOptions
& write_opts
,
22 const ReadOptions
& /* read_opts */,
23 const std::vector
<int>& rand_column_families
,
24 const std::vector
<int64_t>& rand_keys
, char (&value
)[100],
25 std::unique_ptr
<MutexLock
>& /* lock */) override
{
26 std::string key_str
= Key(rand_keys
[0]);
28 uint64_t value_base
= batch_id_
.fetch_add(1);
30 GenerateValue(static_cast<uint32_t>(value_base
), value
, sizeof(value
));
33 for (auto cf
: rand_column_families
) {
34 ColumnFamilyHandle
* cfh
= column_families_
[cf
];
35 if (FLAGS_use_merge
) {
36 batch
.Merge(cfh
, key
, v
);
37 } else { /* !FLAGS_use_merge */
38 batch
.Put(cfh
, key
, v
);
41 Status s
= db_
->Write(write_opts
, &batch
);
43 fprintf(stderr
, "multi put or merge error: %s\n", s
.ToString().c_str());
44 thread
->stats
.AddErrors(1);
46 auto num
= static_cast<long>(rand_column_families
.size());
47 thread
->stats
.AddBytesForWrites(num
, (sz
+ 1) * num
);
53 Status
TestDelete(ThreadState
* thread
, WriteOptions
& write_opts
,
54 const std::vector
<int>& rand_column_families
,
55 const std::vector
<int64_t>& rand_keys
,
56 std::unique_ptr
<MutexLock
>& /* lock */) override
{
57 std::string key_str
= Key(rand_keys
[0]);
60 for (auto cf
: rand_column_families
) {
61 ColumnFamilyHandle
* cfh
= column_families_
[cf
];
62 batch
.Delete(cfh
, key
);
64 Status s
= db_
->Write(write_opts
, &batch
);
66 fprintf(stderr
, "multidel error: %s\n", s
.ToString().c_str());
67 thread
->stats
.AddErrors(1);
69 thread
->stats
.AddDeletes(static_cast<long>(rand_column_families
.size()));
74 Status
TestDeleteRange(ThreadState
* thread
, WriteOptions
& write_opts
,
75 const std::vector
<int>& rand_column_families
,
76 const std::vector
<int64_t>& rand_keys
,
77 std::unique_ptr
<MutexLock
>& /* lock */) override
{
78 int64_t rand_key
= rand_keys
[0];
79 auto shared
= thread
->shared
;
80 int64_t max_key
= shared
->GetMaxKey();
81 if (rand_key
> max_key
- FLAGS_range_deletion_width
) {
83 thread
->rand
.Next() % (max_key
- FLAGS_range_deletion_width
+ 1);
85 std::string key_str
= Key(rand_key
);
87 std::string end_key_str
= Key(rand_key
+ FLAGS_range_deletion_width
);
88 Slice end_key
= end_key_str
;
90 for (auto cf
: rand_column_families
) {
91 ColumnFamilyHandle
* cfh
= column_families_
[rand_column_families
[cf
]];
92 batch
.DeleteRange(cfh
, key
, end_key
);
94 Status s
= db_
->Write(write_opts
, &batch
);
96 fprintf(stderr
, "multi del range error: %s\n", s
.ToString().c_str());
97 thread
->stats
.AddErrors(1);
99 thread
->stats
.AddRangeDeletions(
100 static_cast<long>(rand_column_families
.size()));
// External-file ingestion hook. None of the four parameters is used; their
// names are commented out in the signature.
// NOTE(review): the listing's own numbering jumps from 109 to 112 and stops at
// 113 for this function, so the statement that consumes the message below, and
// whatever follows it, is not visible in this copy. Confirm against the
// repository version before relying on this function's behaviour.
105 void TestIngestExternalFile(
106 ThreadState
* /* thread */,
107 const std::vector
<int>& /* rand_column_families */,
108 const std::vector
<int64_t>& /* rand_keys */,
109 std::unique_ptr
<MutexLock
>& /* lock */) override
{
// The message states that this test does not support TestIngestExternalFile
// because the result cannot be verified.
112 "CfConsistencyStressTest does not support TestIngestExternalFile "
113 "because it's not possible to verify the result\n");
117 Status
TestGet(ThreadState
* thread
, const ReadOptions
& readoptions
,
118 const std::vector
<int>& rand_column_families
,
119 const std::vector
<int64_t>& rand_keys
) override
{
120 std::string key_str
= Key(rand_keys
[0]);
123 bool is_consistent
= true;
125 if (thread
->rand
.OneIn(2)) {
126 // 1/2 chance, does a random read from random CF
128 column_families_
[rand_column_families
[thread
->rand
.Next() %
129 rand_column_families
.size()]];
131 s
= db_
->Get(readoptions
, cfh
, key
, &from_db
);
133 // 1/2 chance, comparing one key is the same across all CFs
134 const Snapshot
* snapshot
= db_
->GetSnapshot();
135 ReadOptions readoptionscopy
= readoptions
;
136 readoptionscopy
.snapshot
= snapshot
;
139 s
= db_
->Get(readoptionscopy
, column_families_
[rand_column_families
[0]],
141 if (s
.ok() || s
.IsNotFound()) {
143 for (size_t i
= 1; i
< rand_column_families
.size(); i
++) {
145 s
= db_
->Get(readoptionscopy
,
146 column_families_
[rand_column_families
[i
]], key
, &value1
);
147 if (!s
.ok() && !s
.IsNotFound()) {
150 if (!found
&& s
.ok()) {
151 fprintf(stderr
, "Get() return different results with key %s\n",
152 Slice(key_str
).ToString(true).c_str());
153 fprintf(stderr
, "CF %s is not found\n",
154 column_family_names_
[0].c_str());
155 fprintf(stderr
, "CF %s returns value %s\n",
156 column_family_names_
[i
].c_str(),
157 Slice(value1
).ToString(true).c_str());
158 is_consistent
= false;
159 } else if (found
&& s
.IsNotFound()) {
160 fprintf(stderr
, "Get() return different results with key %s\n",
161 Slice(key_str
).ToString(true).c_str());
162 fprintf(stderr
, "CF %s returns value %s\n",
163 column_family_names_
[0].c_str(),
164 Slice(value0
).ToString(true).c_str());
165 fprintf(stderr
, "CF %s is not found\n",
166 column_family_names_
[i
].c_str());
167 is_consistent
= false;
168 } else if (s
.ok() && value0
!= value1
) {
169 fprintf(stderr
, "Get() return different results with key %s\n",
170 Slice(key_str
).ToString(true).c_str());
171 fprintf(stderr
, "CF %s returns value %s\n",
172 column_family_names_
[0].c_str(),
173 Slice(value0
).ToString(true).c_str());
174 fprintf(stderr
, "CF %s returns value %s\n",
175 column_family_names_
[i
].c_str(),
176 Slice(value1
).ToString(true).c_str());
177 is_consistent
= false;
179 if (!is_consistent
) {
185 db_
->ReleaseSnapshot(snapshot
);
187 if (!is_consistent
) {
188 fprintf(stderr
, "TestGet error: is_consistent is false\n");
189 thread
->stats
.AddErrors(1);
190 // Fail fast to preserve the DB state.
191 thread
->shared
->SetVerificationFailure();
193 thread
->stats
.AddGets(1, 1);
194 } else if (s
.IsNotFound()) {
195 thread
->stats
.AddGets(1, 0);
197 fprintf(stderr
, "TestGet error: %s\n", s
.ToString().c_str());
198 thread
->stats
.AddErrors(1);
203 std::vector
<Status
> TestMultiGet(
204 ThreadState
* thread
, const ReadOptions
& read_opts
,
205 const std::vector
<int>& rand_column_families
,
206 const std::vector
<int64_t>& rand_keys
) override
{
207 size_t num_keys
= rand_keys
.size();
208 std::vector
<std::string
> key_str
;
209 std::vector
<Slice
> keys
;
210 keys
.reserve(num_keys
);
211 key_str
.reserve(num_keys
);
212 std::vector
<PinnableSlice
> values(num_keys
);
213 std::vector
<Status
> statuses(num_keys
);
214 ColumnFamilyHandle
* cfh
= column_families_
[rand_column_families
[0]];
216 for (size_t i
= 0; i
< num_keys
; ++i
) {
217 key_str
.emplace_back(Key(rand_keys
[i
]));
218 keys
.emplace_back(key_str
.back());
220 db_
->MultiGet(read_opts
, cfh
, num_keys
, keys
.data(), values
.data(),
222 for (auto s
: statuses
) {
225 thread
->stats
.AddGets(1, 1);
226 } else if (s
.IsNotFound()) {
228 thread
->stats
.AddGets(1, 0);
231 fprintf(stderr
, "MultiGet error: %s\n", s
.ToString().c_str());
232 thread
->stats
.AddErrors(1);
238 Status
TestPrefixScan(ThreadState
* thread
, const ReadOptions
& readoptions
,
239 const std::vector
<int>& rand_column_families
,
240 const std::vector
<int64_t>& rand_keys
) override
{
241 size_t prefix_to_use
=
242 (FLAGS_prefix_size
< 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size
);
244 std::string key_str
= Key(rand_keys
[0]);
246 Slice prefix
= Slice(key
.data(), prefix_to_use
);
248 std::string upper_bound
;
250 ReadOptions ro_copy
= readoptions
;
251 // Get the next prefix first and then see if we want to set upper bound.
252 // We'll use the next prefix in an assertion later on
253 if (GetNextPrefix(prefix
, &upper_bound
) && thread
->rand
.OneIn(2)) {
254 ub_slice
= Slice(upper_bound
);
255 ro_copy
.iterate_upper_bound
= &ub_slice
;
258 column_families_
[rand_column_families
[thread
->rand
.Next() %
259 rand_column_families
.size()]];
260 Iterator
* iter
= db_
->NewIterator(ro_copy
, cfh
);
261 unsigned long count
= 0;
262 for (iter
->Seek(prefix
); iter
->Valid() && iter
->key().starts_with(prefix
);
266 assert(prefix_to_use
== 0 ||
267 count
<= GetPrefixKeyCount(prefix
.ToString(), upper_bound
));
268 Status s
= iter
->status();
270 thread
->stats
.AddPrefixes(1, count
);
272 fprintf(stderr
, "TestPrefixScan error: %s\n", s
.ToString().c_str());
273 thread
->stats
.AddErrors(1);
279 ColumnFamilyHandle
* GetControlCfh(ThreadState
* thread
,
280 int /*column_family_id*/
282 // All column families should contain the same data. Randomly pick one.
283 return column_families_
[thread
->rand
.Next() % column_families_
.size()];
// Cross-checks the column families against one another. One iterator is
// created per entry of column_families_, all reading through the same
// snapshot, and the iterators' current keys are compared. Differences are
// printed to stderr and recorded with shared->SetVerificationFailure().
//
// NOTE(review): this listing has gaps in its own numbering (for example
// 314, 316-317, 319, 321-323, 366-369, 372, 375, 377, 435-438), so the
// enclosing loop, its exits, and several declarations (`idx`, `key`, `value`,
// `begin_key`, `end_key`) are not visible here. The comments below describe
// only the lines that are present.
286 void VerifyDb(ThreadState
* thread
) const override
{
287 ReadOptions
options(FLAGS_verify_checksum
, true);
288 // We must set total_order_seek to true because we are doing a SeekToFirst
289 // on a column family whose memtables may support (by default) prefix-based
290 // iterator. In this case, NewIterator with options.total_order_seek being
291 // false returns a prefix-based iterator. Calling SeekToFirst using this
292 // iterator causes the iterator to become invalid. That means we cannot
293 // iterate the memtable using this iterator any more, although the memtable
294 // contains the most up-to-date key-values.
295 options
.total_order_seek
= true;
// Deleter that hands the snapshot back to db_; it is installed on
// snapshot_guard below.
296 const auto ss_deleter
= [this](const Snapshot
* ss
) {
297 db_
->ReleaseSnapshot(ss
);
299 std::unique_ptr
<const Snapshot
, decltype(ss_deleter
)> snapshot_guard(
300 db_
->GetSnapshot(), ss_deleter
);
301 options
.snapshot
= snapshot_guard
.get();
302 assert(thread
!= nullptr);
303 auto shared
= thread
->shared
;
// One iterator per column family, all created with the same read options
// (and therefore the same snapshot).
304 std::vector
<std::unique_ptr
<Iterator
>> iters(column_families_
.size());
305 for (size_t i
= 0; i
!= column_families_
.size(); ++i
) {
306 iters
[i
].reset(db_
->NewIterator(options
, column_families_
[i
]));
308 for (auto& iter
: iters
) {
311 size_t num
= column_families_
.size();
312 assert(num
== iters
.size());
// One status slot per iterator, initialised to OK.
313 std::vector
<Status
> statuses(num
, Status::OK());
315 if (shared
->HasVerificationFailedYet()) {
318 size_t valid_cnt
= 0;
320 for (auto& iter
: iters
) {
324 statuses
[idx
] = iter
->status();
// No iterator counted as valid: report any per-column-family iterator error.
328 if (valid_cnt
== 0) {
330 for (size_t i
= 0; i
!= num
; ++i
) {
331 const auto& s
= statuses
[i
];
334 fprintf(stderr
, "Iterator on cf %s has error: %s\n",
335 column_families_
[i
]->GetName().c_str(),
336 s
.ToString().c_str());
337 shared
->SetVerificationFailure();
// Some, but not all, iterators are still valid: this is recorded as a
// verification failure and the state of each column family is printed.
341 } else if (valid_cnt
!= iters
.size()) {
342 shared
->SetVerificationFailure();
343 for (size_t i
= 0; i
!= num
; ++i
) {
344 if (!iters
[i
]->Valid()) {
345 if (statuses
[i
].ok()) {
346 fprintf(stderr
, "Finished scanning cf %s\n",
347 column_families_
[i
]->GetName().c_str());
349 fprintf(stderr
, "Iterator on cf %s has error: %s\n",
350 column_families_
[i
]->GetName().c_str(),
351 statuses
[i
].ToString().c_str());
354 fprintf(stderr
, "cf %s has remaining data to scan\n",
355 column_families_
[i
]->GetName().c_str());
360 if (shared
->HasVerificationFailedYet()) {
363 // If the program reaches here, then all column families' iterators are
365 if (shared
->PrintingVerificationResults()) {
// Compare the iterators' current keys. In the visible lines only keys are
// compared (key.compare); values appear only in the diagnostics.
370 int num_mismatched_cfs
= 0;
371 for (size_t i
= 0; i
!= num
; ++i
) {
373 key
= iters
[i
]->key();
374 value
= iters
[i
]->value();
376 int cmp
= key
.compare(iters
[i
]->key());
378 ++num_mismatched_cfs
;
// For the first mismatch only, also print a header, the latest sequence
// number, and the reference entry labelled with column family 0's name.
379 if (1 == num_mismatched_cfs
) {
380 fprintf(stderr
, "Verification failed\n");
381 fprintf(stderr
, "Latest Sequence Number: %" PRIu64
"\n",
382 db_
->GetLatestSequenceNumber());
383 fprintf(stderr
, "[%s] %s => %s\n",
384 column_families_
[0]->GetName().c_str(),
385 key
.ToString(true /* hex */).c_str(),
386 value
.ToString(true /* hex */).c_str());
388 fprintf(stderr
, "[%s] %s => %s\n",
389 column_families_
[i
]->GetName().c_str(),
390 iters
[i
]->key().ToString(true /* hex */).c_str(),
391 iters
[i
]->value().ToString(true /* hex */).c_str());
397 end_key
= iters
[i
]->key();
399 begin_key
= iters
[i
]->key();
402 std::vector
<KeyVersion
> versions
;
403 const size_t kMaxNumIKeys
= 8;
// Helper that fetches up to kMaxNumIKeys key versions in
// [begin_key, end_key] for one column family and prints them.
404 const auto print_key_versions
= [&](ColumnFamilyHandle
* cfh
) {
405 Status s
= GetAllKeyVersions(db_
, cfh
, begin_key
, end_key
,
406 kMaxNumIKeys
, &versions
);
408 fprintf(stderr
, "%s\n", s
.ToString().c_str());
411 assert(nullptr != cfh
);
413 "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
415 cfh
->GetName().c_str(),
416 begin_key
.ToString(true /* hex */).c_str(),
417 end_key
.ToString(true /* hex */).c_str(), kMaxNumIKeys
);
418 for (const KeyVersion
& kv
: versions
) {
419 fprintf(stderr
, " key %s seq %" PRIu64
" type %d\n",
420 Slice(kv
.user_key
).ToString(true).c_str(), kv
.sequence
,
// Column family 0's versions are printed once, on the first mismatch; the
// mismatching column family's versions are printed each time.
424 if (1 == num_mismatched_cfs
) {
425 print_key_versions(column_families_
[0]);
427 print_key_versions(column_families_
[i
]);
428 #endif // ROCKSDB_LITE
429 shared
->SetVerificationFailure();
433 shared
->FinishPrintingVerificationResults();
434 for (auto& iter
: iters
) {
// Checks that the column families hold the same data by comparing a crc32c
// checksum computed over each column family's keys and values. When cmp_db_ is
// set, the check runs against cmp_db_ and cmp_cfhs_ (after calling
// TryCatchUpWithPrimary on it); otherwise it runs against db_ and
// column_families_. A bad status or a checksum difference asks the test to
// stop via shared->SetShouldStopTest().
//
// NOTE(review): this listing has gaps in its own numbering (for example
// 442-443, 452, 454, 456-457, 459-460, 465, 469-470, 473, 476-477, 483,
// 486-487, 491-494), so several declarations (`status`, `ret`, `ropts`,
// `crc`) and the conditions and exits around the visible statements are not
// shown. The comments below describe only the lines that are present.
441 void ContinuouslyVerifyDb(ThreadState
* thread
) const override
{
// Pick the database and handle list to verify.
445 DB
* db_ptr
= cmp_db_
? cmp_db_
: db_
;
446 const auto& cfhs
= cmp_db_
? cmp_cfhs_
: column_families_
;
// Deleter that hands the snapshot back to db_ptr; it is installed on
// snapshot_guard below.
447 const auto ss_deleter
= [&](const Snapshot
* ss
) {
448 db_ptr
->ReleaseSnapshot(ss
);
450 std::unique_ptr
<const Snapshot
, decltype(ss_deleter
)> snapshot_guard(
451 db_ptr
->GetSnapshot(), ss_deleter
);
453 status
= cmp_db_
->TryCatchUpWithPrimary();
455 SharedState
* shared
= thread
->shared
;
458 shared
->SetShouldStopTest();
461 assert(cmp_db_
|| snapshot_guard
.get());
// Helper that walks an iterator from its first entry, folding every key and
// value into a running crc32c, and returns the iterator's final status.
462 const auto checksum_column_family
= [](Iterator
* iter
,
463 uint32_t* checksum
) -> Status
{
464 assert(nullptr != checksum
);
466 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
467 ret
= crc32c::Extend(ret
, iter
->key().data(), iter
->key().size());
468 ret
= crc32c::Extend(ret
, iter
->value().data(), iter
->value().size());
471 return iter
->status();
474 ropts
.total_order_seek
= true;
475 ropts
.snapshot
= snapshot_guard
.get();
478 // Compute crc for all key-values of default column family.
479 std::unique_ptr
<Iterator
> it(db_ptr
->NewIterator(ropts
));
480 status
= checksum_column_family(it
.get(), &crc
);
482 uint32_t tmp_crc
= 0;
// Checksum the handles in cfhs and compare each result with crc. The default
// column family is tested for explicitly; the body of that test is not
// visible in this copy.
484 for (ColumnFamilyHandle
* cfh
: cfhs
) {
485 if (cfh
== db_ptr
->DefaultColumnFamily()) {
488 std::unique_ptr
<Iterator
> it(db_ptr
->NewIterator(ropts
, cfh
));
489 status
= checksum_column_family(it
.get(), &tmp_crc
);
490 if (!status
.ok() || tmp_crc
!= crc
) {
// Any error, or a checksum that differs from the default column family's,
// stops the test.
495 if (!status
.ok() || tmp_crc
!= crc
) {
496 shared
->SetShouldStopTest();
499 #endif // !ROCKSDB_LITE
501 std::vector
<int> GenerateColumnFamilies(
502 const int /* num_column_families */,
503 int /* rand_column_family */) const override
{
504 std::vector
<int> ret
;
505 int num
= static_cast<int>(column_families_
.size());
507 std::generate_n(back_inserter(ret
), num
, [&k
]() -> int { return k
++; });
512 std::atomic
<int64_t> batch_id_
;
515 StressTest
* CreateCfConsistencyStressTest() {
516 return new CfConsistencyStressTest();
519 } // namespace ROCKSDB_NAMESPACE