]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db_stress_tool/batched_ops_stress.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db_stress_tool / batched_ops_stress.cc
index ea1fab5696d02bfa0ccbcf01061c024bcf1a50dd..3f34460762ae870cdf5b5796979c316d870d2257 100644 (file)
@@ -16,37 +16,55 @@ class BatchedOpsStressTest : public StressTest {
   BatchedOpsStressTest() {}
   virtual ~BatchedOpsStressTest() {}
 
-  // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
-  // ("9"+K, "9"+V) in DB atomically i.e in a single batch.
+  bool IsStateTracked() const override { return false; }
+
+  // Given a key K and value V, this puts ("0"+K, V+"0"), ("1"+K, V+"1"), ...,
+  // ("9"+K, V+"9") in DB atomically i.e in a single batch.
   // Also refer BatchedOpsStressTest::TestGet
   Status TestPut(ThreadState* thread, WriteOptions& write_opts,
                  const ReadOptions& /* read_opts */,
                  const std::vector<int>& rand_column_families,
-                 const std::vector<int64_t>& rand_keys, char (&value)[100],
-                 std::unique_ptr<MutexLock>& /* lock */) override {
-    uint32_t value_base =
+                 const std::vector<int64_t>& rand_keys,
+                 char (&value)[100]) override {
+    assert(!rand_column_families.empty());
+    assert(!rand_keys.empty());
+
+    const std::string key_body = Key(rand_keys[0]);
+
+    const uint32_t value_base =
         thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
-    size_t sz = GenerateValue(value_base, value, sizeof(value));
-    Slice v(value, sz);
-    std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
-    std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
-    Slice value_slices[10];
-    WriteBatch batch;
-    Status s;
-    auto cfh = column_families_[rand_column_families[0]];
-    std::string key_str = Key(rand_keys[0]);
-    for (int i = 0; i < 10; i++) {
-      keys[i] += key_str;
-      values[i] += v.ToString();
-      value_slices[i] = values[i];
+    const size_t sz = GenerateValue(value_base, value, sizeof(value));
+    const std::string value_body = Slice(value, sz).ToString();
+
+    WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                     FLAGS_batch_protection_bytes_per_key,
+                     FLAGS_user_timestamp_size);
+
+    ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
+    assert(cfh);
+
+    for (int i = 9; i >= 0; --i) {
+      const std::string num = std::to_string(i);
+
+      // Note: the digit in num is prepended to the key; however, it is appended
+      // to the value because we want the "value base" to be encoded uniformly
+      // at the beginning of the value for all types of stress tests (e.g.
+      // batched, non-batched, CF consistency).
+      const std::string k = num + key_body;
+      const std::string v = value_body + num;
+
       if (FLAGS_use_merge) {
-        batch.Merge(cfh, keys[i], value_slices[i]);
+        batch.Merge(cfh, k, v);
+      } else if (FLAGS_use_put_entity_one_in > 0 &&
+                 (value_base % FLAGS_use_put_entity_one_in) == 0) {
+        batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
       } else {
-        batch.Put(cfh, keys[i], value_slices[i]);
+        batch.Put(cfh, k, v);
       }
     }
 
-    s = db_->Write(write_opts, &batch);
+    const Status s = db_->Write(write_opts, &batch);
+
     if (!s.ok()) {
       fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
       thread->stats.AddErrors(1);
@@ -58,15 +76,16 @@ class BatchedOpsStressTest : public StressTest {
     return s;
   }
 
-  // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
+  // Given a key K, this deletes ("0"+K), ("1"+K), ..., ("9"+K)
   // in DB atomically i.e in a single batch. Also refer MultiGet.
   Status TestDelete(ThreadState* thread, WriteOptions& writeoptions,
                     const std::vector<int>& rand_column_families,
-                    const std::vector<int64_t>& rand_keys,
-                    std::unique_ptr<MutexLock>& /* lock */) override {
+                    const std::vector<int64_t>& rand_keys) override {
     std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"};
 
-    WriteBatch batch;
+    WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
+                     FLAGS_batch_protection_bytes_per_key,
+                     FLAGS_user_timestamp_size);
     Status s;
     auto cfh = column_families_[rand_column_families[0]];
     std::string key_str = Key(rand_keys[0]);
@@ -89,8 +108,7 @@ class BatchedOpsStressTest : public StressTest {
   Status TestDeleteRange(ThreadState* /* thread */,
                          WriteOptions& /* write_opts */,
                          const std::vector<int>& /* rand_column_families */,
-                         const std::vector<int64_t>& /* rand_keys */,
-                         std::unique_ptr<MutexLock>& /* lock */) override {
+                         const std::vector<int64_t>& /* rand_keys */) override {
     assert(false);
     return Status::NotSupported(
         "BatchedOpsStressTest does not support "
@@ -100,8 +118,7 @@ class BatchedOpsStressTest : public StressTest {
   void TestIngestExternalFile(
       ThreadState* /* thread */,
       const std::vector<int>& /* rand_column_families */,
-      const std::vector<int64_t>& /* rand_keys */,
-      std::unique_ptr<MutexLock>& /* lock */) override {
+      const std::vector<int64_t>& /* rand_keys */) override {
     assert(false);
     fprintf(stderr,
             "BatchedOpsStressTest does not support "
@@ -109,9 +126,9 @@ class BatchedOpsStressTest : public StressTest {
     std::terminate();
   }
 
-  // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
+  // Given a key K, this gets values for "0"+K, "1"+K, ..., "9"+K
   // in the same snapshot, and verifies that all the values are of the form
-  // "0"+V, "1"+V,..."9"+V.
+  // V+"0", V+"1", ..., V+"9".
   // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into
   // the DB.
   Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
@@ -143,13 +160,19 @@ class BatchedOpsStressTest : public StressTest {
       } else {
         values[i] = from_db;
 
-        char expected_prefix = (keys[i])[0];
-        char actual_prefix = (values[i])[0];
-        if (actual_prefix != expected_prefix) {
-          fprintf(stderr, "error expected prefix = %c actual = %c\n",
-                  expected_prefix, actual_prefix);
+        assert(!keys[i].empty());
+        assert(!values[i].empty());
+
+        const char expected = keys[i].front();
+        const char actual = values[i].back();
+
+        if (expected != actual) {
+          fprintf(stderr, "get error expected = %c actual = %c\n", expected,
+                  actual);
         }
-        (values[i])[0] = ' ';  // blank out the differing character
+
+        values[i].pop_back();  // get rid of the differing character
+
         thread->stats.AddGets(1, 1);
       }
     }
@@ -158,7 +181,7 @@ class BatchedOpsStressTest : public StressTest {
     // Now that we retrieved all values, check that they all match
     for (int i = 1; i < 10; i++) {
       if (values[i] != values[0]) {
-        fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
+        fprintf(stderr, "get error: inconsistent values for key %s: %s, %s\n",
                 key.ToString(true).c_str(), StringToHex(values[0]).c_str(),
                 StringToHex(values[i]).c_str());
         // we continue after error rather than exiting so that we can
@@ -175,8 +198,8 @@ class BatchedOpsStressTest : public StressTest {
       const std::vector<int64_t>& rand_keys) override {
     size_t num_keys = rand_keys.size();
     std::vector<Status> ret_status(num_keys);
-    std::array<std::string, 10> keys = {{"0", "1", "2", "3", "4",
-                                         "5", "6", "7", "8", "9"}};
+    std::array<std::string, 10> keys = {
+        {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}};
     size_t num_prefixes = keys.size();
     for (size_t rand_key = 0; rand_key < num_keys; ++rand_key) {
       std::vector<Slice> key_slices;
@@ -184,6 +207,8 @@ class BatchedOpsStressTest : public StressTest {
       std::vector<Status> statuses(num_prefixes);
       ReadOptions readoptionscopy = readoptions;
       readoptionscopy.snapshot = db_->GetSnapshot();
+      readoptionscopy.rate_limiter_priority =
+          FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
       std::vector<std::string> key_str;
       key_str.reserve(num_prefixes);
       key_slices.reserve(num_prefixes);
@@ -199,7 +224,7 @@ class BatchedOpsStressTest : public StressTest {
       for (size_t i = 0; i < num_prefixes; i++) {
         Status s = statuses[i];
         if (!s.ok() && !s.IsNotFound()) {
-          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
+          fprintf(stderr, "multiget error: %s\n", s.ToString().c_str());
           thread->stats.AddErrors(1);
           ret_status[rand_key] = s;
           // we continue after error rather than exiting so that we can
@@ -208,17 +233,19 @@ class BatchedOpsStressTest : public StressTest {
           thread->stats.AddGets(1, 0);
           ret_status[rand_key] = s;
         } else {
-          char expected_prefix = (keys[i])[0];
-          char actual_prefix = (values[i])[0];
-          if (actual_prefix != expected_prefix) {
-            fprintf(stderr, "error expected prefix = %c actual = %c\n",
-                    expected_prefix, actual_prefix);
+          assert(!keys[i].empty());
+          assert(!values[i].empty());
+
+          const char expected = keys[i][0];
+          const char actual = values[i][values[i].size() - 1];
+
+          if (expected != actual) {
+            fprintf(stderr, "multiget error expected = %c actual = %c\n",
+                    expected, actual);
           }
-          std::string str;
-          str.assign(values[i].data(), values[i].size());
-          values[i].Reset();
-          str[0] = ' ';  // blank out the differing character
-          values[i].PinSelf(str);
+
+          values[i].remove_suffix(1);  // get rid of the differing character
+
           thread->stats.AddGets(1, 1);
         }
       }
@@ -227,8 +254,10 @@ class BatchedOpsStressTest : public StressTest {
       // Now that we retrieved all values, check that they all match
       for (size_t i = 1; i < num_prefixes; i++) {
         if (values[i] != values[0]) {
-          fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n",
-                  key_str[i].c_str(), StringToHex(values[0].ToString()).c_str(),
+          fprintf(stderr,
+                  "multiget error: inconsistent values for key %s: %s, %s\n",
+                  StringToHex(key_str[i]).c_str(),
+                  StringToHex(values[0].ToString()).c_str(),
                   StringToHex(values[i].ToString()).c_str());
           // we continue after error rather than exiting so that we can
           // find more errors if any
@@ -239,100 +268,129 @@ class BatchedOpsStressTest : public StressTest {
     return ret_status;
   }
 
-  // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
+  // Given a key, this does prefix scans for "0"+P, "1"+P, ..., "9"+P
   // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
   // of the key. Each of these 10 scans returns a series of values;
   // each series should be the same length, and it is verified for each
-  // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
+  // index i that all the i'th values are of the form V+"0", V+"1", ..., V+"9".
   // ASSUMES that MultiPut was used to put (K, V)
   Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
                         const std::vector<int>& rand_column_families,
                         const std::vector<int64_t>& rand_keys) override {
-    size_t prefix_to_use =
-        (FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
-    std::string key_str = Key(rand_keys[0]);
-    Slice key = key_str;
-    auto cfh = column_families_[rand_column_families[0]];
-    std::string prefixes[10] = {"0", "1", "2", "3", "4",
-                                "5", "6", "7", "8", "9"};
-    Slice prefix_slices[10];
-    ReadOptions readoptionscopy[10];
-    const Snapshot* snapshot = db_->GetSnapshot();
-    Iterator* iters[10];
-    std::string upper_bounds[10];
-    Slice ub_slices[10];
-    Status s = Status::OK();
-    for (int i = 0; i < 10; i++) {
-      prefixes[i] += key.ToString();
-      prefixes[i].resize(prefix_to_use);
-      prefix_slices[i] = Slice(prefixes[i]);
-      readoptionscopy[i] = readoptions;
-      readoptionscopy[i].snapshot = snapshot;
+    assert(!rand_column_families.empty());
+    assert(!rand_keys.empty());
+
+    const std::string key = Key(rand_keys[0]);
+
+    assert(FLAGS_prefix_size > 0);
+    const size_t prefix_to_use = static_cast<size_t>(FLAGS_prefix_size);
+
+    constexpr size_t num_prefixes = 10;
+
+    std::array<std::string, num_prefixes> prefixes;
+    std::array<Slice, num_prefixes> prefix_slices;
+    std::array<ReadOptions, num_prefixes> ro_copies;
+    std::array<std::string, num_prefixes> upper_bounds;
+    std::array<Slice, num_prefixes> ub_slices;
+    std::array<std::unique_ptr<Iterator>, num_prefixes> iters;
+
+    const Snapshot* const snapshot = db_->GetSnapshot();
+
+    ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
+    assert(cfh);
+
+    for (size_t i = 0; i < num_prefixes; ++i) {
+      prefixes[i] = std::to_string(i) + key;
+      prefix_slices[i] = Slice(prefixes[i].data(), prefix_to_use);
+
+      ro_copies[i] = readoptions;
+      ro_copies[i].snapshot = snapshot;
       if (thread->rand.OneIn(2) &&
           GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
         // For half of the time, set the upper bound to the next prefix
-        ub_slices[i] = Slice(upper_bounds[i]);
-        readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
+        ub_slices[i] = upper_bounds[i];
+        ro_copies[i].iterate_upper_bound = &(ub_slices[i]);
       }
-      iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
+
+      iters[i].reset(db_->NewIterator(ro_copies[i], cfh));
       iters[i]->Seek(prefix_slices[i]);
     }
 
-    long count = 0;
+    uint64_t count = 0;
+
     while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
-      count++;
-      std::string values[10];
+      ++count;
+
+      std::array<std::string, num_prefixes> values;
+
       // get list of all values for this iteration
-      for (int i = 0; i < 10; i++) {
+      for (size_t i = 0; i < num_prefixes; ++i) {
         // no iterator should finish before the first one
         assert(iters[i]->Valid() &&
                iters[i]->key().starts_with(prefix_slices[i]));
         values[i] = iters[i]->value().ToString();
 
-        char expected_first = (prefixes[i])[0];
-        char actual_first = (values[i])[0];
+        // make sure the last character of the value is the expected digit
+        assert(!prefixes[i].empty());
+        assert(!values[i].empty());
+
+        const char expected = prefixes[i].front();
+        const char actual = values[i].back();
 
-        if (actual_first != expected_first) {
-          fprintf(stderr, "error expected first = %c actual = %c\n",
-                  expected_first, actual_first);
+        if (expected != actual) {
+          fprintf(stderr, "prefix scan error expected = %c actual = %c\n",
+                  expected, actual);
         }
-        (values[i])[0] = ' ';  // blank out the differing character
-      }
-      // make sure all values are equivalent
-      for (int i = 0; i < 10; i++) {
+
+        values[i].pop_back();  // get rid of the differing character
+
+        // make sure all values are equivalent
         if (values[i] != values[0]) {
           fprintf(stderr,
-                  "error : %d, inconsistent values for prefix %s: %s, %s\n", i,
-                  prefixes[i].c_str(), StringToHex(values[0]).c_str(),
+                  "prefix scan error : %" ROCKSDB_PRIszt
+                  ", inconsistent values for prefix %s: %s, %s\n",
+                  i, prefix_slices[i].ToString(/* hex */ true).c_str(),
+                  StringToHex(values[0]).c_str(),
                   StringToHex(values[i]).c_str());
           // we continue after error rather than exiting so that we can
           // find more errors if any
         }
+
+        // make sure value() and columns() are consistent
+        const WideColumns expected_columns = GenerateExpectedWideColumns(
+            GetValueBase(iters[i]->value()), iters[i]->value());
+        if (iters[i]->columns() != expected_columns) {
+          fprintf(stderr,
+                  "prefix scan error : %" ROCKSDB_PRIszt
+                  ", value and columns inconsistent for prefix %s: %s\n",
+                  i, prefix_slices[i].ToString(/* hex */ true).c_str(),
+                  DebugString(iters[i]->value(), iters[i]->columns(),
+                              expected_columns)
+                      .c_str());
+        }
+
         iters[i]->Next();
       }
     }
 
     // cleanup iterators and snapshot
-    for (int i = 0; i < 10; i++) {
+    for (size_t i = 0; i < num_prefixes; ++i) {
       // if the first iterator finished, they should have all finished
       assert(!iters[i]->Valid() ||
              !iters[i]->key().starts_with(prefix_slices[i]));
       assert(iters[i]->status().ok());
-      delete iters[i];
     }
+
     db_->ReleaseSnapshot(snapshot);
 
-    if (s.ok()) {
-      thread->stats.AddPrefixes(1, count);
-    } else {
-      fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
-      thread->stats.AddErrors(1);
-    }
+    thread->stats.AddPrefixes(1, count);
 
-    return s;
+    return Status::OK();
   }
 
   void VerifyDb(ThreadState* /* thread */) const override {}
+
+  void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {}
 };
 
 StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }