]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #ifdef GFLAGS | |
11 | #include "db_stress_tool/db_stress_common.h" | |
12 | ||
13 | namespace ROCKSDB_NAMESPACE { | |
14 | class CfConsistencyStressTest : public StressTest { | |
15 | public: | |
16 | CfConsistencyStressTest() : batch_id_(0) {} | |
17 | ||
18 | ~CfConsistencyStressTest() override {} | |
19 | ||
20 | Status TestPut(ThreadState* thread, WriteOptions& write_opts, | |
21 | const ReadOptions& /* read_opts */, | |
22 | const std::vector<int>& rand_column_families, | |
23 | const std::vector<int64_t>& rand_keys, char (&value)[100], | |
24 | std::unique_ptr<MutexLock>& /* lock */) override { | |
25 | std::string key_str = Key(rand_keys[0]); | |
26 | Slice key = key_str; | |
27 | uint64_t value_base = batch_id_.fetch_add(1); | |
28 | size_t sz = | |
29 | GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value)); | |
30 | Slice v(value, sz); | |
31 | WriteBatch batch; | |
32 | for (auto cf : rand_column_families) { | |
33 | ColumnFamilyHandle* cfh = column_families_[cf]; | |
34 | if (FLAGS_use_merge) { | |
35 | batch.Merge(cfh, key, v); | |
36 | } else { /* !FLAGS_use_merge */ | |
37 | batch.Put(cfh, key, v); | |
38 | } | |
39 | } | |
40 | Status s = db_->Write(write_opts, &batch); | |
41 | if (!s.ok()) { | |
42 | fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str()); | |
43 | thread->stats.AddErrors(1); | |
44 | } else { | |
45 | auto num = static_cast<long>(rand_column_families.size()); | |
46 | thread->stats.AddBytesForWrites(num, (sz + 1) * num); | |
47 | } | |
48 | ||
49 | return s; | |
50 | } | |
51 | ||
52 | Status TestDelete(ThreadState* thread, WriteOptions& write_opts, | |
53 | const std::vector<int>& rand_column_families, | |
54 | const std::vector<int64_t>& rand_keys, | |
55 | std::unique_ptr<MutexLock>& /* lock */) override { | |
56 | std::string key_str = Key(rand_keys[0]); | |
57 | Slice key = key_str; | |
58 | WriteBatch batch; | |
59 | for (auto cf : rand_column_families) { | |
60 | ColumnFamilyHandle* cfh = column_families_[cf]; | |
61 | batch.Delete(cfh, key); | |
62 | } | |
63 | Status s = db_->Write(write_opts, &batch); | |
64 | if (!s.ok()) { | |
65 | fprintf(stderr, "multidel error: %s\n", s.ToString().c_str()); | |
66 | thread->stats.AddErrors(1); | |
67 | } else { | |
68 | thread->stats.AddDeletes(static_cast<long>(rand_column_families.size())); | |
69 | } | |
70 | return s; | |
71 | } | |
72 | ||
73 | Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, | |
74 | const std::vector<int>& rand_column_families, | |
75 | const std::vector<int64_t>& rand_keys, | |
76 | std::unique_ptr<MutexLock>& /* lock */) override { | |
77 | int64_t rand_key = rand_keys[0]; | |
78 | auto shared = thread->shared; | |
79 | int64_t max_key = shared->GetMaxKey(); | |
80 | if (rand_key > max_key - FLAGS_range_deletion_width) { | |
81 | rand_key = | |
82 | thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1); | |
83 | } | |
84 | std::string key_str = Key(rand_key); | |
85 | Slice key = key_str; | |
86 | std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width); | |
87 | Slice end_key = end_key_str; | |
88 | WriteBatch batch; | |
89 | for (auto cf : rand_column_families) { | |
90 | ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]]; | |
91 | batch.DeleteRange(cfh, key, end_key); | |
92 | } | |
93 | Status s = db_->Write(write_opts, &batch); | |
94 | if (!s.ok()) { | |
95 | fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str()); | |
96 | thread->stats.AddErrors(1); | |
97 | } else { | |
98 | thread->stats.AddRangeDeletions( | |
99 | static_cast<long>(rand_column_families.size())); | |
100 | } | |
101 | return s; | |
102 | } | |
103 | ||
104 | void TestIngestExternalFile( | |
105 | ThreadState* /* thread */, | |
106 | const std::vector<int>& /* rand_column_families */, | |
107 | const std::vector<int64_t>& /* rand_keys */, | |
108 | std::unique_ptr<MutexLock>& /* lock */) override { | |
109 | assert(false); | |
110 | fprintf(stderr, | |
111 | "CfConsistencyStressTest does not support TestIngestExternalFile " | |
112 | "because it's not possible to verify the result\n"); | |
113 | std::terminate(); | |
114 | } | |
115 | ||
116 | Status TestGet(ThreadState* thread, const ReadOptions& readoptions, | |
117 | const std::vector<int>& rand_column_families, | |
118 | const std::vector<int64_t>& rand_keys) override { | |
119 | std::string key_str = Key(rand_keys[0]); | |
120 | Slice key = key_str; | |
121 | Status s; | |
122 | bool is_consistent = true; | |
123 | ||
124 | if (thread->rand.OneIn(2)) { | |
125 | // 1/2 chance, does a random read from random CF | |
126 | auto cfh = | |
127 | column_families_[rand_column_families[thread->rand.Next() % | |
128 | rand_column_families.size()]]; | |
129 | std::string from_db; | |
130 | s = db_->Get(readoptions, cfh, key, &from_db); | |
131 | } else { | |
132 | // 1/2 chance, comparing one key is the same across all CFs | |
133 | const Snapshot* snapshot = db_->GetSnapshot(); | |
134 | ReadOptions readoptionscopy = readoptions; | |
135 | readoptionscopy.snapshot = snapshot; | |
136 | ||
137 | std::string value0; | |
138 | s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]], | |
139 | key, &value0); | |
140 | if (s.ok() || s.IsNotFound()) { | |
141 | bool found = s.ok(); | |
142 | for (size_t i = 1; i < rand_column_families.size(); i++) { | |
143 | std::string value1; | |
144 | s = db_->Get(readoptionscopy, | |
145 | column_families_[rand_column_families[i]], key, &value1); | |
146 | if (!s.ok() && !s.IsNotFound()) { | |
147 | break; | |
148 | } | |
149 | if (!found && s.ok()) { | |
150 | fprintf(stderr, "Get() return different results with key %s\n", | |
151 | Slice(key_str).ToString(true).c_str()); | |
152 | fprintf(stderr, "CF %s is not found\n", | |
153 | column_family_names_[0].c_str()); | |
154 | fprintf(stderr, "CF %s returns value %s\n", | |
155 | column_family_names_[i].c_str(), | |
156 | Slice(value1).ToString(true).c_str()); | |
157 | is_consistent = false; | |
158 | } else if (found && s.IsNotFound()) { | |
159 | fprintf(stderr, "Get() return different results with key %s\n", | |
160 | Slice(key_str).ToString(true).c_str()); | |
161 | fprintf(stderr, "CF %s returns value %s\n", | |
162 | column_family_names_[0].c_str(), | |
163 | Slice(value0).ToString(true).c_str()); | |
164 | fprintf(stderr, "CF %s is not found\n", | |
165 | column_family_names_[i].c_str()); | |
166 | is_consistent = false; | |
167 | } else if (s.ok() && value0 != value1) { | |
168 | fprintf(stderr, "Get() return different results with key %s\n", | |
169 | Slice(key_str).ToString(true).c_str()); | |
170 | fprintf(stderr, "CF %s returns value %s\n", | |
171 | column_family_names_[0].c_str(), | |
172 | Slice(value0).ToString(true).c_str()); | |
173 | fprintf(stderr, "CF %s returns value %s\n", | |
174 | column_family_names_[i].c_str(), | |
175 | Slice(value1).ToString(true).c_str()); | |
176 | is_consistent = false; | |
177 | } | |
178 | if (!is_consistent) { | |
179 | break; | |
180 | } | |
181 | } | |
182 | } | |
183 | ||
184 | db_->ReleaseSnapshot(snapshot); | |
185 | } | |
186 | if (!is_consistent) { | |
187 | fprintf(stderr, "TestGet error: is_consistent is false\n"); | |
188 | thread->stats.AddErrors(1); | |
189 | // Fail fast to preserve the DB state. | |
190 | thread->shared->SetVerificationFailure(); | |
191 | } else if (s.ok()) { | |
192 | thread->stats.AddGets(1, 1); | |
193 | } else if (s.IsNotFound()) { | |
194 | thread->stats.AddGets(1, 0); | |
195 | } else { | |
196 | fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str()); | |
197 | thread->stats.AddErrors(1); | |
198 | } | |
199 | return s; | |
200 | } | |
201 | ||
202 | std::vector<Status> TestMultiGet( | |
203 | ThreadState* thread, const ReadOptions& read_opts, | |
204 | const std::vector<int>& rand_column_families, | |
205 | const std::vector<int64_t>& rand_keys) override { | |
206 | size_t num_keys = rand_keys.size(); | |
207 | std::vector<std::string> key_str; | |
208 | std::vector<Slice> keys; | |
209 | keys.reserve(num_keys); | |
210 | key_str.reserve(num_keys); | |
211 | std::vector<PinnableSlice> values(num_keys); | |
212 | std::vector<Status> statuses(num_keys); | |
213 | ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]]; | |
214 | ||
215 | for (size_t i = 0; i < num_keys; ++i) { | |
216 | key_str.emplace_back(Key(rand_keys[i])); | |
217 | keys.emplace_back(key_str.back()); | |
218 | } | |
219 | db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), | |
220 | statuses.data()); | |
221 | for (auto s : statuses) { | |
222 | if (s.ok()) { | |
223 | // found case | |
224 | thread->stats.AddGets(1, 1); | |
225 | } else if (s.IsNotFound()) { | |
226 | // not found case | |
227 | thread->stats.AddGets(1, 0); | |
228 | } else { | |
229 | // errors case | |
230 | fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str()); | |
231 | thread->stats.AddErrors(1); | |
232 | } | |
233 | } | |
234 | return statuses; | |
235 | } | |
236 | ||
237 | Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions, | |
238 | const std::vector<int>& rand_column_families, | |
239 | const std::vector<int64_t>& rand_keys) override { | |
240 | size_t prefix_to_use = | |
241 | (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size); | |
242 | ||
243 | std::string key_str = Key(rand_keys[0]); | |
244 | Slice key = key_str; | |
245 | Slice prefix = Slice(key.data(), prefix_to_use); | |
246 | ||
247 | std::string upper_bound; | |
248 | Slice ub_slice; | |
249 | ReadOptions ro_copy = readoptions; | |
250 | // Get the next prefix first and then see if we want to set upper bound. | |
251 | // We'll use the next prefix in an assertion later on | |
252 | if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) { | |
253 | ub_slice = Slice(upper_bound); | |
254 | ro_copy.iterate_upper_bound = &ub_slice; | |
255 | } | |
256 | auto cfh = | |
257 | column_families_[rand_column_families[thread->rand.Next() % | |
258 | rand_column_families.size()]]; | |
259 | Iterator* iter = db_->NewIterator(ro_copy, cfh); | |
260 | unsigned long count = 0; | |
261 | for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); | |
262 | iter->Next()) { | |
263 | ++count; | |
264 | } | |
265 | assert(prefix_to_use == 0 || | |
266 | count <= GetPrefixKeyCount(prefix.ToString(), upper_bound)); | |
267 | Status s = iter->status(); | |
268 | if (s.ok()) { | |
269 | thread->stats.AddPrefixes(1, count); | |
270 | } else { | |
271 | fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); | |
272 | thread->stats.AddErrors(1); | |
273 | } | |
274 | delete iter; | |
275 | return s; | |
276 | } | |
277 | ||
278 | ColumnFamilyHandle* GetControlCfh(ThreadState* thread, | |
279 | int /*column_family_id*/ | |
280 | ) override { | |
281 | // All column families should contain the same data. Randomly pick one. | |
282 | return column_families_[thread->rand.Next() % column_families_.size()]; | |
283 | } | |
284 | ||
285 | #ifdef ROCKSDB_LITE | |
286 | Status TestCheckpoint(ThreadState* /* thread */, | |
287 | const std::vector<int>& /* rand_column_families */, | |
288 | const std::vector<int64_t>& /* rand_keys */) override { | |
289 | assert(false); | |
290 | fprintf(stderr, | |
291 | "RocksDB lite does not support " | |
292 | "TestCheckpoint\n"); | |
293 | std::terminate(); | |
294 | } | |
295 | #else | |
296 | Status TestCheckpoint(ThreadState* thread, | |
297 | const std::vector<int>& /* rand_column_families */, | |
298 | const std::vector<int64_t>& /* rand_keys */) override { | |
299 | std::string checkpoint_dir = | |
300 | FLAGS_db + "/.checkpoint" + ToString(thread->tid); | |
301 | ||
302 | // We need to clear DB including manifest files, so make a copy | |
303 | Options opt_copy = options_; | |
304 | opt_copy.env = db_stress_env->target(); | |
305 | DestroyDB(checkpoint_dir, opt_copy); | |
306 | ||
307 | Checkpoint* checkpoint = nullptr; | |
308 | Status s = Checkpoint::Create(db_, &checkpoint); | |
309 | if (s.ok()) { | |
310 | s = checkpoint->CreateCheckpoint(checkpoint_dir); | |
311 | } | |
312 | std::vector<ColumnFamilyHandle*> cf_handles; | |
313 | DB* checkpoint_db = nullptr; | |
314 | if (s.ok()) { | |
315 | delete checkpoint; | |
316 | checkpoint = nullptr; | |
317 | Options options(options_); | |
318 | options.listeners.clear(); | |
319 | std::vector<ColumnFamilyDescriptor> cf_descs; | |
320 | // TODO(ajkr): `column_family_names_` is not safe to access here when | |
321 | // `clear_column_family_one_in != 0`. But we can't easily switch to | |
322 | // `ListColumnFamilies` to get names because it won't necessarily give | |
323 | // the same order as `column_family_names_`. | |
324 | if (FLAGS_clear_column_family_one_in == 0) { | |
325 | for (const auto& name : column_family_names_) { | |
326 | cf_descs.emplace_back(name, ColumnFamilyOptions(options)); | |
327 | } | |
328 | s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs, | |
329 | &cf_handles, &checkpoint_db); | |
330 | } | |
331 | } | |
332 | if (checkpoint_db != nullptr) { | |
333 | for (auto cfh : cf_handles) { | |
334 | delete cfh; | |
335 | } | |
336 | cf_handles.clear(); | |
337 | delete checkpoint_db; | |
338 | checkpoint_db = nullptr; | |
339 | } | |
340 | DestroyDB(checkpoint_dir, opt_copy); | |
341 | if (!s.ok()) { | |
342 | fprintf(stderr, "A checkpoint operation failed with: %s\n", | |
343 | s.ToString().c_str()); | |
344 | } | |
345 | return s; | |
346 | } | |
347 | #endif // !ROCKSDB_LITE | |
348 | ||
349 | void VerifyDb(ThreadState* thread) const override { | |
350 | ReadOptions options(FLAGS_verify_checksum, true); | |
351 | // We must set total_order_seek to true because we are doing a SeekToFirst | |
352 | // on a column family whose memtables may support (by default) prefix-based | |
353 | // iterator. In this case, NewIterator with options.total_order_seek being | |
354 | // false returns a prefix-based iterator. Calling SeekToFirst using this | |
355 | // iterator causes the iterator to become invalid. That means we cannot | |
356 | // iterate the memtable using this iterator any more, although the memtable | |
357 | // contains the most up-to-date key-values. | |
358 | options.total_order_seek = true; | |
359 | const auto ss_deleter = [this](const Snapshot* ss) { | |
360 | db_->ReleaseSnapshot(ss); | |
361 | }; | |
362 | std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard( | |
363 | db_->GetSnapshot(), ss_deleter); | |
364 | options.snapshot = snapshot_guard.get(); | |
365 | assert(thread != nullptr); | |
366 | auto shared = thread->shared; | |
367 | std::vector<std::unique_ptr<Iterator>> iters(column_families_.size()); | |
368 | for (size_t i = 0; i != column_families_.size(); ++i) { | |
369 | iters[i].reset(db_->NewIterator(options, column_families_[i])); | |
370 | } | |
371 | for (auto& iter : iters) { | |
372 | iter->SeekToFirst(); | |
373 | } | |
374 | size_t num = column_families_.size(); | |
375 | assert(num == iters.size()); | |
376 | std::vector<Status> statuses(num, Status::OK()); | |
377 | do { | |
378 | if (shared->HasVerificationFailedYet()) { | |
379 | break; | |
380 | } | |
381 | size_t valid_cnt = 0; | |
382 | size_t idx = 0; | |
383 | for (auto& iter : iters) { | |
384 | if (iter->Valid()) { | |
385 | ++valid_cnt; | |
386 | } else { | |
387 | statuses[idx] = iter->status(); | |
388 | } | |
389 | ++idx; | |
390 | } | |
391 | if (valid_cnt == 0) { | |
392 | Status status; | |
393 | for (size_t i = 0; i != num; ++i) { | |
394 | const auto& s = statuses[i]; | |
395 | if (!s.ok()) { | |
396 | status = s; | |
397 | fprintf(stderr, "Iterator on cf %s has error: %s\n", | |
398 | column_families_[i]->GetName().c_str(), | |
399 | s.ToString().c_str()); | |
400 | shared->SetVerificationFailure(); | |
401 | } | |
402 | } | |
403 | break; | |
404 | } else if (valid_cnt != iters.size()) { | |
405 | shared->SetVerificationFailure(); | |
406 | for (size_t i = 0; i != num; ++i) { | |
407 | if (!iters[i]->Valid()) { | |
408 | if (statuses[i].ok()) { | |
409 | fprintf(stderr, "Finished scanning cf %s\n", | |
410 | column_families_[i]->GetName().c_str()); | |
411 | } else { | |
412 | fprintf(stderr, "Iterator on cf %s has error: %s\n", | |
413 | column_families_[i]->GetName().c_str(), | |
414 | statuses[i].ToString().c_str()); | |
415 | } | |
416 | } else { | |
417 | fprintf(stderr, "cf %s has remaining data to scan\n", | |
418 | column_families_[i]->GetName().c_str()); | |
419 | } | |
420 | } | |
421 | break; | |
422 | } | |
423 | if (shared->HasVerificationFailedYet()) { | |
424 | break; | |
425 | } | |
426 | // If the program reaches here, then all column families' iterators are | |
427 | // still valid. | |
428 | if (shared->PrintingVerificationResults()) { | |
429 | continue; | |
430 | } | |
431 | Slice key; | |
432 | Slice value; | |
433 | int num_mismatched_cfs = 0; | |
434 | for (size_t i = 0; i != num; ++i) { | |
435 | if (i == 0) { | |
436 | key = iters[i]->key(); | |
437 | value = iters[i]->value(); | |
438 | } else { | |
439 | int cmp = key.compare(iters[i]->key()); | |
440 | if (cmp != 0) { | |
441 | ++num_mismatched_cfs; | |
442 | if (1 == num_mismatched_cfs) { | |
443 | fprintf(stderr, "Verification failed\n"); | |
444 | fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n", | |
445 | db_->GetLatestSequenceNumber()); | |
446 | fprintf(stderr, "[%s] %s => %s\n", | |
447 | column_families_[0]->GetName().c_str(), | |
448 | key.ToString(true /* hex */).c_str(), | |
449 | value.ToString(true /* hex */).c_str()); | |
450 | } | |
451 | fprintf(stderr, "[%s] %s => %s\n", | |
452 | column_families_[i]->GetName().c_str(), | |
453 | iters[i]->key().ToString(true /* hex */).c_str(), | |
454 | iters[i]->value().ToString(true /* hex */).c_str()); | |
455 | #ifndef ROCKSDB_LITE | |
456 | Slice begin_key; | |
457 | Slice end_key; | |
458 | if (cmp < 0) { | |
459 | begin_key = key; | |
460 | end_key = iters[i]->key(); | |
461 | } else { | |
462 | begin_key = iters[i]->key(); | |
463 | end_key = key; | |
464 | } | |
465 | std::vector<KeyVersion> versions; | |
466 | const size_t kMaxNumIKeys = 8; | |
467 | const auto print_key_versions = [&](ColumnFamilyHandle* cfh) { | |
468 | Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key, | |
469 | kMaxNumIKeys, &versions); | |
470 | if (!s.ok()) { | |
471 | fprintf(stderr, "%s\n", s.ToString().c_str()); | |
472 | return; | |
473 | } | |
474 | assert(nullptr != cfh); | |
475 | fprintf(stderr, | |
476 | "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt | |
477 | ")\n", | |
478 | cfh->GetName().c_str(), | |
479 | begin_key.ToString(true /* hex */).c_str(), | |
480 | end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys); | |
481 | for (const KeyVersion& kv : versions) { | |
482 | fprintf(stderr, " key %s seq %" PRIu64 " type %d\n", | |
483 | Slice(kv.user_key).ToString(true).c_str(), kv.sequence, | |
484 | kv.type); | |
485 | } | |
486 | }; | |
487 | if (1 == num_mismatched_cfs) { | |
488 | print_key_versions(column_families_[0]); | |
489 | } | |
490 | print_key_versions(column_families_[i]); | |
491 | #endif // ROCKSDB_LITE | |
492 | shared->SetVerificationFailure(); | |
493 | } | |
494 | } | |
495 | } | |
496 | shared->FinishPrintingVerificationResults(); | |
497 | for (auto& iter : iters) { | |
498 | iter->Next(); | |
499 | } | |
500 | } while (true); | |
501 | } | |
502 | ||
503 | #ifndef ROCKSDB_LITE | |
504 | void ContinuouslyVerifyDb(ThreadState* thread) const override { | |
505 | assert(thread); | |
506 | Status status; | |
507 | ||
508 | DB* db_ptr = cmp_db_ ? cmp_db_ : db_; | |
509 | const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_; | |
510 | const auto ss_deleter = [&](const Snapshot* ss) { | |
511 | db_ptr->ReleaseSnapshot(ss); | |
512 | }; | |
513 | std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard( | |
514 | db_ptr->GetSnapshot(), ss_deleter); | |
515 | if (cmp_db_) { | |
516 | status = cmp_db_->TryCatchUpWithPrimary(); | |
517 | } | |
518 | SharedState* shared = thread->shared; | |
519 | assert(shared); | |
520 | if (!status.ok()) { | |
521 | shared->SetShouldStopTest(); | |
522 | return; | |
523 | } | |
524 | assert(cmp_db_ || snapshot_guard.get()); | |
525 | const auto checksum_column_family = [](Iterator* iter, | |
526 | uint32_t* checksum) -> Status { | |
527 | assert(nullptr != checksum); | |
528 | uint32_t ret = 0; | |
529 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
530 | ret = crc32c::Extend(ret, iter->key().data(), iter->key().size()); | |
531 | ret = crc32c::Extend(ret, iter->value().data(), iter->value().size()); | |
532 | } | |
533 | *checksum = ret; | |
534 | return iter->status(); | |
535 | }; | |
536 | ReadOptions ropts; | |
537 | ropts.total_order_seek = true; | |
538 | ropts.snapshot = snapshot_guard.get(); | |
539 | uint32_t crc = 0; | |
540 | { | |
541 | // Compute crc for all key-values of default column family. | |
542 | std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts)); | |
543 | status = checksum_column_family(it.get(), &crc); | |
544 | } | |
545 | uint32_t tmp_crc = 0; | |
546 | if (status.ok()) { | |
547 | for (ColumnFamilyHandle* cfh : cfhs) { | |
548 | if (cfh == db_ptr->DefaultColumnFamily()) { | |
549 | continue; | |
550 | } | |
551 | std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh)); | |
552 | status = checksum_column_family(it.get(), &tmp_crc); | |
553 | if (!status.ok() || tmp_crc != crc) { | |
554 | break; | |
555 | } | |
556 | } | |
557 | } | |
558 | if (!status.ok() || tmp_crc != crc) { | |
559 | shared->SetShouldStopTest(); | |
560 | } | |
561 | } | |
562 | #endif // !ROCKSDB_LITE | |
563 | ||
564 | std::vector<int> GenerateColumnFamilies( | |
565 | const int /* num_column_families */, | |
566 | int /* rand_column_family */) const override { | |
567 | std::vector<int> ret; | |
568 | int num = static_cast<int>(column_families_.size()); | |
569 | int k = 0; | |
570 | std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; }); | |
571 | return ret; | |
572 | } | |
573 | ||
574 | private: | |
575 | std::atomic<int64_t> batch_id_; | |
576 | }; | |
577 | ||
578 | StressTest* CreateCfConsistencyStressTest() { | |
579 | return new CfConsistencyStressTest(); | |
580 | } | |
581 | ||
582 | } // namespace ROCKSDB_NAMESPACE | |
583 | #endif // GFLAGS |