]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/tools/db_repl_stress.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / tools / db_repl_stress.cc
index 794ac1530e1ab22b406f2e4125e3fc6427328ca0..ba680f4f20c0e38c8d549a708bde4a66d977f8a3 100644 (file)
@@ -27,13 +27,30 @@ int main() {
 // --num_inserts = the num of inserts the first thread should perform.
 // --wal_ttl = the wal ttl for the run.
 
-using namespace ROCKSDB_NAMESPACE;
+DEFINE_uint64(num_inserts, 1000,
+              "the num of inserts the first thread should"
+              " perform.");
+DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
+DEFINE_uint64(wal_size_limit_MB, 10,
+              "the wal size limit for the run"
+              "(in MB)");
+
+using ROCKSDB_NAMESPACE::BatchResult;
+using ROCKSDB_NAMESPACE::DB;
+using ROCKSDB_NAMESPACE::DestroyDB;
+using ROCKSDB_NAMESPACE::Env;
+using ROCKSDB_NAMESPACE::Options;
+using ROCKSDB_NAMESPACE::Random;
+using ROCKSDB_NAMESPACE::SequenceNumber;
+using ROCKSDB_NAMESPACE::Slice;
+using ROCKSDB_NAMESPACE::Status;
+using ROCKSDB_NAMESPACE::TransactionLogIterator;
+using ROCKSDB_NAMESPACE::WriteOptions;
 
 using GFLAGS_NAMESPACE::ParseCommandLineFlags;
 using GFLAGS_NAMESPACE::SetUsageMessage;
 
 struct DataPumpThread {
-  size_t no_records;
   DB* db;  // Assumption DB is Open'ed already.
 };
 
@@ -41,8 +58,8 @@ static void DataPumpThreadBody(void* arg) {
   DataPumpThread* t = reinterpret_cast<DataPumpThread*>(arg);
   DB* db = t->db;
   Random rnd(301);
-  size_t i = 0;
-  while (i++ < t->no_records) {
+  uint64_t i = 0;
+  while (i++ < FLAGS_num_inserts) {
     if (!db->Put(WriteOptions(), Slice(rnd.RandomString(500)),
                  Slice(rnd.RandomString(500)))
              .ok()) {
@@ -52,45 +69,6 @@ static void DataPumpThreadBody(void* arg) {
   }
 }
 
-struct ReplicationThread {
-  std::atomic<bool> stop;
-  DB* db;
-  volatile size_t no_read;
-};
-
-static void ReplicationThreadBody(void* arg) {
-  ReplicationThread* t = reinterpret_cast<ReplicationThread*>(arg);
-  DB* db = t->db;
-  std::unique_ptr<TransactionLogIterator> iter;
-  SequenceNumber currentSeqNum = 1;
-  while (!t->stop.load(std::memory_order_acquire)) {
-    iter.reset();
-    Status s;
-    while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
-      if (t->stop.load(std::memory_order_acquire)) {
-        return;
-      }
-    }
-    fprintf(stderr, "Refreshing iterator\n");
-    for (; iter->Valid(); iter->Next(), t->no_read++, currentSeqNum++) {
-      BatchResult res = iter->GetBatch();
-      if (res.sequence != currentSeqNum) {
-        fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
-                (long)currentSeqNum, (long)res.sequence);
-        exit(1);
-      }
-    }
-  }
-}
-
-DEFINE_uint64(num_inserts, 1000,
-              "the num of inserts the first thread should"
-              " perform.");
-DEFINE_uint64(wal_ttl_seconds, 1000, "the wal ttl for the run(in seconds)");
-DEFINE_uint64(wal_size_limit_MB, 10,
-              "the wal size limit for the run"
-              "(in MB)");
-
 int main(int argc, const char** argv) {
   SetUsageMessage(
       std::string("\nUSAGE:\n") + std::string(argv[0]) +
@@ -117,29 +95,38 @@ int main(int argc, const char** argv) {
   }
 
   DataPumpThread dataPump;
-  dataPump.no_records = FLAGS_num_inserts;
   dataPump.db = db;
   env->StartThread(DataPumpThreadBody, &dataPump);
 
-  ReplicationThread replThread;
-  replThread.db = db;
-  replThread.no_read = 0;
-  replThread.stop.store(false, std::memory_order_release);
-
-  env->StartThread(ReplicationThreadBody, &replThread);
-  while (replThread.no_read < FLAGS_num_inserts)
-    ;
-  replThread.stop.store(true, std::memory_order_release);
-  if (replThread.no_read < dataPump.no_records) {
-    // no. read should be => than inserted.
-    fprintf(stderr,
-            "No. of Record's written and read not same\nRead : %" ROCKSDB_PRIszt
-            " Written : %" ROCKSDB_PRIszt "\n",
-            replThread.no_read, dataPump.no_records);
-    exit(1);
+  std::unique_ptr<TransactionLogIterator> iter;
+  SequenceNumber currentSeqNum = 1;
+  uint64_t num_read = 0;
+  for (;;) {
+    iter.reset();
+    // Continue to probe a bit more after all received
+    size_t probes = 0;
+    while (!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
+      probes++;
+      if (probes > 100 && num_read >= FLAGS_num_inserts) {
+        if (num_read > FLAGS_num_inserts) {
+          fprintf(stderr, "Too many updates read: %ld expected: %ld\n",
+                  (long)num_read, (long)FLAGS_num_inserts);
+          exit(1);
+        }
+        fprintf(stderr, "Successful!\n");
+        return 0;
+      }
+    }
+    fprintf(stderr, "Refreshing iterator\n");
+    for (; iter->Valid(); iter->Next(), num_read++, currentSeqNum++) {
+      BatchResult res = iter->GetBatch();
+      if (res.sequence != currentSeqNum) {
+        fprintf(stderr, "Missed a seq no. b/w %ld and %ld\n",
+                (long)currentSeqNum, (long)res.sequence);
+        exit(1);
+      }
+    }
   }
-  fprintf(stderr, "Successful!\n");
-  exit(0);
 }
 
 #endif  // GFLAGS