git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_write_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_write_test.cc
index e6bab87511483f33636282891fbe005520d9023c..cc1aaac08226bd895feec474e2d760501e933689 100644 (file)
@@ -7,16 +7,17 @@
 #include <memory>
 #include <thread>
 #include <vector>
+#include <fstream>
 #include "db/db_test_util.h"
 #include "db/write_batch_internal.h"
 #include "db/write_thread.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
-#include "util/fault_injection_test_env.h"
+#include "test_util/fault_injection_test_env.h"
+#include "test_util/sync_point.h"
 #include "util/string_util.h"
-#include "util/sync_point.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 // Test variations of WriteImpl.
 class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
@@ -39,6 +40,97 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
   ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
 }
 
+TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {  // no writer thread may hang
+  Options options = GetOptions();
+  options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);  // next "foo<N>" key suffix, shared by all writers
+  port::Mutex mutex;
+  port::CondVar cv(&mutex);
+  // mutex/cv let the sync-point callback defined below wake the main thread
+  Reopen(options);
+  // Writer bodies: each Put()s one new "foo<N>" key; they differ only in no_slowdown
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    dbfull()->Put(wo, key, "bar");  // NOTE(review): result is not checked
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    dbfull()->Put(wo, key, "bar");  // NOTE(review): result is not checked
+  };
+  std::function<void(void *)> unblock_main_thread_func = [&](void *) {
+    mutex.Lock();
+    cv.SignalAll();  // wakes the cv.Wait() the main thread is blocked in
+    mutex.Unlock();
+  };
+
+  // Create 3 L0 files (Put + Flush each); the 4th Put is flushed below without waiting
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  Flush();
+  Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+  // Register the wake-up callback; in each pair below the 2nd point waits for the 1st
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBWriteTest::WriteThreadHangOnWriteStall:1",
+        "DBImpl::BackgroundCallFlush:start"},
+       {"DBWriteTest::WriteThreadHangOnWriteStall:2",
+        "DBImpl::WriteImpl:BeforeLeaderEnters"},
+       // Make compaction start wait for the write stall to be detected and
+       // implemented by a write group leader
+       {"DBWriteTest::WriteThreadHangOnWriteStall:3",
+        "BackgroundCallCompaction:0"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Schedule creation of 4th L0 file without waiting. This will seal the
+  // memtable and then wait for a sync point before writing the file. We need
+  // to do it this way because SwitchMemtable() needs to enter the
+  // write_thread
+  FlushOptions fopt;
+  fopt.wait = false;
+  dbfull()->Flush(fopt);
+
+  // Create a mix of slowdown/no_slowdown write threads, waiting for the callback after each
+  mutex.Lock();  // held so the callback cannot signal before cv.Wait() is reached
+  // First leader
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  // Second leader. Will stall writes
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_no_slowdown_func);
+  cv.Wait();
+  threads.emplace_back(write_slowdown_func);
+  cv.Wait();
+  mutex.Unlock();
+
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");  // lets the flush start
+  dbfull()->TEST_WaitForFlushMemTable(nullptr);
+  // This would have triggered a write stall. Unblock the write group leader
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
+  // The leader is going to create missing newer links. When the leader finishes,
+  // the next leader is going to delay writes and fail writers with no_slowdown
+  // Joining below is the actual check: a stuck writer would block join() forever
+  TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");  // lets compaction start
+  for (auto& t : threads) {
+    t.join();
+  }
+}  // NOTE(review): sync points stay enabled with a callback capturing locals - confirm cleanup
+
 TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
   constexpr int kNumThreads = 5;
   std::unique_ptr<FaultInjectionTestEnv> mock_env(
@@ -185,15 +277,53 @@ TEST_P(DBWriteTest, LockWalInEffect) {
   ASSERT_OK(dbfull()->UnlockWAL());
 }
 
+TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {  // disableWAL writes must not grow the WAL
+    Options options = GetOptions();
+    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+    options.statistics->set_stats_level(StatsLevel::kAll);
+    Reopen(options);
+    std::string wal_key_prefix = "WAL_KEY_";
+    std::string no_wal_key_prefix = "K_";
+    // 100 KB value for each write issued with disableWAL = true
+    std::string no_wal_value(1024 * 100, 'X');
+    // 1-byte value for each write issued with default (WAL-enabled) options
+    std::string wal_value = "0";
+    std::thread threads[10];  // 10 writer threads, 10 iterations each
+    for (int t = 0; t < 10; t++) {
+        threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix, no_wal_value, this] {
+            for(int i = 0; i < 10; i++) {
+              ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
+              write_option_disable.disableWAL = true;
+              ROCKSDB_NAMESPACE::WriteOptions write_option_default;
+              std::string no_wal_key = no_wal_key_prefix + std::to_string(t) +
+                                       "_" + std::to_string(i);
+              this->Put(no_wal_key, no_wal_value, write_option_disable);
+              std::string wal_key =  // NOTE(review): built from i twice, unlike no_wal_key (t, i) - confirm
+                  wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
+              this->Put(wal_key, wal_value, write_option_default);
+              dbfull()->SyncWAL();  // NOTE(review): results of Put()/SyncWAL() are not checked
+            }
+            return 0;  // value is discarded by std::thread
+        });
+    }
+    for (auto& t: threads) {
+        t.join();
+    }
+    uint64_t bytes_num = options.statistics->getTickerCount(
+        ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
+    // WAL bytes written should be at most 100 KB (even including header & footer overhead)
+    ASSERT_LE(bytes_num, 1024 * 100);
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                         testing::Values(DBTestBase::kDefault,
                                         DBTestBase::kConcurrentWALWrites,
                                         DBTestBase::kPipelinedWrite));
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
-  rocksdb::port::InstallStackTraceHandler();
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }