]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/merge_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / merge_test.cc
index 3f85f646416fb90e86d7408be47827ed6a3adfb1..047ea7fac26152ccf00026436dd817569d2fbe19 100644 (file)
@@ -4,8 +4,9 @@
 //  (found in the LICENSE.Apache file in the root directory).
 //
 #include <assert.h>
-#include <memory>
+
 #include <iostream>
+#include <memory>
 
 #include "db/db_impl/db_impl.h"
 #include "db/dbformat.h"
@@ -18,6 +19,7 @@
 #include "rocksdb/merge_operator.h"
 #include "rocksdb/utilities/db_ttl.h"
 #include "test_util/testharness.h"
+#include "util/coding.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -296,6 +298,96 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
   }
 }
 
+void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
+  ASSERT_OK(db->Put({}, "1", "1"));
+  ASSERT_OK(db->Flush(FlushOptions()));
+
+  std::atomic<int> cnt{0};
+  const auto get_thread_id = [&cnt]() {
+    thread_local int thread_id{cnt++};
+    return thread_id;
+  };
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (1 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_compact_thread:0");
+        } else if (2 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::bg_flush_thread:0");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
+        int thread_id = get_thread_id();
+        if (0 == thread_id) {
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:0");
+          TEST_SYNC_POINT(
+              "testCountersWithFlushAndCompaction::set_options_thread:1");
+        }
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
+        auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
+        mutex->AssertHeld();
+        int thread_id = get_thread_id();
+        ASSERT_EQ(2, thread_id);
+        mutex->Unlock();
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:1");
+        TEST_SYNC_POINT(
+            "testCountersWithFlushAndCompaction::bg_flush_thread:2");
+        mutex->Lock();
+      });
+  SyncPoint::GetInstance()->LoadDependency({
+      {"testCountersWithFlushAndCompaction::set_options_thread:0",
+       "testCountersWithCompactionAndFlush:BeforeCompact"},
+      {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
+       "testCountersWithFlushAndCompaction:BeforeIncCounters"},
+      {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
+       "testCountersWithFlushAndCompaction::set_options_thread:1"},
+      {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
+       "testCountersWithFlushAndCompaction:BeforeVerification"},
+      {"testCountersWithFlushAndCompaction:AfterGet",
+       "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  port::Thread set_options_thread([&]() {
+    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
+        {{"disable_auto_compactions", "false"}}));
+  });
+  TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
+  port::Thread compact_thread([&]() {
+    ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
+        CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
+  counters.add("test-key", 1);
+
+  FlushOptions flush_opts;
+  flush_opts.wait = false;
+  ASSERT_OK(db->Flush(flush_opts));
+
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
+  std::string expected;
+  PutFixed64(&expected, 1);
+  std::string actual;
+  Status s = db->Get(ReadOptions(), "test-key", &actual);
+  TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
+  set_options_thread.join();
+  compact_thread.join();
+  ASSERT_OK(s);
+  ASSERT_EQ(expected, actual);
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
                          size_t num_merges) {
 
@@ -488,6 +580,19 @@ TEST_F(MergeTest, MergeDbTtlTest) {
   runTest(test::PerThreadDBPath("merge_testdbttl"),
           true);  // Run test on TTL database
 }
+
+TEST_F(MergeTest, MergeWithCompactionAndFlush) {
+  const std::string dbname =
+      test::PerThreadDBPath("merge_with_compaction_and_flush");
+  {
+    auto db = OpenDb(dbname);
+    {
+      MergeBasedCounters counters(db, 0);
+      testCountersWithFlushAndCompaction(counters, db.get());
+    }
+  }
+  DestroyDB(dbname, Options());
+}
 #endif  // !ROCKSDB_LITE
 
 }  // namespace ROCKSDB_NAMESPACE