1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
13 #include "db/db_impl.h"
14 #include "db/write_callback.h"
15 #include "rocksdb/db.h"
16 #include "rocksdb/write_batch.h"
17 #include "port/port.h"
18 #include "util/logging.h"
19 #include "util/random.h"
20 #include "util/sync_point.h"
21 #include "util/testharness.h"
27 class WriteCallbackTest
: public testing::Test
{
32 dbname
= test::TmpDir() + "/write_callback_testdb";
36 class WriteCallbackTestWriteCallback1
: public WriteCallback
{
38 bool was_called
= false;
40 Status
Callback(DB
*db
) override
{
43 // Make sure db is a DBImpl
44 DBImpl
* db_impl
= dynamic_cast<DBImpl
*> (db
);
45 if (db_impl
== nullptr) {
46 return Status::InvalidArgument("");
52 bool AllowWriteBatching() override
{ return true; }
55 class WriteCallbackTestWriteCallback2
: public WriteCallback
{
57 Status
Callback(DB
*db
) override
{
58 return Status::Busy();
60 bool AllowWriteBatching() override
{ return true; }
63 class MockWriteCallback
: public WriteCallback
{
65 bool should_fail_
= false;
66 bool was_called_
= false;
67 bool allow_batching_
= false;
69 Status
Callback(DB
* db
) override
{
72 return Status::Busy();
78 bool AllowWriteBatching() override
{ return allow_batching_
; }
81 TEST_F(WriteCallbackTest
, WriteWithCallbackTest
) {
83 WriteOP(bool should_fail
= false) { callback_
.should_fail_
= should_fail
; }
85 void Put(const string
& key
, const string
& val
) {
86 kvs_
.push_back(std::make_pair(key
, val
));
87 write_batch_
.Put(key
, val
);
93 callback_
.was_called_
= false;
96 MockWriteCallback callback_
;
97 WriteBatch write_batch_
;
98 std::vector
<std::pair
<string
, string
>> kvs_
;
101 std::vector
<std::vector
<WriteOP
>> write_scenarios
= {
108 {false, false, false},
110 {false, true, false},
112 {true, false, false, false, false},
113 {false, false, false, false, true},
114 {false, false, true, false, true},
117 for (auto& allow_parallel
: {true, false}) {
118 for (auto& allow_batching
: {true, false}) {
119 for (auto& enable_WAL
: {true, false}) {
120 for (auto& write_group
: write_scenarios
) {
122 options
.create_if_missing
= true;
123 options
.allow_concurrent_memtable_write
= allow_parallel
;
125 ReadOptions read_options
;
129 DestroyDB(dbname
, options
);
130 ASSERT_OK(DB::Open(options
, dbname
, &db
));
132 db_impl
= dynamic_cast<DBImpl
*>(db
);
133 ASSERT_TRUE(db_impl
);
135 std::atomic
<uint64_t> threads_waiting(0);
136 std::atomic
<uint64_t> seq(db_impl
->GetLatestSequenceNumber());
137 ASSERT_EQ(db_impl
->GetLatestSequenceNumber(), 0);
139 rocksdb::SyncPoint::GetInstance()->SetCallBack(
140 "WriteThread::JoinBatchGroup:Wait", [&](void* arg
) {
141 uint64_t cur_threads_waiting
= 0;
142 bool is_leader
= false;
143 bool is_last
= false;
147 cur_threads_waiting
= threads_waiting
.load();
148 is_leader
= (cur_threads_waiting
== 0);
149 is_last
= (cur_threads_waiting
== write_group
.size() - 1);
150 } while (!threads_waiting
.compare_exchange_strong(
151 cur_threads_waiting
, cur_threads_waiting
+ 1));
154 auto* writer
= reinterpret_cast<WriteThread::Writer
*>(arg
);
157 ASSERT_TRUE(writer
->state
==
158 WriteThread::State::STATE_GROUP_LEADER
);
160 ASSERT_TRUE(writer
->state
== WriteThread::State::STATE_INIT
);
163 // (meta test) the first WriteOP should indeed be the first
164 // and the last should be the last (all others can be out of
167 ASSERT_TRUE(writer
->callback
->Callback(nullptr).ok() ==
168 !write_group
.front().callback_
.should_fail_
);
169 } else if (is_last
) {
170 ASSERT_TRUE(writer
->callback
->Callback(nullptr).ok() ==
171 !write_group
.back().callback_
.should_fail_
);
175 while (threads_waiting
.load() < write_group
.size()) {
179 rocksdb::SyncPoint::GetInstance()->SetCallBack(
180 "WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg
) {
182 auto* writer
= reinterpret_cast<WriteThread::Writer
*>(arg
);
184 if (!allow_batching
) {
185 // no batching so everyone should be a leader
186 ASSERT_TRUE(writer
->state
==
187 WriteThread::State::STATE_GROUP_LEADER
);
188 } else if (!allow_parallel
) {
189 ASSERT_TRUE(writer
->state
==
190 WriteThread::State::STATE_COMPLETED
);
194 std::atomic
<uint32_t> thread_num(0);
195 std::atomic
<char> dummy_key(0);
196 std::function
<void()> write_with_callback_func
= [&]() {
197 uint32_t i
= thread_num
.fetch_add(1);
200 // leaders gotta lead
201 while (i
> 0 && threads_waiting
.load() < 1) {
205 while (i
== write_group
.size() - 1 &&
206 threads_waiting
.load() < write_group
.size() - 1) {
209 auto& write_op
= write_group
.at(i
);
211 write_op
.callback_
.allow_batching_
= allow_batching
;
214 for (uint32_t j
= 0; j
< rnd
.Next() % 50; j
++) {
218 my_key
= dummy_key
.load();
219 } while (!dummy_key
.compare_exchange_strong(my_key
, my_key
+ 1));
221 string
skey(5, my_key
);
222 string
sval(10, my_key
);
223 write_op
.Put(skey
, sval
);
225 if (!write_op
.callback_
.should_fail_
) {
230 WriteOptions woptions
;
231 woptions
.disableWAL
= !enable_WAL
;
232 woptions
.sync
= enable_WAL
;
233 Status s
= db_impl
->WriteWithCallback(
234 woptions
, &write_op
.write_batch_
, &write_op
.callback_
);
236 if (write_op
.callback_
.should_fail_
) {
237 ASSERT_TRUE(s
.IsBusy());
243 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
246 std::vector
<port::Thread
> threads
;
247 for (uint32_t i
= 0; i
< write_group
.size(); i
++) {
248 threads
.emplace_back(write_with_callback_func
);
250 for (auto& t
: threads
) {
254 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
258 for (auto& w
: write_group
) {
259 ASSERT_TRUE(w
.callback_
.was_called_
);
260 for (auto& kvp
: w
.kvs_
) {
261 if (w
.callback_
.should_fail_
) {
263 db
->Get(read_options
, kvp
.first
, &value
).IsNotFound());
265 ASSERT_OK(db
->Get(read_options
, kvp
.first
, &value
));
266 ASSERT_EQ(value
, kvp
.second
);
271 ASSERT_EQ(seq
.load(), db_impl
->GetLatestSequenceNumber());
274 DestroyDB(dbname
, options
);
281 TEST_F(WriteCallbackTest
, WriteCallBackTest
) {
283 WriteOptions write_options
;
284 ReadOptions read_options
;
289 DestroyDB(dbname
, options
);
291 options
.create_if_missing
= true;
292 Status s
= DB::Open(options
, dbname
, &db
);
295 db_impl
= dynamic_cast<DBImpl
*> (db
);
296 ASSERT_TRUE(db_impl
);
300 wb
.Put("a", "value.a");
303 // Test a simple Write
304 s
= db
->Write(write_options
, &wb
);
307 s
= db
->Get(read_options
, "a", &value
);
309 ASSERT_EQ("value.a", value
);
311 // Test WriteWithCallback
312 WriteCallbackTestWriteCallback1 callback1
;
315 wb2
.Put("a", "value.a2");
317 s
= db_impl
->WriteWithCallback(write_options
, &wb2
, &callback1
);
319 ASSERT_TRUE(callback1
.was_called
);
321 s
= db
->Get(read_options
, "a", &value
);
323 ASSERT_EQ("value.a2", value
);
325 // Test WriteWithCallback for a callback that fails
326 WriteCallbackTestWriteCallback2 callback2
;
329 wb3
.Put("a", "value.a3");
331 s
= db_impl
->WriteWithCallback(write_options
, &wb3
, &callback2
);
334 s
= db
->Get(read_options
, "a", &value
);
336 ASSERT_EQ("value.a2", value
);
339 DestroyDB(dbname
, options
);
342 } // namespace rocksdb
344 int main(int argc
, char** argv
) {
345 ::testing::InitGoogleTest(&argc
, argv
);
346 return RUN_ALL_TESTS();
352 int main(int argc
, char** argv
) {
354 "SKIPPED as WriteWithCallback is not supported in ROCKSDB_LITE\n");
358 #endif // !ROCKSDB_LITE