git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db_stress_tool/cf_consistency_stress.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db_stress_tool / cf_consistency_stress.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12 #include "file/file_util.h"
13
14 namespace ROCKSDB_NAMESPACE {
15 class CfConsistencyStressTest : public StressTest {
16 public:
17 CfConsistencyStressTest() : batch_id_(0) {}
18
19 ~CfConsistencyStressTest() override {}
20
21 Status TestPut(ThreadState* thread, WriteOptions& write_opts,
22 const ReadOptions& /* read_opts */,
23 const std::vector<int>& rand_column_families,
24 const std::vector<int64_t>& rand_keys, char (&value)[100],
25 std::unique_ptr<MutexLock>& /* lock */) override {
26 std::string key_str = Key(rand_keys[0]);
27 Slice key = key_str;
28 uint64_t value_base = batch_id_.fetch_add(1);
29 size_t sz =
30 GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
31 Slice v(value, sz);
32 WriteBatch batch;
33 for (auto cf : rand_column_families) {
34 ColumnFamilyHandle* cfh = column_families_[cf];
35 if (FLAGS_use_merge) {
36 batch.Merge(cfh, key, v);
37 } else { /* !FLAGS_use_merge */
38 batch.Put(cfh, key, v);
39 }
40 }
41 Status s = db_->Write(write_opts, &batch);
42 if (!s.ok()) {
43 fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
44 thread->stats.AddErrors(1);
45 } else {
46 auto num = static_cast<long>(rand_column_families.size());
47 thread->stats.AddBytesForWrites(num, (sz + 1) * num);
48 }
49
50 return s;
51 }
52
53 Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
54 const std::vector<int>& rand_column_families,
55 const std::vector<int64_t>& rand_keys,
56 std::unique_ptr<MutexLock>& /* lock */) override {
57 std::string key_str = Key(rand_keys[0]);
58 Slice key = key_str;
59 WriteBatch batch;
60 for (auto cf : rand_column_families) {
61 ColumnFamilyHandle* cfh = column_families_[cf];
62 batch.Delete(cfh, key);
63 }
64 Status s = db_->Write(write_opts, &batch);
65 if (!s.ok()) {
66 fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
67 thread->stats.AddErrors(1);
68 } else {
69 thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
70 }
71 return s;
72 }
73
74 Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
75 const std::vector<int>& rand_column_families,
76 const std::vector<int64_t>& rand_keys,
77 std::unique_ptr<MutexLock>& /* lock */) override {
78 int64_t rand_key = rand_keys[0];
79 auto shared = thread->shared;
80 int64_t max_key = shared->GetMaxKey();
81 if (rand_key > max_key - FLAGS_range_deletion_width) {
82 rand_key =
83 thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
84 }
85 std::string key_str = Key(rand_key);
86 Slice key = key_str;
87 std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
88 Slice end_key = end_key_str;
89 WriteBatch batch;
90 for (auto cf : rand_column_families) {
91 ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
92 batch.DeleteRange(cfh, key, end_key);
93 }
94 Status s = db_->Write(write_opts, &batch);
95 if (!s.ok()) {
96 fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
97 thread->stats.AddErrors(1);
98 } else {
99 thread->stats.AddRangeDeletions(
100 static_cast<long>(rand_column_families.size()));
101 }
102 return s;
103 }
104
105 void TestIngestExternalFile(
106 ThreadState* /* thread */,
107 const std::vector<int>& /* rand_column_families */,
108 const std::vector<int64_t>& /* rand_keys */,
109 std::unique_ptr<MutexLock>& /* lock */) override {
110 assert(false);
111 fprintf(stderr,
112 "CfConsistencyStressTest does not support TestIngestExternalFile "
113 "because it's not possible to verify the result\n");
114 std::terminate();
115 }
116
117 Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
118 const std::vector<int>& rand_column_families,
119 const std::vector<int64_t>& rand_keys) override {
120 std::string key_str = Key(rand_keys[0]);
121 Slice key = key_str;
122 Status s;
123 bool is_consistent = true;
124
125 if (thread->rand.OneIn(2)) {
126 // 1/2 chance, does a random read from random CF
127 auto cfh =
128 column_families_[rand_column_families[thread->rand.Next() %
129 rand_column_families.size()]];
130 std::string from_db;
131 s = db_->Get(readoptions, cfh, key, &from_db);
132 } else {
133 // 1/2 chance, comparing one key is the same across all CFs
134 const Snapshot* snapshot = db_->GetSnapshot();
135 ReadOptions readoptionscopy = readoptions;
136 readoptionscopy.snapshot = snapshot;
137
138 std::string value0;
139 s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
140 key, &value0);
141 if (s.ok() || s.IsNotFound()) {
142 bool found = s.ok();
143 for (size_t i = 1; i < rand_column_families.size(); i++) {
144 std::string value1;
145 s = db_->Get(readoptionscopy,
146 column_families_[rand_column_families[i]], key, &value1);
147 if (!s.ok() && !s.IsNotFound()) {
148 break;
149 }
150 if (!found && s.ok()) {
151 fprintf(stderr, "Get() return different results with key %s\n",
152 Slice(key_str).ToString(true).c_str());
153 fprintf(stderr, "CF %s is not found\n",
154 column_family_names_[0].c_str());
155 fprintf(stderr, "CF %s returns value %s\n",
156 column_family_names_[i].c_str(),
157 Slice(value1).ToString(true).c_str());
158 is_consistent = false;
159 } else if (found && s.IsNotFound()) {
160 fprintf(stderr, "Get() return different results with key %s\n",
161 Slice(key_str).ToString(true).c_str());
162 fprintf(stderr, "CF %s returns value %s\n",
163 column_family_names_[0].c_str(),
164 Slice(value0).ToString(true).c_str());
165 fprintf(stderr, "CF %s is not found\n",
166 column_family_names_[i].c_str());
167 is_consistent = false;
168 } else if (s.ok() && value0 != value1) {
169 fprintf(stderr, "Get() return different results with key %s\n",
170 Slice(key_str).ToString(true).c_str());
171 fprintf(stderr, "CF %s returns value %s\n",
172 column_family_names_[0].c_str(),
173 Slice(value0).ToString(true).c_str());
174 fprintf(stderr, "CF %s returns value %s\n",
175 column_family_names_[i].c_str(),
176 Slice(value1).ToString(true).c_str());
177 is_consistent = false;
178 }
179 if (!is_consistent) {
180 break;
181 }
182 }
183 }
184
185 db_->ReleaseSnapshot(snapshot);
186 }
187 if (!is_consistent) {
188 fprintf(stderr, "TestGet error: is_consistent is false\n");
189 thread->stats.AddErrors(1);
190 // Fail fast to preserve the DB state.
191 thread->shared->SetVerificationFailure();
192 } else if (s.ok()) {
193 thread->stats.AddGets(1, 1);
194 } else if (s.IsNotFound()) {
195 thread->stats.AddGets(1, 0);
196 } else {
197 fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
198 thread->stats.AddErrors(1);
199 }
200 return s;
201 }
202
203 std::vector<Status> TestMultiGet(
204 ThreadState* thread, const ReadOptions& read_opts,
205 const std::vector<int>& rand_column_families,
206 const std::vector<int64_t>& rand_keys) override {
207 size_t num_keys = rand_keys.size();
208 std::vector<std::string> key_str;
209 std::vector<Slice> keys;
210 keys.reserve(num_keys);
211 key_str.reserve(num_keys);
212 std::vector<PinnableSlice> values(num_keys);
213 std::vector<Status> statuses(num_keys);
214 ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
215
216 for (size_t i = 0; i < num_keys; ++i) {
217 key_str.emplace_back(Key(rand_keys[i]));
218 keys.emplace_back(key_str.back());
219 }
220 db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
221 statuses.data());
222 for (auto s : statuses) {
223 if (s.ok()) {
224 // found case
225 thread->stats.AddGets(1, 1);
226 } else if (s.IsNotFound()) {
227 // not found case
228 thread->stats.AddGets(1, 0);
229 } else {
230 // errors case
231 fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
232 thread->stats.AddErrors(1);
233 }
234 }
235 return statuses;
236 }
237
238 Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
239 const std::vector<int>& rand_column_families,
240 const std::vector<int64_t>& rand_keys) override {
241 size_t prefix_to_use =
242 (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
243
244 std::string key_str = Key(rand_keys[0]);
245 Slice key = key_str;
246 Slice prefix = Slice(key.data(), prefix_to_use);
247
248 std::string upper_bound;
249 Slice ub_slice;
250 ReadOptions ro_copy = readoptions;
251 // Get the next prefix first and then see if we want to set upper bound.
252 // We'll use the next prefix in an assertion later on
253 if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
254 ub_slice = Slice(upper_bound);
255 ro_copy.iterate_upper_bound = &ub_slice;
256 }
257 auto cfh =
258 column_families_[rand_column_families[thread->rand.Next() %
259 rand_column_families.size()]];
260 Iterator* iter = db_->NewIterator(ro_copy, cfh);
261 unsigned long count = 0;
262 for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
263 iter->Next()) {
264 ++count;
265 }
266 assert(prefix_to_use == 0 ||
267 count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
268 Status s = iter->status();
269 if (s.ok()) {
270 thread->stats.AddPrefixes(1, count);
271 } else {
272 fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
273 thread->stats.AddErrors(1);
274 }
275 delete iter;
276 return s;
277 }
278
279 ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
280 int /*column_family_id*/
281 ) override {
282 // All column families should contain the same data. Randomly pick one.
283 return column_families_[thread->rand.Next() % column_families_.size()];
284 }
285
286 void VerifyDb(ThreadState* thread) const override {
287 ReadOptions options(FLAGS_verify_checksum, true);
288 // We must set total_order_seek to true because we are doing a SeekToFirst
289 // on a column family whose memtables may support (by default) prefix-based
290 // iterator. In this case, NewIterator with options.total_order_seek being
291 // false returns a prefix-based iterator. Calling SeekToFirst using this
292 // iterator causes the iterator to become invalid. That means we cannot
293 // iterate the memtable using this iterator any more, although the memtable
294 // contains the most up-to-date key-values.
295 options.total_order_seek = true;
296 const auto ss_deleter = [this](const Snapshot* ss) {
297 db_->ReleaseSnapshot(ss);
298 };
299 std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
300 db_->GetSnapshot(), ss_deleter);
301 options.snapshot = snapshot_guard.get();
302 assert(thread != nullptr);
303 auto shared = thread->shared;
304 std::vector<std::unique_ptr<Iterator>> iters(column_families_.size());
305 for (size_t i = 0; i != column_families_.size(); ++i) {
306 iters[i].reset(db_->NewIterator(options, column_families_[i]));
307 }
308 for (auto& iter : iters) {
309 iter->SeekToFirst();
310 }
311 size_t num = column_families_.size();
312 assert(num == iters.size());
313 std::vector<Status> statuses(num, Status::OK());
314 do {
315 if (shared->HasVerificationFailedYet()) {
316 break;
317 }
318 size_t valid_cnt = 0;
319 size_t idx = 0;
320 for (auto& iter : iters) {
321 if (iter->Valid()) {
322 ++valid_cnt;
323 } else {
324 statuses[idx] = iter->status();
325 }
326 ++idx;
327 }
328 if (valid_cnt == 0) {
329 Status status;
330 for (size_t i = 0; i != num; ++i) {
331 const auto& s = statuses[i];
332 if (!s.ok()) {
333 status = s;
334 fprintf(stderr, "Iterator on cf %s has error: %s\n",
335 column_families_[i]->GetName().c_str(),
336 s.ToString().c_str());
337 shared->SetVerificationFailure();
338 }
339 }
340 break;
341 } else if (valid_cnt != iters.size()) {
342 shared->SetVerificationFailure();
343 for (size_t i = 0; i != num; ++i) {
344 if (!iters[i]->Valid()) {
345 if (statuses[i].ok()) {
346 fprintf(stderr, "Finished scanning cf %s\n",
347 column_families_[i]->GetName().c_str());
348 } else {
349 fprintf(stderr, "Iterator on cf %s has error: %s\n",
350 column_families_[i]->GetName().c_str(),
351 statuses[i].ToString().c_str());
352 }
353 } else {
354 fprintf(stderr, "cf %s has remaining data to scan\n",
355 column_families_[i]->GetName().c_str());
356 }
357 }
358 break;
359 }
360 if (shared->HasVerificationFailedYet()) {
361 break;
362 }
363 // If the program reaches here, then all column families' iterators are
364 // still valid.
365 if (shared->PrintingVerificationResults()) {
366 continue;
367 }
368 Slice key;
369 Slice value;
370 int num_mismatched_cfs = 0;
371 for (size_t i = 0; i != num; ++i) {
372 if (i == 0) {
373 key = iters[i]->key();
374 value = iters[i]->value();
375 } else {
376 int cmp = key.compare(iters[i]->key());
377 if (cmp != 0) {
378 ++num_mismatched_cfs;
379 if (1 == num_mismatched_cfs) {
380 fprintf(stderr, "Verification failed\n");
381 fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
382 db_->GetLatestSequenceNumber());
383 fprintf(stderr, "[%s] %s => %s\n",
384 column_families_[0]->GetName().c_str(),
385 key.ToString(true /* hex */).c_str(),
386 value.ToString(true /* hex */).c_str());
387 }
388 fprintf(stderr, "[%s] %s => %s\n",
389 column_families_[i]->GetName().c_str(),
390 iters[i]->key().ToString(true /* hex */).c_str(),
391 iters[i]->value().ToString(true /* hex */).c_str());
392 #ifndef ROCKSDB_LITE
393 Slice begin_key;
394 Slice end_key;
395 if (cmp < 0) {
396 begin_key = key;
397 end_key = iters[i]->key();
398 } else {
399 begin_key = iters[i]->key();
400 end_key = key;
401 }
402 std::vector<KeyVersion> versions;
403 const size_t kMaxNumIKeys = 8;
404 const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
405 Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
406 kMaxNumIKeys, &versions);
407 if (!s.ok()) {
408 fprintf(stderr, "%s\n", s.ToString().c_str());
409 return;
410 }
411 assert(nullptr != cfh);
412 fprintf(stderr,
413 "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
414 ")\n",
415 cfh->GetName().c_str(),
416 begin_key.ToString(true /* hex */).c_str(),
417 end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
418 for (const KeyVersion& kv : versions) {
419 fprintf(stderr, " key %s seq %" PRIu64 " type %d\n",
420 Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
421 kv.type);
422 }
423 };
424 if (1 == num_mismatched_cfs) {
425 print_key_versions(column_families_[0]);
426 }
427 print_key_versions(column_families_[i]);
428 #endif // ROCKSDB_LITE
429 shared->SetVerificationFailure();
430 }
431 }
432 }
433 shared->FinishPrintingVerificationResults();
434 for (auto& iter : iters) {
435 iter->Next();
436 }
437 } while (true);
438 }
439
440 #ifndef ROCKSDB_LITE
441 void ContinuouslyVerifyDb(ThreadState* thread) const override {
442 assert(thread);
443 Status status;
444
445 DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
446 const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
447 const auto ss_deleter = [&](const Snapshot* ss) {
448 db_ptr->ReleaseSnapshot(ss);
449 };
450 std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
451 db_ptr->GetSnapshot(), ss_deleter);
452 if (cmp_db_) {
453 status = cmp_db_->TryCatchUpWithPrimary();
454 }
455 SharedState* shared = thread->shared;
456 assert(shared);
457 if (!status.ok()) {
458 shared->SetShouldStopTest();
459 return;
460 }
461 assert(cmp_db_ || snapshot_guard.get());
462 const auto checksum_column_family = [](Iterator* iter,
463 uint32_t* checksum) -> Status {
464 assert(nullptr != checksum);
465 uint32_t ret = 0;
466 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
467 ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
468 ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
469 }
470 *checksum = ret;
471 return iter->status();
472 };
473 ReadOptions ropts;
474 ropts.total_order_seek = true;
475 ropts.snapshot = snapshot_guard.get();
476 uint32_t crc = 0;
477 {
478 // Compute crc for all key-values of default column family.
479 std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
480 status = checksum_column_family(it.get(), &crc);
481 }
482 uint32_t tmp_crc = 0;
483 if (status.ok()) {
484 for (ColumnFamilyHandle* cfh : cfhs) {
485 if (cfh == db_ptr->DefaultColumnFamily()) {
486 continue;
487 }
488 std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
489 status = checksum_column_family(it.get(), &tmp_crc);
490 if (!status.ok() || tmp_crc != crc) {
491 break;
492 }
493 }
494 }
495 if (!status.ok() || tmp_crc != crc) {
496 shared->SetShouldStopTest();
497 }
498 }
499 #endif // !ROCKSDB_LITE
500
501 std::vector<int> GenerateColumnFamilies(
502 const int /* num_column_families */,
503 int /* rand_column_family */) const override {
504 std::vector<int> ret;
505 int num = static_cast<int>(column_families_.size());
506 int k = 0;
507 std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
508 return ret;
509 }
510
511 private:
512 std::atomic<int64_t> batch_id_;
513 };
514
515 StressTest* CreateCfConsistencyStressTest() {
516 return new CfConsistencyStressTest();
517 }
518
519 } // namespace ROCKSDB_NAMESPACE
520 #endif // GFLAGS