X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Fdb_stress_tool%2Fno_batched_ops_stress.cc;h=269d0886d3f49fb089b523df285332cebb6ff390;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=2e6734f21c0922c51cc48da7a84dc0aae07e8e4f;hpb=a71831dadd1e1f3e0fa70405511f65cc33db0498;p=ceph.git diff --git a/ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc b/ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc index 2e6734f21..269d0886d 100644 --- a/ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc +++ b/ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc @@ -9,6 +9,9 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" +#ifndef NDEBUG +#include "utilities/fault_injection_fs.h" +#endif // NDEBUG namespace ROCKSDB_NAMESPACE { class NonBatchedOpsStressTest : public StressTest { @@ -33,8 +36,8 @@ class NonBatchedOpsStressTest : public StressTest { if (thread->shared->HasVerificationFailedYet()) { break; } - if (!thread->rand.OneIn(2)) { - // Use iterator to verify this range + if (thread->rand.OneIn(3)) { + // 1/3 chance use iterator to verify this range Slice prefix; std::string seek_key = Key(start); std::unique_ptr iter( @@ -79,8 +82,8 @@ class NonBatchedOpsStressTest : public StressTest { from_db.data(), from_db.length()); } } - } else { - // Use Get to verify this range + } else if (thread->rand.OneIn(2)) { + // 1/3 chance use Get to verify this range for (auto i = start; i < end; i++) { if (thread->shared->HasVerificationFailedYet()) { break; @@ -96,6 +99,38 @@ class NonBatchedOpsStressTest : public StressTest { from_db.data(), from_db.length()); } } + } else { + // 1/3 chance use MultiGet to verify this range + for (auto i = start; i < end;) { + if (thread->shared->HasVerificationFailedYet()) { + break; + } + // Keep the batch size to some reasonable value + size_t batch_size = thread->rand.Uniform(128) + 1; + batch_size = std::min(batch_size, end - i); + std::vector keystrs(batch_size); + std::vector keys(batch_size); + std::vector values(batch_size); + std::vector statuses(batch_size); + for (size_t j = 0; j < batch_size; ++j) { + keystrs[j] = Key(i + j); + keys[j] = Slice(keystrs[j].data(), keystrs[j].length()); + } + db_->MultiGet(options, column_families_[cf], batch_size, keys.data(), + values.data(), statuses.data()); + for (size_t j = 0; j < batch_size; ++j) { + Status s = statuses[j]; + std::string from_db = values[j].ToString(); + VerifyValue(static_cast(cf), i + j, options, shared, from_db, + s, true); + if (from_db.length()) { + PrintKeyValue(static_cast(cf), static_cast(i + j), + from_db.data(), from_db.length()); + } + } + + i += batch_size; + } } } } @@ -144,18 +179,52 @@ class NonBatchedOpsStressTest : public StressTest { std::string key_str = Key(rand_keys[0]); Slice key = key_str; std::string from_db; + int error_count = 0; + +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::ignore_read_error = false; + } +#endif // NDEBUG Status s = db_->Get(read_opts, cfh, key, &from_db); +#ifndef NDEBUG + if (fault_fs_guard) { + error_count = fault_fs_guard->GetAndResetErrorCount(); + } +#endif // NDEBUG if (s.ok()) { +#ifndef NDEBUG + if (fault_fs_guard) { + if (error_count && !SharedState::ignore_read_error) { + // Grab mutex so multiple thread don't try to print the + // stack trace at the same time + MutexLock l(thread->shared->GetMutex()); + fprintf(stderr, "Didn't get expected error from Get\n"); + fprintf(stderr, "Callstack that injected the fault\n"); + fault_fs_guard->PrintFaultBacktrace(); + std::terminate(); + } + } +#endif // NDEBUG // found case thread->stats.AddGets(1, 1); } else if (s.IsNotFound()) { // not found case thread->stats.AddGets(1, 0); } else { - // errors case - fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); + if (error_count == 0) { + // errors case + thread->stats.AddErrors(1); + } else { + thread->stats.AddVerifiedErrors(1); + } + } +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->DisableErrorInjection(); } +#endif // NDEBUG return s; } @@ -171,6 +240,15 @@ class NonBatchedOpsStressTest : public StressTest { std::vector values(num_keys); std::vector statuses(num_keys); ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]]; + int error_count = 0; + // Do a consistency check between Get and MultiGet. Don't do it too + // often as it will slow db_stress down + bool do_consistency_check = thread->rand.OneIn(4); + + ReadOptions readoptionscopy = read_opts; + if (do_consistency_check) { + readoptionscopy.snapshot = db_->GetSnapshot(); + } // To appease clang analyzer const bool use_txn = FLAGS_use_txn; @@ -231,18 +309,96 @@ class NonBatchedOpsStressTest : public StressTest { } if (!use_txn) { - db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), +#ifndef NDEBUG + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::ignore_read_error = false; + } +#endif // NDEBUG + db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(), statuses.data()); +#ifndef NDEBUG + if (fault_fs_guard) { + error_count = fault_fs_guard->GetAndResetErrorCount(); + } +#endif // NDEBUG } else { #ifndef ROCKSDB_LITE - txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(), + txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(), statuses.data()); - RollbackTxn(txn); #endif } - for (const auto& s : statuses) { - if (s.ok()) { +#ifndef NDEBUG + if (fault_fs_guard && error_count && !SharedState::ignore_read_error) { + int stat_nok = 0; + for (const auto& s : statuses) { + if (!s.ok() && !s.IsNotFound()) { + stat_nok++; + } + } + + if (stat_nok < error_count) { + // Grab mutex so multiple thread don't try to print the + // stack trace at the same time + MutexLock l(thread->shared->GetMutex()); + fprintf(stderr, "Didn't get expected error from MultiGet\n"); + fprintf(stderr, "Callstack that injected the fault\n"); + fault_fs_guard->PrintFaultBacktrace(); + std::terminate(); + } + } + if (fault_fs_guard) { + fault_fs_guard->DisableErrorInjection(); + } +#endif // NDEBUG + + for (size_t i = 0; i < statuses.size(); ++i) { + Status s = statuses[i]; + bool is_consistent = true; + // Only do the consistency check if no error was injected and MultiGet + // didn't return an unexpected error + if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) { + Status tmp_s; + std::string value; + + if (use_txn) { +#ifndef ROCKSDB_LITE + tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value); +#endif // ROCKSDB_LITE + } else { + tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value); + } + if (!tmp_s.ok() && !tmp_s.IsNotFound()) { + fprintf(stderr, "Get error: %s\n", s.ToString().c_str()); + is_consistent = false; + } else if (!s.ok() && tmp_s.ok()) { + fprintf(stderr, "MultiGet returned different results with key %s\n", + keys[i].ToString(true).c_str()); + fprintf(stderr, "Get returned ok, MultiGet returned not found\n"); + is_consistent = false; + } else if (s.ok() && tmp_s.IsNotFound()) { + fprintf(stderr, "MultiGet returned different results with key %s\n", + keys[i].ToString(true).c_str()); + fprintf(stderr, "MultiGet returned ok, Get returned not found\n"); + is_consistent = false; + } else if (s.ok() && value != values[i].ToString()) { + fprintf(stderr, "MultiGet returned different results with key %s\n", + keys[i].ToString(true).c_str()); + fprintf(stderr, "MultiGet returned value %s\n", + values[i].ToString(true).c_str()); + fprintf(stderr, "Get returned value %s\n", value.c_str()); + is_consistent = false; + } + } + + if (!is_consistent) { + fprintf(stderr, "TestMultiGet error: is_consistent is false\n"); + thread->stats.AddErrors(1); + // Fail fast to preserve the DB state + thread->shared->SetVerificationFailure(); + break; + } else if (s.ok()) { // found case thread->stats.AddGets(1, 1); } else if (s.IsNotFound()) { @@ -252,11 +408,24 @@ class NonBatchedOpsStressTest : public StressTest { // With txn this is sometimes expected. thread->stats.AddGets(1, 1); } else { - // errors case - fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); + if (error_count == 0) { + // errors case + fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + thread->stats.AddVerifiedErrors(1); + } } } + + if (readoptionscopy.snapshot) { + db_->ReleaseSnapshot(readoptionscopy.snapshot); + } + if (use_txn) { +#ifndef ROCKSDB_LITE + RollbackTxn(txn); +#endif + } return statuses; }