1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
18 #include "db/db_impl.h"
19 #include "rocksdb/db.h"
20 #include "rocksdb/options.h"
21 #include "rocksdb/utilities/transaction.h"
22 #include "rocksdb/utilities/transaction_db.h"
23 #include "table/mock_table.h"
24 #include "util/fault_injection_test_env.h"
25 #include "util/random.h"
26 #include "util/string_util.h"
27 #include "util/sync_point.h"
28 #include "util/testharness.h"
29 #include "util/testutil.h"
30 #include "util/transaction_test_util.h"
31 #include "utilities/merge_operators.h"
32 #include "utilities/merge_operators/string_append/stringappend.h"
33 #include "utilities/transactions/pessimistic_transaction_db.h"
35 #include "port/port.h"
// Return true if the ith bit is set in the combination represented by comb.
//
//   i    - zero-based bit position to test.
//   comb - bitmask in which bit k set means "element k is part of the
//          combination".
//
// Bit positions at or beyond the width of size_t can never be set, so they are
// reported as false. Shifting by such an amount would be undefined behavior,
// which is why the range is checked before the shift.
bool IsInCombination(size_t i, size_t comb) {
  // Width of size_t in bits (8 bits per byte on every supported platform).
  if (i >= sizeof(size_t) * 8) {
    return false;
  }
  // Move the bit of interest to position 0 and test it explicitly instead of
  // relying on an implicit integer-to-bool conversion.
  return ((comb >> i) & size_t(1)) != 0;
}
42 class TransactionTestBase
: public ::testing::Test
{
45 FaultInjectionTestEnv
* env
;
49 TransactionDBOptions txn_db_options
;
50 bool use_stackable_db_
;
52 TransactionTestBase(bool use_stackable_db
, bool two_write_queue
,
53 TxnDBWritePolicy write_policy
)
54 : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db
) {
55 options
.create_if_missing
= true;
56 options
.max_write_buffer_number
= 2;
57 options
.write_buffer_size
= 4 * 1024;
58 options
.level0_file_num_compaction_trigger
= 2;
59 options
.merge_operator
= MergeOperators::CreateFromStringId("stringappend");
60 env
= new FaultInjectionTestEnv(Env::Default());
62 options
.two_write_queues
= two_write_queue
;
63 dbname
= test::PerThreadDBPath("transaction_testdb");
65 DestroyDB(dbname
, options
);
66 txn_db_options
.transaction_lock_timeout
= 0;
67 txn_db_options
.default_lock_timeout
= 0;
68 txn_db_options
.write_policy
= write_policy
;
69 txn_db_options
.rollback_merge_operands
= true;
71 if (use_stackable_db
== false) {
72 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &db
);
74 s
= OpenWithStackableDB();
79 ~TransactionTestBase() {
82 // This is to skip the assert statement in FaultInjectionTestEnv. There
83 // seems to be a bug in btrfs that makes readdir return recently
84 // unlink-ed files. By using the default fs we simply ignore errors resulting
85 // from attempting to delete such files in DestroyDB.
86 options
.env
= Env::Default();
87 DestroyDB(dbname
, options
);
91 Status
ReOpenNoDelete() {
94 env
->AssertNoOpenFile();
95 env
->DropUnsyncedFileData();
98 if (use_stackable_db_
== false) {
99 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &db
);
101 s
= OpenWithStackableDB();
103 assert(!s
.ok() || db
!= nullptr);
107 Status
ReOpenNoDelete(std::vector
<ColumnFamilyDescriptor
>& cfs
,
108 std::vector
<ColumnFamilyHandle
*>* handles
) {
109 for (auto h
: *handles
) {
115 env
->AssertNoOpenFile();
116 env
->DropUnsyncedFileData();
119 if (use_stackable_db_
== false) {
120 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, cfs
, handles
,
123 s
= OpenWithStackableDB(cfs
, handles
);
125 assert(db
!= nullptr);
132 DestroyDB(dbname
, options
);
134 if (use_stackable_db_
== false) {
135 s
= TransactionDB::Open(options
, txn_db_options
, dbname
, &db
);
137 s
= OpenWithStackableDB();
139 assert(db
!= nullptr);
143 Status
OpenWithStackableDB(std::vector
<ColumnFamilyDescriptor
>& cfs
,
144 std::vector
<ColumnFamilyHandle
*>* handles
) {
145 std::vector
<size_t> compaction_enabled_cf_indices
;
146 TransactionDB::PrepareWrap(&options
, &cfs
, &compaction_enabled_cf_indices
);
147 DB
* root_db
= nullptr;
148 Options
options_copy(options
);
149 const bool use_seq_per_batch
=
150 txn_db_options
.write_policy
== WRITE_PREPARED
||
151 txn_db_options
.write_policy
== WRITE_UNPREPARED
;
152 const bool use_batch_per_txn
=
153 txn_db_options
.write_policy
== WRITE_COMMITTED
||
154 txn_db_options
.write_policy
== WRITE_PREPARED
;
155 Status s
= DBImpl::Open(options_copy
, dbname
, cfs
, handles
, &root_db
,
156 use_seq_per_batch
, use_batch_per_txn
);
157 StackableDB
* stackable_db
= new StackableDB(root_db
);
159 assert(root_db
!= nullptr);
160 s
= TransactionDB::WrapStackableDB(stackable_db
, txn_db_options
,
161 compaction_enabled_cf_indices
,
166 // just in case it was not deleted (and not set to nullptr).
172 Status
OpenWithStackableDB() {
173 std::vector
<size_t> compaction_enabled_cf_indices
;
174 std::vector
<ColumnFamilyDescriptor
> column_families
{ColumnFamilyDescriptor(
175 kDefaultColumnFamilyName
, ColumnFamilyOptions(options
))};
177 TransactionDB::PrepareWrap(&options
, &column_families
,
178 &compaction_enabled_cf_indices
);
179 std::vector
<ColumnFamilyHandle
*> handles
;
180 DB
* root_db
= nullptr;
181 Options
options_copy(options
);
182 const bool use_seq_per_batch
=
183 txn_db_options
.write_policy
== WRITE_PREPARED
||
184 txn_db_options
.write_policy
== WRITE_UNPREPARED
;
185 const bool use_batch_per_txn
=
186 txn_db_options
.write_policy
== WRITE_COMMITTED
||
187 txn_db_options
.write_policy
== WRITE_PREPARED
;
188 Status s
= DBImpl::Open(options_copy
, dbname
, column_families
, &handles
,
189 &root_db
, use_seq_per_batch
, use_batch_per_txn
);
194 StackableDB
* stackable_db
= new StackableDB(root_db
);
195 assert(root_db
!= nullptr);
196 assert(handles
.size() == 1);
197 s
= TransactionDB::WrapStackableDB(stackable_db
, txn_db_options
,
198 compaction_enabled_cf_indices
, handles
,
203 // just in case it was not deleted (and not set to nullptr).
209 std::atomic
<size_t> linked
= {0};
210 std::atomic
<size_t> exp_seq
= {0};
211 std::atomic
<size_t> commit_writes
= {0};
212 std::atomic
<size_t> expected_commits
= {0};
213 std::function
<void(size_t, Status
)> txn_t0_with_status
= [&](size_t index
,
215 // Test DB's internal txn. It involves no prepare phase nor a commit marker.
217 auto s
= db
->Put(wopts
, "key" + std::to_string(index
), "value");
219 if (txn_db_options
.write_policy
== TxnDBWritePolicy::WRITE_COMMITTED
) {
220 // Consume one seq per key
223 // Consume one seq per batch
225 if (options
.two_write_queues
) {
226 // Consume one seq for commit
231 std::function
<void(size_t)> txn_t0
= [&](size_t index
) {
232 return txn_t0_with_status(index
, Status::OK());
234 std::function
<void(size_t)> txn_t1
= [&](size_t index
) {
235 // Testing directly writing a write batch. Functionality-wise it is
236 // equivalent to commit without prepare.
238 auto istr
= std::to_string(index
);
239 ASSERT_OK(wb
.Put("k1" + istr
, "v1"));
240 ASSERT_OK(wb
.Put("k2" + istr
, "v2"));
241 ASSERT_OK(wb
.Put("k3" + istr
, "v3"));
243 auto s
= db
->Write(wopts
, &wb
);
244 if (txn_db_options
.write_policy
== TxnDBWritePolicy::WRITE_COMMITTED
) {
245 // Consume one seq per key
248 // Consume one seq per batch
250 if (options
.two_write_queues
) {
251 // Consume one seq for commit
257 std::function
<void(size_t)> txn_t2
= [&](size_t index
) {
258 // Commit without prepare. It should write to DB without a commit marker.
259 TransactionOptions txn_options
;
260 WriteOptions write_options
;
261 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
262 auto istr
= std::to_string(index
);
263 ASSERT_OK(txn
->SetName("xid" + istr
));
264 ASSERT_OK(txn
->Put(Slice("foo" + istr
), Slice("bar")));
265 ASSERT_OK(txn
->Put(Slice("foo2" + istr
), Slice("bar2")));
266 ASSERT_OK(txn
->Put(Slice("foo3" + istr
), Slice("bar3")));
267 ASSERT_OK(txn
->Put(Slice("foo4" + istr
), Slice("bar4")));
268 ASSERT_OK(txn
->Commit());
269 if (txn_db_options
.write_policy
== TxnDBWritePolicy::WRITE_COMMITTED
) {
270 // Consume one seq per key
273 // Consume one seq per batch
275 if (options
.two_write_queues
) {
276 // Consume one seq for commit
282 std::function
<void(size_t)> txn_t3
= [&](size_t index
) {
283 // A full 2pc txn that also involves a commit marker.
284 TransactionOptions txn_options
;
285 WriteOptions write_options
;
286 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
287 auto istr
= std::to_string(index
);
288 ASSERT_OK(txn
->SetName("xid" + istr
));
289 ASSERT_OK(txn
->Put(Slice("foo" + istr
), Slice("bar")));
290 ASSERT_OK(txn
->Put(Slice("foo2" + istr
), Slice("bar2")));
291 ASSERT_OK(txn
->Put(Slice("foo3" + istr
), Slice("bar3")));
292 ASSERT_OK(txn
->Put(Slice("foo4" + istr
), Slice("bar4")));
293 ASSERT_OK(txn
->Put(Slice("foo5" + istr
), Slice("bar5")));
295 ASSERT_OK(txn
->Prepare());
297 ASSERT_OK(txn
->Commit());
298 if (txn_db_options
.write_policy
== TxnDBWritePolicy::WRITE_COMMITTED
) {
299 // Consume one seq per key
302 // Consume one seq per batch
304 // Consume one seq per commit marker
309 std::function
<void(size_t)> txn_t4
= [&](size_t index
) {
310 // A 2pc txn that is prepared and then rolled back instead of committed.
311 TransactionOptions txn_options
;
312 WriteOptions write_options
;
313 Transaction
* txn
= db
->BeginTransaction(write_options
, txn_options
);
314 auto istr
= std::to_string(index
);
315 ASSERT_OK(txn
->SetName("xid" + istr
));
316 ASSERT_OK(txn
->Put(Slice("foo" + istr
), Slice("bar")));
317 ASSERT_OK(txn
->Put(Slice("foo2" + istr
), Slice("bar2")));
318 ASSERT_OK(txn
->Put(Slice("foo3" + istr
), Slice("bar3")));
319 ASSERT_OK(txn
->Put(Slice("foo4" + istr
), Slice("bar4")));
320 ASSERT_OK(txn
->Put(Slice("foo5" + istr
), Slice("bar5")));
322 ASSERT_OK(txn
->Prepare());
324 ASSERT_OK(txn
->Rollback());
325 if (txn_db_options
.write_policy
== TxnDBWritePolicy::WRITE_COMMITTED
) {
326 // No seq is consumed for deleting the txn buffer
329 // Consume one seq per batch
331 // Consume one seq per rollback batch
333 if (options
.two_write_queues
) {
334 // Consume one seq for rollback commit
341 // Test that we can change write policy after a clean shutdown (which would empty the WAL).
343 void CrossCompatibilityTest(TxnDBWritePolicy from_policy
,
344 TxnDBWritePolicy to_policy
, bool empty_wal
) {
345 TransactionOptions txn_options
;
346 ReadOptions read_options
;
347 WriteOptions write_options
;
350 options
.write_buffer_size
= 1024; // To create more sst files
351 std::unordered_map
<std::string
, std::string
> committed_kvs
;
354 txn_db_options
.write_policy
= from_policy
;
357 for (int i
= 0; i
< 1024; i
++) {
358 auto istr
= std::to_string(index
);
359 auto k
= Slice("foo-" + istr
).ToString();
360 auto v
= Slice("bar-" + istr
).ToString();
361 // For testing duplicate keys
362 auto v2
= Slice("bar2-" + istr
).ToString();
363 auto type
= rnd
.Uniform(4);
366 committed_kvs
[k
] = v
;
367 ASSERT_OK(db
->Put(write_options
, k
, v
));
368 committed_kvs
[k
] = v2
;
369 ASSERT_OK(db
->Put(write_options
, k
, v2
));
373 committed_kvs
[k
] = v
;
375 committed_kvs
[k
] = v2
;
377 ASSERT_OK(db
->Write(write_options
, &wb
));
382 txn
= db
->BeginTransaction(write_options
, txn_options
);
383 ASSERT_OK(txn
->SetName("xid" + istr
));
384 committed_kvs
[k
] = v
;
385 ASSERT_OK(txn
->Put(k
, v
));
386 committed_kvs
[k
] = v2
;
387 ASSERT_OK(txn
->Put(k
, v2
));
390 ASSERT_OK(txn
->Prepare());
392 ASSERT_OK(txn
->Commit());
402 txn_db_options
.write_policy
= to_policy
;
403 auto db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
404 // Before upgrade/downgrade the WAL must be emptied
406 db_impl
->TEST_FlushMemTable();
408 db_impl
->FlushWAL(true);
410 auto s
= ReOpenNoDelete();
414 // Test that we can detect the WAL that is produced by an incompatible
415 // WritePolicy and fail fast before mis-interpreting the WAL.
416 ASSERT_TRUE(s
.IsNotSupported());
419 db_impl
= reinterpret_cast<DBImpl
*>(db
->GetRootDB());
420 // Check that WAL is empty
421 VectorLogPtr log_files
;
422 db_impl
->GetSortedWalFiles(log_files
);
423 ASSERT_EQ(0, log_files
.size());
425 for (auto& kv
: committed_kvs
) {
427 s
= db
->Get(read_options
, kv
.first
, &value
);
428 if (s
.IsNotFound()) {
429 printf("key = %s\n", kv
.first
.c_str());
432 if (kv
.second
!= value
) {
433 printf("key = %s\n", kv
.first
.c_str());
435 ASSERT_EQ(kv
.second
, value
);
440 class TransactionTest
: public TransactionTestBase
,
441 virtual public ::testing::WithParamInterface
<
442 std::tuple
<bool, bool, TxnDBWritePolicy
>> {
445 : TransactionTestBase(std::get
<0>(GetParam()), std::get
<1>(GetParam()),
446 std::get
<2>(GetParam())){};
449 class TransactionStressTest
: public TransactionTest
{};
451 class MySQLStyleTransactionTest
: public TransactionTest
{};
453 } // namespace rocksdb