]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db_stress_tool / no_batched_ops_stress.cc
index 2e6734f21c0922c51cc48da7a84dc0aae07e8e4f..269d0886d3f49fb089b523df285332cebb6ff390 100644 (file)
@@ -9,6 +9,9 @@
 
 #ifdef GFLAGS
 #include "db_stress_tool/db_stress_common.h"
+#ifndef NDEBUG
+#include "utilities/fault_injection_fs.h"
+#endif // NDEBUG
 
 namespace ROCKSDB_NAMESPACE {
 class NonBatchedOpsStressTest : public StressTest {
@@ -33,8 +36,8 @@ class NonBatchedOpsStressTest : public StressTest {
       if (thread->shared->HasVerificationFailedYet()) {
         break;
       }
-      if (!thread->rand.OneIn(2)) {
-        // Use iterator to verify this range
+      if (thread->rand.OneIn(3)) {
+        // 1/3 chance use iterator to verify this range
         Slice prefix;
         std::string seek_key = Key(start);
         std::unique_ptr<Iterator> iter(
@@ -79,8 +82,8 @@ class NonBatchedOpsStressTest : public StressTest {
                           from_db.data(), from_db.length());
           }
         }
-      } else {
-        // Use Get to verify this range
+      } else if (thread->rand.OneIn(2)) {
+        // 1/3 chance use Get to verify this range
         for (auto i = start; i < end; i++) {
           if (thread->shared->HasVerificationFailedYet()) {
             break;
@@ -96,6 +99,38 @@ class NonBatchedOpsStressTest : public StressTest {
                           from_db.data(), from_db.length());
           }
         }
+      } else {
+        // 1/3 chance use MultiGet to verify this range
+        for (auto i = start; i < end;) {
+          if (thread->shared->HasVerificationFailedYet()) {
+            break;
+          }
+          // Keep the batch size to some reasonable value
+          size_t batch_size = thread->rand.Uniform(128) + 1;
+          batch_size = std::min<size_t>(batch_size, end - i);
+          std::vector<std::string> keystrs(batch_size);
+          std::vector<Slice> keys(batch_size);
+          std::vector<PinnableSlice> values(batch_size);
+          std::vector<Status> statuses(batch_size);
+          for (size_t j = 0; j < batch_size; ++j) {
+            keystrs[j] = Key(i + j);
+            keys[j] = Slice(keystrs[j].data(), keystrs[j].length());
+          }
+          db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
+                        values.data(), statuses.data());
+          for (size_t j = 0; j < batch_size; ++j) {
+            Status s = statuses[j];
+            std::string from_db = values[j].ToString();
+            VerifyValue(static_cast<int>(cf), i + j, options, shared, from_db,
+                        s, true);
+            if (from_db.length()) {
+              PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
+                            from_db.data(), from_db.length());
+            }
+          }
+
+          i += batch_size;
+        }
       }
     }
   }
@@ -144,18 +179,52 @@ class NonBatchedOpsStressTest : public StressTest {
     std::string key_str = Key(rand_keys[0]);
     Slice key = key_str;
     std::string from_db;
+    int error_count = 0;
+
+#ifndef NDEBUG
+    if (fault_fs_guard) {
+      fault_fs_guard->EnableErrorInjection();
+      SharedState::ignore_read_error = false;
+    }
+#endif // NDEBUG
     Status s = db_->Get(read_opts, cfh, key, &from_db);
+#ifndef NDEBUG
+    if (fault_fs_guard) {
+      error_count = fault_fs_guard->GetAndResetErrorCount();
+    }
+#endif // NDEBUG
     if (s.ok()) {
+#ifndef NDEBUG
+      if (fault_fs_guard) {
+        if (error_count && !SharedState::ignore_read_error) {
+          // Grab mutex so multiple thread don't try to print the
+          // stack trace at the same time
+          MutexLock l(thread->shared->GetMutex());
+          fprintf(stderr, "Didn't get expected error from Get\n");
+          fprintf(stderr, "Callstack that injected the fault\n");
+          fault_fs_guard->PrintFaultBacktrace();
+          std::terminate();
+        }
+      }
+#endif // NDEBUG
       // found case
       thread->stats.AddGets(1, 1);
     } else if (s.IsNotFound()) {
       // not found case
       thread->stats.AddGets(1, 0);
     } else {
-      // errors case
-      fprintf(stderr, "TestGet error: %s\n", s.ToString().c_str());
-      thread->stats.AddErrors(1);
+      if (error_count == 0) {
+        // errors case
+        thread->stats.AddErrors(1);
+      } else {
+        thread->stats.AddVerifiedErrors(1);
+      }
+    }
+#ifndef NDEBUG
+    if (fault_fs_guard) {
+      fault_fs_guard->DisableErrorInjection();
     }
+#endif // NDEBUG
     return s;
   }
 
@@ -171,6 +240,15 @@ class NonBatchedOpsStressTest : public StressTest {
     std::vector<PinnableSlice> values(num_keys);
     std::vector<Status> statuses(num_keys);
     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+    int error_count = 0;
+    // Do a consistency check between Get and MultiGet. Don't do it too
+    // often as it will slow db_stress down
+    bool do_consistency_check = thread->rand.OneIn(4);
+
+    ReadOptions readoptionscopy = read_opts;
+    if (do_consistency_check) {
+      readoptionscopy.snapshot = db_->GetSnapshot();
+    }
 
     // To appease clang analyzer
     const bool use_txn = FLAGS_use_txn;
@@ -231,18 +309,96 @@ class NonBatchedOpsStressTest : public StressTest {
     }
 
     if (!use_txn) {
-      db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+#ifndef NDEBUG
+      if (fault_fs_guard) {
+        fault_fs_guard->EnableErrorInjection();
+        SharedState::ignore_read_error = false;
+      }
+#endif // NDEBUG
+      db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                     statuses.data());
+#ifndef NDEBUG
+      if (fault_fs_guard) {
+        error_count = fault_fs_guard->GetAndResetErrorCount();
+      }
+#endif // NDEBUG
     } else {
 #ifndef ROCKSDB_LITE
-      txn->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+      txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                     statuses.data());
-      RollbackTxn(txn);
 #endif
     }
 
-    for (const auto& s : statuses) {
-      if (s.ok()) {
+#ifndef NDEBUG
+    if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
+      int stat_nok = 0;
+      for (const auto& s : statuses) {
+        if (!s.ok() && !s.IsNotFound()) {
+          stat_nok++;
+        }
+      }
+
+      if (stat_nok < error_count) {
+        // Grab mutex so multiple thread don't try to print the
+        // stack trace at the same time
+        MutexLock l(thread->shared->GetMutex());
+        fprintf(stderr, "Didn't get expected error from MultiGet\n");
+        fprintf(stderr, "Callstack that injected the fault\n");
+        fault_fs_guard->PrintFaultBacktrace();
+        std::terminate();
+      }
+    }
+    if (fault_fs_guard) {
+      fault_fs_guard->DisableErrorInjection();
+    }
+#endif // NDEBUG
+
+    for (size_t i = 0; i < statuses.size(); ++i) {
+      Status s = statuses[i];
+      bool is_consistent = true;
+      // Only do the consistency check if no error was injected and MultiGet
+      // didn't return an unexpected error
+      if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
+        Status tmp_s;
+        std::string value;
+
+        if (use_txn) {
+#ifndef ROCKSDB_LITE
+          tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
+#endif  // ROCKSDB_LITE
+        } else {
+          tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
+        }
+        if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
+          fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
+          is_consistent = false;
+        } else if (!s.ok() && tmp_s.ok()) {
+          fprintf(stderr, "MultiGet returned different results with key %s\n",
+                  keys[i].ToString(true).c_str());
+          fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
+          is_consistent = false;
+        } else if (s.ok() && tmp_s.IsNotFound()) {
+          fprintf(stderr, "MultiGet returned different results with key %s\n",
+                  keys[i].ToString(true).c_str());
+          fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
+          is_consistent = false;
+        } else if (s.ok() && value != values[i].ToString()) {
+          fprintf(stderr, "MultiGet returned different results with key %s\n",
+                  keys[i].ToString(true).c_str());
+          fprintf(stderr, "MultiGet returned value %s\n",
+                  values[i].ToString(true).c_str());
+          fprintf(stderr, "Get returned value %s\n", value.c_str());
+          is_consistent = false;
+        }
+      }
+
+      if (!is_consistent) {
+        fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
+        thread->stats.AddErrors(1);
+        // Fail fast to preserve the DB state
+        thread->shared->SetVerificationFailure();
+        break;
+      } else if (s.ok()) {
         // found case
         thread->stats.AddGets(1, 1);
       } else if (s.IsNotFound()) {
@@ -252,11 +408,24 @@ class NonBatchedOpsStressTest : public StressTest {
         // With txn this is sometimes expected.
         thread->stats.AddGets(1, 1);
       } else {
-        // errors case
-        fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
-        thread->stats.AddErrors(1);
+        if (error_count == 0) {
+          // errors case
+          fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
+          thread->stats.AddErrors(1);
+        } else {
+          thread->stats.AddVerifiedErrors(1);
+        }
       }
     }
+
+    if (readoptionscopy.snapshot) {
+      db_->ReleaseSnapshot(readoptionscopy.snapshot);
+    }
+    if (use_txn) {
+#ifndef ROCKSDB_LITE
+      RollbackTxn(txn);
+#endif
+    }
     return statuses;
   }