]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/transaction_test.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / utilities / transactions / transaction_test.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #pragma once
7
8 #ifndef __STDC_FORMAT_MACROS
9 #define __STDC_FORMAT_MACROS
10 #endif
11
#include <inttypes.h>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <climits>
#include <cstdio>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <vector>

#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "util/fault_injection_test_env.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"

#include "port/port.h"
36
37 namespace rocksdb {
38
39 // Return true if the ith bit is set in combination represented by comb
// Returns true if bit `i` is set in the bitmask `comb`, i.e. whether the i-th
// element participates in the combination encoded by `comb`.
//
// Declared `inline` because it is defined in a header: a non-inline definition
// here is emitted by every translation unit that includes this file, which
// violates the one-definition rule when those units are linked together.
// Indices at or beyond the bit width of size_t are reported as "not set"
// instead of evaluating an out-of-range shift, which is undefined behavior.
inline bool IsInCombination(size_t i, size_t comb) {
  if (i >= sizeof(size_t) * CHAR_BIT) {
    return false;
  }
  return (comb & (size_t(1) << i)) != 0;
}
41
42 class TransactionTestBase : public ::testing::Test {
43 public:
44 TransactionDB* db;
45 FaultInjectionTestEnv* env;
46 std::string dbname;
47 Options options;
48
49 TransactionDBOptions txn_db_options;
50 bool use_stackable_db_;
51
52 TransactionTestBase(bool use_stackable_db, bool two_write_queue,
53 TxnDBWritePolicy write_policy)
54 : db(nullptr), env(nullptr), use_stackable_db_(use_stackable_db) {
55 options.create_if_missing = true;
56 options.max_write_buffer_number = 2;
57 options.write_buffer_size = 4 * 1024;
58 options.level0_file_num_compaction_trigger = 2;
59 options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
60 env = new FaultInjectionTestEnv(Env::Default());
61 options.env = env;
62 options.two_write_queues = two_write_queue;
63 dbname = test::PerThreadDBPath("transaction_testdb");
64
65 DestroyDB(dbname, options);
66 txn_db_options.transaction_lock_timeout = 0;
67 txn_db_options.default_lock_timeout = 0;
68 txn_db_options.write_policy = write_policy;
69 txn_db_options.rollback_merge_operands = true;
70 Status s;
71 if (use_stackable_db == false) {
72 s = TransactionDB::Open(options, txn_db_options, dbname, &db);
73 } else {
74 s = OpenWithStackableDB();
75 }
76 assert(s.ok());
77 }
78
79 ~TransactionTestBase() {
80 delete db;
81 db = nullptr;
82 // This is to skip the assert statement in FaultInjectionTestEnv. There
83 // seems to be a bug in btrfs that the makes readdir return recently
84 // unlink-ed files. By using the default fs we simply ignore errors resulted
85 // from attempting to delete such files in DestroyDB.
86 options.env = Env::Default();
87 DestroyDB(dbname, options);
88 delete env;
89 }
90
91 Status ReOpenNoDelete() {
92 delete db;
93 db = nullptr;
94 env->AssertNoOpenFile();
95 env->DropUnsyncedFileData();
96 env->ResetState();
97 Status s;
98 if (use_stackable_db_ == false) {
99 s = TransactionDB::Open(options, txn_db_options, dbname, &db);
100 } else {
101 s = OpenWithStackableDB();
102 }
103 assert(!s.ok() || db != nullptr);
104 return s;
105 }
106
107 Status ReOpenNoDelete(std::vector<ColumnFamilyDescriptor>& cfs,
108 std::vector<ColumnFamilyHandle*>* handles) {
109 for (auto h : *handles) {
110 delete h;
111 }
112 handles->clear();
113 delete db;
114 db = nullptr;
115 env->AssertNoOpenFile();
116 env->DropUnsyncedFileData();
117 env->ResetState();
118 Status s;
119 if (use_stackable_db_ == false) {
120 s = TransactionDB::Open(options, txn_db_options, dbname, cfs, handles,
121 &db);
122 } else {
123 s = OpenWithStackableDB(cfs, handles);
124 }
125 assert(db != nullptr);
126 return s;
127 }
128
129 Status ReOpen() {
130 delete db;
131 db = nullptr;
132 DestroyDB(dbname, options);
133 Status s;
134 if (use_stackable_db_ == false) {
135 s = TransactionDB::Open(options, txn_db_options, dbname, &db);
136 } else {
137 s = OpenWithStackableDB();
138 }
139 assert(db != nullptr);
140 return s;
141 }
142
143 Status OpenWithStackableDB(std::vector<ColumnFamilyDescriptor>& cfs,
144 std::vector<ColumnFamilyHandle*>* handles) {
145 std::vector<size_t> compaction_enabled_cf_indices;
146 TransactionDB::PrepareWrap(&options, &cfs, &compaction_enabled_cf_indices);
147 DB* root_db = nullptr;
148 Options options_copy(options);
149 const bool use_seq_per_batch =
150 txn_db_options.write_policy == WRITE_PREPARED ||
151 txn_db_options.write_policy == WRITE_UNPREPARED;
152 const bool use_batch_per_txn =
153 txn_db_options.write_policy == WRITE_COMMITTED ||
154 txn_db_options.write_policy == WRITE_PREPARED;
155 Status s = DBImpl::Open(options_copy, dbname, cfs, handles, &root_db,
156 use_seq_per_batch, use_batch_per_txn);
157 StackableDB* stackable_db = new StackableDB(root_db);
158 if (s.ok()) {
159 assert(root_db != nullptr);
160 s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
161 compaction_enabled_cf_indices,
162 *handles, &db);
163 }
164 if (!s.ok()) {
165 delete stackable_db;
166 // just in case it was not deleted (and not set to nullptr).
167 delete root_db;
168 }
169 return s;
170 }
171
172 Status OpenWithStackableDB() {
173 std::vector<size_t> compaction_enabled_cf_indices;
174 std::vector<ColumnFamilyDescriptor> column_families{ColumnFamilyDescriptor(
175 kDefaultColumnFamilyName, ColumnFamilyOptions(options))};
176
177 TransactionDB::PrepareWrap(&options, &column_families,
178 &compaction_enabled_cf_indices);
179 std::vector<ColumnFamilyHandle*> handles;
180 DB* root_db = nullptr;
181 Options options_copy(options);
182 const bool use_seq_per_batch =
183 txn_db_options.write_policy == WRITE_PREPARED ||
184 txn_db_options.write_policy == WRITE_UNPREPARED;
185 const bool use_batch_per_txn =
186 txn_db_options.write_policy == WRITE_COMMITTED ||
187 txn_db_options.write_policy == WRITE_PREPARED;
188 Status s = DBImpl::Open(options_copy, dbname, column_families, &handles,
189 &root_db, use_seq_per_batch, use_batch_per_txn);
190 if (!s.ok()) {
191 delete root_db;
192 return s;
193 }
194 StackableDB* stackable_db = new StackableDB(root_db);
195 assert(root_db != nullptr);
196 assert(handles.size() == 1);
197 s = TransactionDB::WrapStackableDB(stackable_db, txn_db_options,
198 compaction_enabled_cf_indices, handles,
199 &db);
200 delete handles[0];
201 if (!s.ok()) {
202 delete stackable_db;
203 // just in case it was not deleted (and not set to nullptr).
204 delete root_db;
205 }
206 return s;
207 }
208
209 std::atomic<size_t> linked = {0};
210 std::atomic<size_t> exp_seq = {0};
211 std::atomic<size_t> commit_writes = {0};
212 std::atomic<size_t> expected_commits = {0};
213 std::function<void(size_t, Status)> txn_t0_with_status = [&](size_t index,
214 Status exp_s) {
215 // Test DB's internal txn. It involves no prepare phase nor a commit marker.
216 WriteOptions wopts;
217 auto s = db->Put(wopts, "key" + std::to_string(index), "value");
218 ASSERT_EQ(exp_s, s);
219 if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
220 // Consume one seq per key
221 exp_seq++;
222 } else {
223 // Consume one seq per batch
224 exp_seq++;
225 if (options.two_write_queues) {
226 // Consume one seq for commit
227 exp_seq++;
228 }
229 }
230 };
231 std::function<void(size_t)> txn_t0 = [&](size_t index) {
232 return txn_t0_with_status(index, Status::OK());
233 };
234 std::function<void(size_t)> txn_t1 = [&](size_t index) {
235 // Testing directly writing a write batch. Functionality-wise it is
236 // equivalent to commit without prepare.
237 WriteBatch wb;
238 auto istr = std::to_string(index);
239 ASSERT_OK(wb.Put("k1" + istr, "v1"));
240 ASSERT_OK(wb.Put("k2" + istr, "v2"));
241 ASSERT_OK(wb.Put("k3" + istr, "v3"));
242 WriteOptions wopts;
243 auto s = db->Write(wopts, &wb);
244 if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
245 // Consume one seq per key
246 exp_seq += 3;
247 } else {
248 // Consume one seq per batch
249 exp_seq++;
250 if (options.two_write_queues) {
251 // Consume one seq for commit
252 exp_seq++;
253 }
254 }
255 ASSERT_OK(s);
256 };
257 std::function<void(size_t)> txn_t2 = [&](size_t index) {
258 // Commit without prepare. It should write to DB without a commit marker.
259 TransactionOptions txn_options;
260 WriteOptions write_options;
261 Transaction* txn = db->BeginTransaction(write_options, txn_options);
262 auto istr = std::to_string(index);
263 ASSERT_OK(txn->SetName("xid" + istr));
264 ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
265 ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
266 ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
267 ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
268 ASSERT_OK(txn->Commit());
269 if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
270 // Consume one seq per key
271 exp_seq += 4;
272 } else {
273 // Consume one seq per batch
274 exp_seq++;
275 if (options.two_write_queues) {
276 // Consume one seq for commit
277 exp_seq++;
278 }
279 }
280 delete txn;
281 };
282 std::function<void(size_t)> txn_t3 = [&](size_t index) {
283 // A full 2pc txn that also involves a commit marker.
284 TransactionOptions txn_options;
285 WriteOptions write_options;
286 Transaction* txn = db->BeginTransaction(write_options, txn_options);
287 auto istr = std::to_string(index);
288 ASSERT_OK(txn->SetName("xid" + istr));
289 ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
290 ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
291 ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
292 ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
293 ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
294 expected_commits++;
295 ASSERT_OK(txn->Prepare());
296 commit_writes++;
297 ASSERT_OK(txn->Commit());
298 if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
299 // Consume one seq per key
300 exp_seq += 5;
301 } else {
302 // Consume one seq per batch
303 exp_seq++;
304 // Consume one seq per commit marker
305 exp_seq++;
306 }
307 delete txn;
308 };
309 std::function<void(size_t)> txn_t4 = [&](size_t index) {
310 // A full 2pc txn that also involves a commit marker.
311 TransactionOptions txn_options;
312 WriteOptions write_options;
313 Transaction* txn = db->BeginTransaction(write_options, txn_options);
314 auto istr = std::to_string(index);
315 ASSERT_OK(txn->SetName("xid" + istr));
316 ASSERT_OK(txn->Put(Slice("foo" + istr), Slice("bar")));
317 ASSERT_OK(txn->Put(Slice("foo2" + istr), Slice("bar2")));
318 ASSERT_OK(txn->Put(Slice("foo3" + istr), Slice("bar3")));
319 ASSERT_OK(txn->Put(Slice("foo4" + istr), Slice("bar4")));
320 ASSERT_OK(txn->Put(Slice("foo5" + istr), Slice("bar5")));
321 expected_commits++;
322 ASSERT_OK(txn->Prepare());
323 commit_writes++;
324 ASSERT_OK(txn->Rollback());
325 if (txn_db_options.write_policy == TxnDBWritePolicy::WRITE_COMMITTED) {
326 // No seq is consumed for deleting the txn buffer
327 exp_seq += 0;
328 } else {
329 // Consume one seq per batch
330 exp_seq++;
331 // Consume one seq per rollback batch
332 exp_seq++;
333 if (options.two_write_queues) {
334 // Consume one seq for rollback commit
335 exp_seq++;
336 }
337 }
338 delete txn;
339 };
340
341 // Test that we can change write policy after a clean shutdown (which would
342 // empty the WAL)
343 void CrossCompatibilityTest(TxnDBWritePolicy from_policy,
344 TxnDBWritePolicy to_policy, bool empty_wal) {
345 TransactionOptions txn_options;
346 ReadOptions read_options;
347 WriteOptions write_options;
348 uint32_t index = 0;
349 Random rnd(1103);
350 options.write_buffer_size = 1024; // To create more sst files
351 std::unordered_map<std::string, std::string> committed_kvs;
352 Transaction* txn;
353
354 txn_db_options.write_policy = from_policy;
355 ReOpen();
356
357 for (int i = 0; i < 1024; i++) {
358 auto istr = std::to_string(index);
359 auto k = Slice("foo-" + istr).ToString();
360 auto v = Slice("bar-" + istr).ToString();
361 // For test the duplicate keys
362 auto v2 = Slice("bar2-" + istr).ToString();
363 auto type = rnd.Uniform(4);
364 switch (type) {
365 case 0:
366 committed_kvs[k] = v;
367 ASSERT_OK(db->Put(write_options, k, v));
368 committed_kvs[k] = v2;
369 ASSERT_OK(db->Put(write_options, k, v2));
370 break;
371 case 1: {
372 WriteBatch wb;
373 committed_kvs[k] = v;
374 wb.Put(k, v);
375 committed_kvs[k] = v2;
376 wb.Put(k, v2);
377 ASSERT_OK(db->Write(write_options, &wb));
378
379 } break;
380 case 2:
381 case 3:
382 txn = db->BeginTransaction(write_options, txn_options);
383 ASSERT_OK(txn->SetName("xid" + istr));
384 committed_kvs[k] = v;
385 ASSERT_OK(txn->Put(k, v));
386 committed_kvs[k] = v2;
387 ASSERT_OK(txn->Put(k, v2));
388
389 if (type == 3) {
390 ASSERT_OK(txn->Prepare());
391 }
392 ASSERT_OK(txn->Commit());
393 delete txn;
394 break;
395 default:
396 assert(0);
397 }
398
399 index++;
400 } // for i
401
402 txn_db_options.write_policy = to_policy;
403 auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
404 // Before upgrade/downgrade the WAL must be emptied
405 if (empty_wal) {
406 db_impl->TEST_FlushMemTable();
407 } else {
408 db_impl->FlushWAL(true);
409 }
410 auto s = ReOpenNoDelete();
411 if (empty_wal) {
412 ASSERT_OK(s);
413 } else {
414 // Test that we can detect the WAL that is produced by an incompatible
415 // WritePolicy and fail fast before mis-interpreting the WAL.
416 ASSERT_TRUE(s.IsNotSupported());
417 return;
418 }
419 db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
420 // Check that WAL is empty
421 VectorLogPtr log_files;
422 db_impl->GetSortedWalFiles(log_files);
423 ASSERT_EQ(0, log_files.size());
424
425 for (auto& kv : committed_kvs) {
426 std::string value;
427 s = db->Get(read_options, kv.first, &value);
428 if (s.IsNotFound()) {
429 printf("key = %s\n", kv.first.c_str());
430 }
431 ASSERT_OK(s);
432 if (kv.second != value) {
433 printf("key = %s\n", kv.first.c_str());
434 }
435 ASSERT_EQ(kv.second, value);
436 }
437 }
438 };
439
440 class TransactionTest : public TransactionTestBase,
441 virtual public ::testing::WithParamInterface<
442 std::tuple<bool, bool, TxnDBWritePolicy>> {
443 public:
444 TransactionTest()
445 : TransactionTestBase(std::get<0>(GetParam()), std::get<1>(GetParam()),
446 std::get<2>(GetParam())){};
447 };
448
449 class TransactionStressTest : public TransactionTest {};
450
451 class MySQLStyleTransactionTest : public TransactionTest {};
452
453 } // namespace rocksdb