git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db_stress_tool/cf_consistency_stress.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db_stress_tool / cf_consistency_stress.cc
CommitLineData
f67539c2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10#ifdef GFLAGS
11#include "db_stress_tool/db_stress_common.h"
12
13namespace ROCKSDB_NAMESPACE {
14class CfConsistencyStressTest : public StressTest {
15 public:
16 CfConsistencyStressTest() : batch_id_(0) {}
17
18 ~CfConsistencyStressTest() override {}
19
20 Status TestPut(ThreadState* thread, WriteOptions& write_opts,
21 const ReadOptions& /* read_opts */,
22 const std::vector<int>& rand_column_families,
23 const std::vector<int64_t>& rand_keys, char (&value)[100],
24 std::unique_ptr<MutexLock>& /* lock */) override {
25 std::string key_str = Key(rand_keys[0]);
26 Slice key = key_str;
27 uint64_t value_base = batch_id_.fetch_add(1);
28 size_t sz =
29 GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
30 Slice v(value, sz);
31 WriteBatch batch;
32 for (auto cf : rand_column_families) {
33 ColumnFamilyHandle* cfh = column_families_[cf];
34 if (FLAGS_use_merge) {
35 batch.Merge(cfh, key, v);
36 } else { /* !FLAGS_use_merge */
37 batch.Put(cfh, key, v);
38 }
39 }
40 Status s = db_->Write(write_opts, &batch);
41 if (!s.ok()) {
42 fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
43 thread->stats.AddErrors(1);
44 } else {
45 auto num = static_cast<long>(rand_column_families.size());
46 thread->stats.AddBytesForWrites(num, (sz + 1) * num);
47 }
48
49 return s;
50 }
51
52 Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
53 const std::vector<int>& rand_column_families,
54 const std::vector<int64_t>& rand_keys,
55 std::unique_ptr<MutexLock>& /* lock */) override {
56 std::string key_str = Key(rand_keys[0]);
57 Slice key = key_str;
58 WriteBatch batch;
59 for (auto cf : rand_column_families) {
60 ColumnFamilyHandle* cfh = column_families_[cf];
61 batch.Delete(cfh, key);
62 }
63 Status s = db_->Write(write_opts, &batch);
64 if (!s.ok()) {
65 fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
66 thread->stats.AddErrors(1);
67 } else {
68 thread->stats.AddDeletes(static_cast<long>(rand_column_families.size()));
69 }
70 return s;
71 }
72
73 Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
74 const std::vector<int>& rand_column_families,
75 const std::vector<int64_t>& rand_keys,
76 std::unique_ptr<MutexLock>& /* lock */) override {
77 int64_t rand_key = rand_keys[0];
78 auto shared = thread->shared;
79 int64_t max_key = shared->GetMaxKey();
80 if (rand_key > max_key - FLAGS_range_deletion_width) {
81 rand_key =
82 thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
83 }
84 std::string key_str = Key(rand_key);
85 Slice key = key_str;
86 std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
87 Slice end_key = end_key_str;
88 WriteBatch batch;
89 for (auto cf : rand_column_families) {
90 ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
91 batch.DeleteRange(cfh, key, end_key);
92 }
93 Status s = db_->Write(write_opts, &batch);
94 if (!s.ok()) {
95 fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
96 thread->stats.AddErrors(1);
97 } else {
98 thread->stats.AddRangeDeletions(
99 static_cast<long>(rand_column_families.size()));
100 }
101 return s;
102 }
103
104 void TestIngestExternalFile(
105 ThreadState* /* thread */,
106 const std::vector<int>& /* rand_column_families */,
107 const std::vector<int64_t>& /* rand_keys */,
108 std::unique_ptr<MutexLock>& /* lock */) override {
109 assert(false);
110 fprintf(stderr,
111 "CfConsistencyStressTest does not support TestIngestExternalFile "
112 "because it's not possible to verify the result\n");
113 std::terminate();
114 }
115
116 Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
117 const std::vector<int>& rand_column_families,
118 const std::vector<int64_t>& rand_keys) override {
119 std::string key_str = Key(rand_keys[0]);
120 Slice key = key_str;
121 Status s;
122 bool is_consistent = true;
123
124 if (thread->rand.OneIn(2)) {
125 // 1/2 chance, does a random read from random CF
126 auto cfh =
127 column_families_[rand_column_families[thread->rand.Next() %
128 rand_column_families.size()]];
129 std::string from_db;
130 s = db_->Get(readoptions, cfh, key, &from_db);
131 } else {
132 // 1/2 chance, comparing one key is the same across all CFs
133 const Snapshot* snapshot = db_->GetSnapshot();
134 ReadOptions readoptionscopy = readoptions;
135 readoptionscopy.snapshot = snapshot;
136
137 std::string value0;
138 s = db_->Get(readoptionscopy, column_families_[rand_column_families[0]],
139 key, &value0);
140 if (s.ok() || s.IsNotFound()) {
141 bool found = s.ok();
142 for (size_t i = 1; i < rand_column_families.size(); i++) {
143 std::string value1;
144 s = db_->Get(readoptionscopy,
145 column_families_[rand_column_families[i]], key, &value1);
146 if (!s.ok() && !s.IsNotFound()) {
147 break;
148 }
149 if (!found && s.ok()) {
150 fprintf(stderr, "Get() return different results with key %s\n",
151 Slice(key_str).ToString(true).c_str());
152 fprintf(stderr, "CF %s is not found\n",
153 column_family_names_[0].c_str());
154 fprintf(stderr, "CF %s returns value %s\n",
155 column_family_names_[i].c_str(),
156 Slice(value1).ToString(true).c_str());
157 is_consistent = false;
158 } else if (found && s.IsNotFound()) {
159 fprintf(stderr, "Get() return different results with key %s\n",
160 Slice(key_str).ToString(true).c_str());
161 fprintf(stderr, "CF %s returns value %s\n",
162 column_family_names_[0].c_str(),
163 Slice(value0).ToString(true).c_str());
164 fprintf(stderr, "CF %s is not found\n",
165 column_family_names_[i].c_str());
166 is_consistent = false;
167 } else if (s.ok() && value0 != value1) {
168 fprintf(stderr, "Get() return different results with key %s\n",
169 Slice(key_str).ToString(true).c_str());
170 fprintf(stderr, "CF %s returns value %s\n",
171 column_family_names_[0].c_str(),
172 Slice(value0).ToString(true).c_str());
173 fprintf(stderr, "CF %s returns value %s\n",
174 column_family_names_[i].c_str(),
175 Slice(value1).ToString(true).c_str());
176 is_consistent = false;
177 }
178 if (!is_consistent) {
179 break;
180 }
181 }
182 }
183
184 db_->ReleaseSnapshot(snapshot);
185 }
186 if (!is_consistent) {
187 fprintf(stderr, "TestGet error: is_consistent is false\n");
188 thread->stats.AddErrors(1);
189 // Fail fast to preserve the DB state.
190 thread->shared->SetVerificationFailure();
191 } else if (s.ok()) {
192 thread->stats.AddGets(1, 1);
193 } else if (s.IsNotFound()) {
194 thread->stats.AddGets(1, 0);
195 } else {
196 fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
197 thread->stats.AddErrors(1);
198 }
199 return s;
200 }
201
202 std::vector<Status> TestMultiGet(
203 ThreadState* thread, const ReadOptions& read_opts,
204 const std::vector<int>& rand_column_families,
205 const std::vector<int64_t>& rand_keys) override {
206 size_t num_keys = rand_keys.size();
207 std::vector<std::string> key_str;
208 std::vector<Slice> keys;
209 keys.reserve(num_keys);
210 key_str.reserve(num_keys);
211 std::vector<PinnableSlice> values(num_keys);
212 std::vector<Status> statuses(num_keys);
213 ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
214
215 for (size_t i = 0; i < num_keys; ++i) {
216 key_str.emplace_back(Key(rand_keys[i]));
217 keys.emplace_back(key_str.back());
218 }
219 db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
220 statuses.data());
221 for (auto s : statuses) {
222 if (s.ok()) {
223 // found case
224 thread->stats.AddGets(1, 1);
225 } else if (s.IsNotFound()) {
226 // not found case
227 thread->stats.AddGets(1, 0);
228 } else {
229 // errors case
230 fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
231 thread->stats.AddErrors(1);
232 }
233 }
234 return statuses;
235 }
236
237 Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
238 const std::vector<int>& rand_column_families,
239 const std::vector<int64_t>& rand_keys) override {
240 size_t prefix_to_use =
241 (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
242
243 std::string key_str = Key(rand_keys[0]);
244 Slice key = key_str;
245 Slice prefix = Slice(key.data(), prefix_to_use);
246
247 std::string upper_bound;
248 Slice ub_slice;
249 ReadOptions ro_copy = readoptions;
250 // Get the next prefix first and then see if we want to set upper bound.
251 // We'll use the next prefix in an assertion later on
252 if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
253 ub_slice = Slice(upper_bound);
254 ro_copy.iterate_upper_bound = &ub_slice;
255 }
256 auto cfh =
257 column_families_[rand_column_families[thread->rand.Next() %
258 rand_column_families.size()]];
259 Iterator* iter = db_->NewIterator(ro_copy, cfh);
260 unsigned long count = 0;
261 for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
262 iter->Next()) {
263 ++count;
264 }
265 assert(prefix_to_use == 0 ||
266 count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
267 Status s = iter->status();
268 if (s.ok()) {
269 thread->stats.AddPrefixes(1, count);
270 } else {
271 fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
272 thread->stats.AddErrors(1);
273 }
274 delete iter;
275 return s;
276 }
277
278 ColumnFamilyHandle* GetControlCfh(ThreadState* thread,
279 int /*column_family_id*/
280 ) override {
281 // All column families should contain the same data. Randomly pick one.
282 return column_families_[thread->rand.Next() % column_families_.size()];
283 }
284
285#ifdef ROCKSDB_LITE
286 Status TestCheckpoint(ThreadState* /* thread */,
287 const std::vector<int>& /* rand_column_families */,
288 const std::vector<int64_t>& /* rand_keys */) override {
289 assert(false);
290 fprintf(stderr,
291 "RocksDB lite does not support "
292 "TestCheckpoint\n");
293 std::terminate();
294 }
295#else
296 Status TestCheckpoint(ThreadState* thread,
297 const std::vector<int>& /* rand_column_families */,
298 const std::vector<int64_t>& /* rand_keys */) override {
299 std::string checkpoint_dir =
300 FLAGS_db + "/.checkpoint" + ToString(thread->tid);
301
302 // We need to clear DB including manifest files, so make a copy
303 Options opt_copy = options_;
304 opt_copy.env = db_stress_env->target();
305 DestroyDB(checkpoint_dir, opt_copy);
306
307 Checkpoint* checkpoint = nullptr;
308 Status s = Checkpoint::Create(db_, &checkpoint);
309 if (s.ok()) {
310 s = checkpoint->CreateCheckpoint(checkpoint_dir);
311 }
312 std::vector<ColumnFamilyHandle*> cf_handles;
313 DB* checkpoint_db = nullptr;
314 if (s.ok()) {
315 delete checkpoint;
316 checkpoint = nullptr;
317 Options options(options_);
318 options.listeners.clear();
319 std::vector<ColumnFamilyDescriptor> cf_descs;
320 // TODO(ajkr): `column_family_names_` is not safe to access here when
321 // `clear_column_family_one_in != 0`. But we can't easily switch to
322 // `ListColumnFamilies` to get names because it won't necessarily give
323 // the same order as `column_family_names_`.
324 if (FLAGS_clear_column_family_one_in == 0) {
325 for (const auto& name : column_family_names_) {
326 cf_descs.emplace_back(name, ColumnFamilyOptions(options));
327 }
328 s = DB::OpenForReadOnly(DBOptions(options), checkpoint_dir, cf_descs,
329 &cf_handles, &checkpoint_db);
330 }
331 }
332 if (checkpoint_db != nullptr) {
333 for (auto cfh : cf_handles) {
334 delete cfh;
335 }
336 cf_handles.clear();
337 delete checkpoint_db;
338 checkpoint_db = nullptr;
339 }
340 DestroyDB(checkpoint_dir, opt_copy);
341 if (!s.ok()) {
342 fprintf(stderr, "A checkpoint operation failed with: %s\n",
343 s.ToString().c_str());
344 }
345 return s;
346 }
347#endif // !ROCKSDB_LITE
348
349 void VerifyDb(ThreadState* thread) const override {
350 ReadOptions options(FLAGS_verify_checksum, true);
351 // We must set total_order_seek to true because we are doing a SeekToFirst
352 // on a column family whose memtables may support (by default) prefix-based
353 // iterator. In this case, NewIterator with options.total_order_seek being
354 // false returns a prefix-based iterator. Calling SeekToFirst using this
355 // iterator causes the iterator to become invalid. That means we cannot
356 // iterate the memtable using this iterator any more, although the memtable
357 // contains the most up-to-date key-values.
358 options.total_order_seek = true;
359 const auto ss_deleter = [this](const Snapshot* ss) {
360 db_->ReleaseSnapshot(ss);
361 };
362 std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
363 db_->GetSnapshot(), ss_deleter);
364 options.snapshot = snapshot_guard.get();
365 assert(thread != nullptr);
366 auto shared = thread->shared;
367 std::vector<std::unique_ptr<Iterator>> iters(column_families_.size());
368 for (size_t i = 0; i != column_families_.size(); ++i) {
369 iters[i].reset(db_->NewIterator(options, column_families_[i]));
370 }
371 for (auto& iter : iters) {
372 iter->SeekToFirst();
373 }
374 size_t num = column_families_.size();
375 assert(num == iters.size());
376 std::vector<Status> statuses(num, Status::OK());
377 do {
378 if (shared->HasVerificationFailedYet()) {
379 break;
380 }
381 size_t valid_cnt = 0;
382 size_t idx = 0;
383 for (auto& iter : iters) {
384 if (iter->Valid()) {
385 ++valid_cnt;
386 } else {
387 statuses[idx] = iter->status();
388 }
389 ++idx;
390 }
391 if (valid_cnt == 0) {
392 Status status;
393 for (size_t i = 0; i != num; ++i) {
394 const auto& s = statuses[i];
395 if (!s.ok()) {
396 status = s;
397 fprintf(stderr, "Iterator on cf %s has error: %s\n",
398 column_families_[i]->GetName().c_str(),
399 s.ToString().c_str());
400 shared->SetVerificationFailure();
401 }
402 }
403 break;
404 } else if (valid_cnt != iters.size()) {
405 shared->SetVerificationFailure();
406 for (size_t i = 0; i != num; ++i) {
407 if (!iters[i]->Valid()) {
408 if (statuses[i].ok()) {
409 fprintf(stderr, "Finished scanning cf %s\n",
410 column_families_[i]->GetName().c_str());
411 } else {
412 fprintf(stderr, "Iterator on cf %s has error: %s\n",
413 column_families_[i]->GetName().c_str(),
414 statuses[i].ToString().c_str());
415 }
416 } else {
417 fprintf(stderr, "cf %s has remaining data to scan\n",
418 column_families_[i]->GetName().c_str());
419 }
420 }
421 break;
422 }
423 if (shared->HasVerificationFailedYet()) {
424 break;
425 }
426 // If the program reaches here, then all column families' iterators are
427 // still valid.
428 if (shared->PrintingVerificationResults()) {
429 continue;
430 }
431 Slice key;
432 Slice value;
433 int num_mismatched_cfs = 0;
434 for (size_t i = 0; i != num; ++i) {
435 if (i == 0) {
436 key = iters[i]->key();
437 value = iters[i]->value();
438 } else {
439 int cmp = key.compare(iters[i]->key());
440 if (cmp != 0) {
441 ++num_mismatched_cfs;
442 if (1 == num_mismatched_cfs) {
443 fprintf(stderr, "Verification failed\n");
444 fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n",
445 db_->GetLatestSequenceNumber());
446 fprintf(stderr, "[%s] %s => %s\n",
447 column_families_[0]->GetName().c_str(),
448 key.ToString(true /* hex */).c_str(),
449 value.ToString(true /* hex */).c_str());
450 }
451 fprintf(stderr, "[%s] %s => %s\n",
452 column_families_[i]->GetName().c_str(),
453 iters[i]->key().ToString(true /* hex */).c_str(),
454 iters[i]->value().ToString(true /* hex */).c_str());
455#ifndef ROCKSDB_LITE
456 Slice begin_key;
457 Slice end_key;
458 if (cmp < 0) {
459 begin_key = key;
460 end_key = iters[i]->key();
461 } else {
462 begin_key = iters[i]->key();
463 end_key = key;
464 }
465 std::vector<KeyVersion> versions;
466 const size_t kMaxNumIKeys = 8;
467 const auto print_key_versions = [&](ColumnFamilyHandle* cfh) {
468 Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key,
469 kMaxNumIKeys, &versions);
470 if (!s.ok()) {
471 fprintf(stderr, "%s\n", s.ToString().c_str());
472 return;
473 }
474 assert(nullptr != cfh);
475 fprintf(stderr,
476 "Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt
477 ")\n",
478 cfh->GetName().c_str(),
479 begin_key.ToString(true /* hex */).c_str(),
480 end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys);
481 for (const KeyVersion& kv : versions) {
482 fprintf(stderr, " key %s seq %" PRIu64 " type %d\n",
483 Slice(kv.user_key).ToString(true).c_str(), kv.sequence,
484 kv.type);
485 }
486 };
487 if (1 == num_mismatched_cfs) {
488 print_key_versions(column_families_[0]);
489 }
490 print_key_versions(column_families_[i]);
491#endif // ROCKSDB_LITE
492 shared->SetVerificationFailure();
493 }
494 }
495 }
496 shared->FinishPrintingVerificationResults();
497 for (auto& iter : iters) {
498 iter->Next();
499 }
500 } while (true);
501 }
502
503#ifndef ROCKSDB_LITE
504 void ContinuouslyVerifyDb(ThreadState* thread) const override {
505 assert(thread);
506 Status status;
507
508 DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
509 const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
510 const auto ss_deleter = [&](const Snapshot* ss) {
511 db_ptr->ReleaseSnapshot(ss);
512 };
513 std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
514 db_ptr->GetSnapshot(), ss_deleter);
515 if (cmp_db_) {
516 status = cmp_db_->TryCatchUpWithPrimary();
517 }
518 SharedState* shared = thread->shared;
519 assert(shared);
520 if (!status.ok()) {
521 shared->SetShouldStopTest();
522 return;
523 }
524 assert(cmp_db_ || snapshot_guard.get());
525 const auto checksum_column_family = [](Iterator* iter,
526 uint32_t* checksum) -> Status {
527 assert(nullptr != checksum);
528 uint32_t ret = 0;
529 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
530 ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
531 ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
532 }
533 *checksum = ret;
534 return iter->status();
535 };
536 ReadOptions ropts;
537 ropts.total_order_seek = true;
538 ropts.snapshot = snapshot_guard.get();
539 uint32_t crc = 0;
540 {
541 // Compute crc for all key-values of default column family.
542 std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
543 status = checksum_column_family(it.get(), &crc);
544 }
545 uint32_t tmp_crc = 0;
546 if (status.ok()) {
547 for (ColumnFamilyHandle* cfh : cfhs) {
548 if (cfh == db_ptr->DefaultColumnFamily()) {
549 continue;
550 }
551 std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
552 status = checksum_column_family(it.get(), &tmp_crc);
553 if (!status.ok() || tmp_crc != crc) {
554 break;
555 }
556 }
557 }
558 if (!status.ok() || tmp_crc != crc) {
559 shared->SetShouldStopTest();
560 }
561 }
562#endif // !ROCKSDB_LITE
563
564 std::vector<int> GenerateColumnFamilies(
565 const int /* num_column_families */,
566 int /* rand_column_family */) const override {
567 std::vector<int> ret;
568 int num = static_cast<int>(column_families_.size());
569 int k = 0;
570 std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
571 return ret;
572 }
573
574 private:
575 std::atomic<int64_t> batch_id_;
576};
577
578StressTest* CreateCfConsistencyStressTest() {
579 return new CfConsistencyStressTest();
580}
581
582} // namespace ROCKSDB_NAMESPACE
583#endif // GFLAGS