// (found in the LICENSE.Apache file in the root directory).
//
#include <assert.h>
-#include <memory>
+
#include <iostream>
+#include <memory>
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/utilities/db_ttl.h"
#include "test_util/testharness.h"
+#include "util/coding.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
}
}
+void testCountersWithFlushAndCompaction(Counters& counters, DB* db) {
+ ASSERT_OK(db->Put({}, "1", "1"));
+ ASSERT_OK(db->Flush(FlushOptions()));
+
+ std::atomic<int> cnt{0};
+ const auto get_thread_id = [&cnt]() {
+ thread_local int thread_id{cnt++};
+ return thread_id;
+ };
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->SetCallBack(
+ "VersionSet::LogAndApply:BeforeWriterWaiting", [&](void* /*arg*/) {
+ int thread_id = get_thread_id();
+ if (1 == thread_id) {
+ TEST_SYNC_POINT(
+ "testCountersWithFlushAndCompaction::bg_compact_thread:0");
+ } else if (2 == thread_id) {
+ TEST_SYNC_POINT(
+ "testCountersWithFlushAndCompaction::bg_flush_thread:0");
+ }
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "VersionSet::LogAndApply:WriteManifest", [&](void* /*arg*/) {
+ int thread_id = get_thread_id();
+ if (0 == thread_id) {
+ TEST_SYNC_POINT(
+ "testCountersWithFlushAndCompaction::set_options_thread:0");
+ TEST_SYNC_POINT(
+ "testCountersWithFlushAndCompaction::set_options_thread:1");
+ }
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "VersionSet::LogAndApply:WakeUpAndDone", [&](void* arg) {
+ auto* mutex = reinterpret_cast<InstrumentedMutex*>(arg);
+ mutex->AssertHeld();
+ int thread_id = get_thread_id();
+ ASSERT_EQ(2, thread_id);
+ mutex->Unlock();
+ TEST_SYNC_POINT(
+ "testCountersWithFlushAndCompaction::bg_flush_thread:1");
+ TEST_SYNC_POINT(
+ "testCountersWithFlushAndCompaction::bg_flush_thread:2");
+ mutex->Lock();
+ });
+ SyncPoint::GetInstance()->LoadDependency({
+ {"testCountersWithFlushAndCompaction::set_options_thread:0",
+ "testCountersWithCompactionAndFlush:BeforeCompact"},
+ {"testCountersWithFlushAndCompaction::bg_compact_thread:0",
+ "testCountersWithFlushAndCompaction:BeforeIncCounters"},
+ {"testCountersWithFlushAndCompaction::bg_flush_thread:0",
+ "testCountersWithFlushAndCompaction::set_options_thread:1"},
+ {"testCountersWithFlushAndCompaction::bg_flush_thread:1",
+ "testCountersWithFlushAndCompaction:BeforeVerification"},
+ {"testCountersWithFlushAndCompaction:AfterGet",
+ "testCountersWithFlushAndCompaction::bg_flush_thread:2"},
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ port::Thread set_options_thread([&]() {
+ ASSERT_OK(reinterpret_cast<DBImpl*>(db)->SetOptions(
+ {{"disable_auto_compactions", "false"}}));
+ });
+ TEST_SYNC_POINT("testCountersWithCompactionAndFlush:BeforeCompact");
+ port::Thread compact_thread([&]() {
+ ASSERT_OK(reinterpret_cast<DBImpl*>(db)->CompactRange(
+ CompactRangeOptions(), db->DefaultColumnFamily(), nullptr, nullptr));
+ });
+
+ TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeIncCounters");
+ counters.add("test-key", 1);
+
+ FlushOptions flush_opts;
+ flush_opts.wait = false;
+ ASSERT_OK(db->Flush(flush_opts));
+
+ TEST_SYNC_POINT("testCountersWithFlushAndCompaction:BeforeVerification");
+ std::string expected;
+ PutFixed64(&expected, 1);
+ std::string actual;
+ Status s = db->Get(ReadOptions(), "test-key", &actual);
+ TEST_SYNC_POINT("testCountersWithFlushAndCompaction:AfterGet");
+ set_options_thread.join();
+ compact_thread.join();
+ ASSERT_OK(s);
+ ASSERT_EQ(expected, actual);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
void testSuccessiveMerge(Counters& counters, size_t max_num_merges,
size_t num_merges) {
runTest(test::PerThreadDBPath("merge_testdbttl"),
true); // Run test on TTL database
}
+
+TEST_F(MergeTest, MergeWithCompactionAndFlush) {
+ const std::string dbname =
+ test::PerThreadDBPath("merge_with_compaction_and_flush");
+ {
+ auto db = OpenDb(dbname);
+ {
+ MergeBasedCounters counters(db, 0);
+ testCountersWithFlushAndCompaction(counters, db.get());
+ }
+ }
+ DestroyDB(dbname, Options());
+}
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE