#include <utility>
#include <vector>
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
#include "db/write_callback.h"
+#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"
-#include "port/port.h"
+#include "test_util/sync_point.h"
+#include "test_util/testharness.h"
#include "util/random.h"
-#include "util/sync_point.h"
-#include "util/testharness.h"
using std::string;
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
class WriteCallbackTest : public testing::Test {
public:
{false, false, true, false, true},
};
+ for (auto& unordered_write : {true, false}) {
for (auto& seq_per_batch : {true, false}) {
for (auto& two_queues : {true, false}) {
for (auto& allow_parallel : {true, false}) {
for (auto& write_group : write_scenarios) {
Options options;
options.create_if_missing = true;
+ options.unordered_write = unordered_write;
options.allow_concurrent_memtable_write = allow_parallel;
options.enable_pipelined_write = enable_pipelined_write;
options.two_write_queues = two_queues;
+ // Skip unsupported combinations
if (options.enable_pipelined_write && seq_per_batch) {
- // This combination is not supported
continue;
}
if (options.enable_pipelined_write && options.two_write_queues) {
- // This combination is not supported
+ continue;
+ }
+ if (options.unordered_write &&
+ !options.allow_concurrent_memtable_write) {
+ continue;
+ }
+ if (options.unordered_write && options.enable_pipelined_write) {
continue;
}
std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Start", [&](void*) {
uint64_t cur_threads_joining = threads_joining.fetch_add(1);
// Wait for the last joined writer to link to the queue.
});
// Verification once writers call JoinBatchGroup.
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
uint64_t cur_threads_linked = threads_linked.fetch_add(1);
bool is_leader = false;
}
});
- rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
PublishSeqCallback(DBImpl* db_impl_in)
: db_impl_(db_impl_in) {}
Status Callback(SequenceNumber last_seq, bool /*not used*/,
- uint64_t) override {
+ uint64_t, size_t /*index*/,
+ size_t /*total*/) override {
db_impl_->SetLastPublishedSequence(last_seq);
return Status::OK();
}
}
};
- rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// do all the writes
std::vector<port::Thread> threads;
t.join();
}
- rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// check for keys
string value;
}
}
}
-}
-}
+ }
+ }
+ }
}
TEST_F(WriteCallbackTest, WriteCallBackTest) {
DestroyDB(dbname, options);
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);