git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_write_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_write_test.cc
index cc1aaac08226bd895feec474e2d760501e933689..945b08eda8e44a814fbaf4fe75fa129b87091ead 100644 (file)
@@ -4,25 +4,27 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include <atomic>
+#include <fstream>
 #include <memory>
 #include <thread>
 #include <vector>
-#include <fstream>
+
 #include "db/db_test_util.h"
 #include "db/write_batch_internal.h"
 #include "db/write_thread.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
-#include "test_util/fault_injection_test_env.h"
 #include "test_util/sync_point.h"
+#include "util/random.h"
 #include "util/string_util.h"
+#include "utilities/fault_injection_env.h"
 
 namespace ROCKSDB_NAMESPACE {
 
 // Test variations of WriteImpl.
 class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
  public:
-  DBWriteTest() : DBTestBase("/db_write_test") {}
+  DBWriteTest() : DBTestBase("/db_write_test", /*env_do_fsync=*/true) {}
 
   Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
 
@@ -40,6 +42,125 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
 }
 
+// Exercises a write stall with a no_slowdown writer queued between regular
+// writers; that writer may fail with Incomplete, and all threads must join.
+TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
+      4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+  // Writers that reached JoinBatchGroup:Start so far. Guarded by mutex
+  int writers = 0;
+  Reopen(options);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    Status s = dbfull()->Put(wo, key, "bar");
+    ASSERT_TRUE(s.ok() || s.IsIncomplete());
+  };
+  std::function<void(void*)> unblock_main_thread_func = [&](void*) {
+    mutex.Lock();
+    ++writers;
+    cv.SignalAll();
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files and schedule 4th without waiting
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
+        "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
+       // Make compaction start wait for the write stall to be detected and
+       // implemented by a write group leader
+       {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  ASSERT_OK(dbfull()->Flush(fopt));
+
+  // Create a mix of slowdown/no_slowdown write threads
+  mutex.Lock();
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 1) {
+    cv.Wait();
+  }
+
+  // Second leader. Will stall writes
+  // Build a writers list with no slowdown in the middle:
+  //  +-------------+
+  //  | slowdown    +<----+ newest
+  //  +--+----------+
+  //     |
+  //     v
+  //  +--+----------+
+  //  | no slowdown |
+  //  +--+----------+
+  //     |
+  //     v
+  //  +--+----------+
+  //  | slowdown    +
+  //  +-------------+
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 2) {
+    cv.Wait();
+  }
+  threads.emplace_back(write_no_slowdown_func);
+  while (writers != 3) {
+    cv.Wait();
+  }
+  threads.emplace_back(write_slowdown_func);
+  while (writers != 4) {
+    cv.Wait();
+  }
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
+  // EXPECT, not ASSERT: an early return would destroy still-joinable threads
+  EXPECT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
+  // The leader is going to create missing newer links. When the leader
+  // finishes, the next leader is going to delay writes and fail writers with
+  // no_slowdown
+  TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
+  for (auto& t : threads) {
+    t.join();
+  }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
   Options options = GetOptions();
   options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
@@ -47,6 +168,8 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
   std::atomic<int> thread_num(0);
   port::Mutex mutex;
   port::CondVar cv(&mutex);
+  // Guarded by mutex
+  int writers = 0;
 
   Reopen(options);
 
@@ -66,6 +189,7 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
   };
   std::function<void(void *)> unblock_main_thread_func = [&](void *) {
     mutex.Lock();
+    ++writers;
     cv.SignalAll();
     mutex.Unlock();
   };
@@ -104,18 +228,18 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
   mutex.Lock();
   // First leader
   threads.emplace_back(write_slowdown_func);
-  cv.Wait();
+  while (writers != 1) {
+    cv.Wait();
+  }
   // Second leader. Will stall writes
   threads.emplace_back(write_slowdown_func);
-  cv.Wait();
   threads.emplace_back(write_no_slowdown_func);
-  cv.Wait();
   threads.emplace_back(write_slowdown_func);
-  cv.Wait();
   threads.emplace_back(write_no_slowdown_func);
-  cv.Wait();
   threads.emplace_back(write_slowdown_func);
-  cv.Wait();
+  while (writers != 6) {
+    cv.Wait();
+  }
   mutex.Unlock();
 
   TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
@@ -129,12 +253,14 @@ TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
   for (auto& t : threads) {
     t.join();
   }
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
 TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   constexpr int kNumThreads = 5;
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
-      new FaultInjectionTestEnv(Env::Default()));
+      new FaultInjectionTestEnv(env_));
   Options options = GetOptions();
   options.env = mock_env.get();
   Reopen(options);
@@ -203,7 +329,7 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) {
 
 TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
-      new FaultInjectionTestEnv(Env::Default()));
+      new FaultInjectionTestEnv(env_));
   Options options = GetOptions();
   options.env = mock_env.get();
   Reopen(options);
@@ -235,7 +361,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
 TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
   Random rnd(301);
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
-      new FaultInjectionTestEnv(Env::Default()));
+      new FaultInjectionTestEnv(env_));
   Options options = GetOptions();
   options.env = mock_env.get();
   options.writable_file_max_buffer_size = 4 * 1024 * 1024;
@@ -246,7 +372,7 @@ TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
   mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
   Status s;
   for (int i = 0; i < 4 * 512; ++i) {
-    s = Put(Key(i), RandomString(&rnd, 1024));
+    s = Put(Key(i), rnd.RandomString(1024));
     if (!s.ok()) {
       break;
     }