// (found in the LICENSE.Apache file in the root directory).
#include <atomic>
+#include <fstream>
#include <memory>
#include <thread>
#include <vector>
-#include <fstream>
+
#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
-#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
+#include "util/random.h"
#include "util/string_util.h"
+#include "utilities/fault_injection_env.h"
namespace ROCKSDB_NAMESPACE {
// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
public:
- DBWriteTest() : DBTestBase("/db_write_test") {}
+ DBWriteTest() : DBTestBase("/db_write_test", /*env_do_fsync=*/true) {}
Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}
+TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
+ Options options = GetOptions();
+ options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
+ 4;
+ std::vector<port::Thread> threads;
+ std::atomic<int> thread_num(0);
+ port::Mutex mutex;
+ port::CondVar cv(&mutex);
+ // Guarded by mutex
+ int writers = 0;
+
+ Reopen(options);
+
+ std::function<void()> write_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = false;
+ dbfull()->Put(wo, key, "bar");
+ };
+ std::function<void()> write_no_slowdown_func = [&]() {
+ int a = thread_num.fetch_add(1);
+ std::string key = "foo" + std::to_string(a);
+ WriteOptions wo;
+ wo.no_slowdown = true;
+ dbfull()->Put(wo, key, "bar");
+ };
+ std::function<void(void*)> unblock_main_thread_func = [&](void*) {
+ mutex.Lock();
+ ++writers;
+ cv.SignalAll();
+ mutex.Unlock();
+ };
+
+ // Create 3 L0 files and schedule 4th without waiting
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+ Flush();
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+ Flush();
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+ Flush();
+ Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar");
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
+ "DBImpl::BackgroundCallFlush:start"},
+ {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
+ "DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
+ // Make compaction start wait for the write stall to be detected and
+ // implemented by a write group leader
+ {"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
+ "BackgroundCallCompaction:0"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ // Schedule creation of 4th L0 file without waiting. This will seal the
+ // memtable and then wait for a sync point before writing the file. We need
+ // to do it this way because SwitchMemtable() needs to enter the
+ // write_thread
+ FlushOptions fopt;
+ fopt.wait = false;
+ dbfull()->Flush(fopt);
+
+ // Create a mix of slowdown/no_slowdown write threads
+ mutex.Lock();
+ // First leader
+ threads.emplace_back(write_slowdown_func);
+ while (writers != 1) {
+ cv.Wait();
+ }
+
+ // Second leader. Will stall writes
+ // Build a writers list with no slowdown in the middle:
+ // +-------------+
+ // | slowdown +<----+ newest
+ // +--+----------+
+ // |
+ // v
+ // +--+----------+
+ // | no slowdown |
+ // +--+----------+
+ // |
+ // v
+ // +--+----------+
+ // | slowdown +
+ // +-------------+
+ threads.emplace_back(write_slowdown_func);
+ while (writers != 2) {
+ cv.Wait();
+ }
+ threads.emplace_back(write_no_slowdown_func);
+ while (writers != 3) {
+ cv.Wait();
+ }
+ threads.emplace_back(write_slowdown_func);
+ while (writers != 4) {
+ cv.Wait();
+ }
+
+ mutex.Unlock();
+
+ TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
+ dbfull()->TEST_WaitForFlushMemTable(nullptr);
+ // This would have triggered a write stall. Unblock the write group leader
+ TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
+ // The leader is going to create missing newer links. When the leader
+ // finishes, the next leader is going to delay writes and fail writers with
+ // no_slowdown
+
+ TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
Options options = GetOptions();
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger = 4;
std::atomic<int> thread_num(0);
port::Mutex mutex;
port::CondVar cv(&mutex);
+ // Guarded by mutex
+ int writers = 0;
Reopen(options);
};
std::function<void(void *)> unblock_main_thread_func = [&](void *) {
mutex.Lock();
+ ++writers;
cv.SignalAll();
mutex.Unlock();
};
mutex.Lock();
// First leader
threads.emplace_back(write_slowdown_func);
- cv.Wait();
+ while (writers != 1) {
+ cv.Wait();
+ }
// Second leader. Will stall writes
threads.emplace_back(write_slowdown_func);
- cv.Wait();
threads.emplace_back(write_no_slowdown_func);
- cv.Wait();
threads.emplace_back(write_slowdown_func);
- cv.Wait();
threads.emplace_back(write_no_slowdown_func);
- cv.Wait();
threads.emplace_back(write_slowdown_func);
- cv.Wait();
+ while (writers != 6) {
+ cv.Wait();
+ }
mutex.Unlock();
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
for (auto& t : threads) {
t.join();
}
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
constexpr int kNumThreads = 5;
std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(Env::Default()));
+ new FaultInjectionTestEnv(env_));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(Env::Default()));
+ new FaultInjectionTestEnv(env_));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
Random rnd(301);
std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(Env::Default()));
+ new FaultInjectionTestEnv(env_));
Options options = GetOptions();
options.env = mock_env.get();
options.writable_file_max_buffer_size = 4 * 1024 * 1024;
mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
Status s;
for (int i = 0; i < 4 * 512; ++i) {
- s = Put(Key(i), RandomString(&rnd, 1024));
+ s = Put(Key(i), rnd.RandomString(1024));
if (!s.ok()) {
break;
}