1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #include "test_util/transaction_test_util.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/utilities/optimistic_transaction_db.h"
18 #include "rocksdb/utilities/transaction.h"
19 #include "rocksdb/utilities/transaction_db.h"
21 #include "db/dbformat.h"
22 #include "db/snapshot_impl.h"
23 #include "logging/logging.h"
24 #include "util/random.h"
25 #include "util/string_util.h"
27 namespace ROCKSDB_NAMESPACE
{
// Constructs an inserter that applies random increments to a DB.
//   rand          - random source used to pick keys, sets and the
//                   prepare/commit/rollback choices (ownership is not visible
//                   here; presumably borrowed — confirm against the header).
//   write_options - copied into write_options_; used when beginning
//                   transactions and for the non-transactional db->Write.
//   read_options  - copied into read_options_; passed to DBGet by DoInsert.
//   num_keys      - keys per set; DoInsert picks rand_->Next() % num_keys_.
//   num_sets      - number of key sets touched by each DoInsert round.
//   cmt_delay_ms  - delay (ms) that DoInsert sleeps between Prepare and
//                   Commit, stored in cmt_delay_ms_.
//   first_id      - presumably the starting value of txn_id_, which is used
//                   to name transactions (its initializer is not visible).
// NOTE(review): several member initializers are elided from this extract.
29 RandomTransactionInserter::RandomTransactionInserter(
30 Random64
* rand
, const WriteOptions
& write_options
,
31 const ReadOptions
& read_options
, uint64_t num_keys
, uint16_t num_sets
,
32 const uint64_t cmt_delay_ms
, const uint64_t first_id
)
34 write_options_(write_options
),
35 read_options_(read_options
),
39 cmt_delay_ms_(cmt_delay_ms
) {}
// Releases the transaction objects still held by this inserter.
// NOTE(review): the body of the txn_ branch and the closing braces are elided
// from this extract; presumably txn_ is deleted there as well — confirm.
// Holding these as std::unique_ptr would remove the manual deletes.
41 RandomTransactionInserter::~RandomTransactionInserter() {
42 if (txn_
!= nullptr) {
45 if (optimistic_txn_
!= nullptr) {
46 delete optimistic_txn_
;
// Runs one DoInsert round inside a pessimistic TransactionDB transaction.
// The current txn_ is passed back to BeginTransaction as its third argument
// (presumably the "old transaction to reuse" — confirm against the
// TransactionDB API), and the result is stored back into txn_.
// The transaction is named "txn<thread-id hash>-<txn_id_>" (txn_id_ is
// post-incremented). read_options_.snapshot points at the transaction's
// snapshot only for the duration of DoInsert.
// The result of DoInsert is kept in res, which is true when no unexpected
// error was seen. The return statement is elided from this extract.
50 bool RandomTransactionInserter::TransactionDBInsert(
51 TransactionDB
* db
, const TransactionOptions
& txn_options
) {
52 txn_
= db
->BeginTransaction(write_options_
, txn_options
, txn_
);
54 std::hash
<std::thread::id
> hasher
;
// The formatted name must fit the 64-byte limit given to snprintf without
// truncation; the assert below checks that.
56 snprintf(name
, 64, "txn%" ROCKSDB_PRIszt
"-%" PRIu64
,
57 hasher(std::this_thread::get_id()), txn_id_
++);
58 assert(strlen(name
) < 64 - 1);
// NOTE(review): SetName() is evaluated inside assert(), so in NDEBUG builds
// the transaction is never named. Prefer evaluating it outside the assert:
//   Status s = txn_->SetName(name); assert(s.ok()); (void)s;
59 assert(txn_
->SetName(name
).ok());
61 // Take a snapshot if set_snapshot was not set or with 50% chance otherwise
62 bool take_snapshot
= txn_
->GetSnapshot() == nullptr || rand_
->OneIn(2);
65 read_options_
.snapshot
= txn_
->GetSnapshot();
67 auto res
= DoInsert(db
, txn_
, false);
// Clear the borrowed snapshot so later reads through read_options_ do not
// keep referring to this transaction's snapshot.
69 read_options_
.snapshot
= nullptr;
// Runs one DoInsert round inside an OptimisticTransactionDB transaction.
// optimistic_txn_ is passed to BeginTransaction as its third argument
// (presumably so the previous transaction object can be reused — confirm).
// DoInsert is called with is_optimistic = true, so write-conflict style
// errors at commit time are tolerated there.
// Returns DoInsert's result: true when no unexpected error was seen.
// NOTE(review): the line that stores BeginTransaction's result (presumably
// into optimistic_txn_) is elided from this extract.
74 bool RandomTransactionInserter::OptimisticTransactionDBInsert(
75 OptimisticTransactionDB
* db
,
76 const OptimisticTransactionOptions
& txn_options
) {
78 db
->BeginTransaction(write_options_
, txn_options
, optimistic_txn_
);
80 return DoInsert(db
, optimistic_txn_
, true);
// Runs one DoInsert round directly against db without a transaction
// (txn == nullptr, is_optimistic == false). DoInsert then applies the round's
// writes with a single db->Write call.
// Returns true when no unexpected error was seen.
83 bool RandomTransactionInserter::DBInsert(DB
* db
) {
84 return DoInsert(db
, nullptr, false);
// Reads the current value of one key in set set_i.
// The key is "<set_i + 1 as four zero-padded digits><ikey in decimal>". It is
// also stored into *full_key so the caller can reuse it.
// How the read is done:
//   - txn non-null, get_for_update true: txn->GetForUpdate.
//   - txn non-null, get_for_update false: txn->Get.
//   - txn null: db->Get.
// All three paths use read_options.
// On success the value is parsed into *int_value. A parsed value of 0 or
// ULONG_MAX is reported on stderr, sets *unexpected_error and turns the
// status into Corruption. A NotFound key is treated as value 0; that branch
// is only partly visible in this extract.
// Returns the read status, or Corruption for an unexpected value.
87 Status
RandomTransactionInserter::DBGet(
88 DB
* db
, Transaction
* txn
, ReadOptions
& read_options
, uint16_t set_i
,
89 uint64_t ikey
, bool get_for_update
, uint64_t* int_value
,
90 std::string
* full_key
, bool* unexpected_error
) {
92 // Five digits (since the largest uint16_t is 65535) plus the NUL
95 // Pad prefix appropriately so we can iterate over each set
// Capping set_i + 1 at 9999 keeps "%.4u" at exactly four digits. Verify
// relies on this when it seeks to, and compares, a fixed 4-byte prefix.
96 assert(set_i
+ 1 <= 9999);
97 snprintf(prefix_buf
, sizeof(prefix_buf
), "%.4u", set_i
+ 1);
98 // key format: [SET#][random#]
99 std::string skey
= ToString(ikey
);
100 Slice
base_key(skey
);
101 *full_key
= std::string(prefix_buf
) + base_key
.ToString();
102 Slice
key(*full_key
);
105 if (txn
!= nullptr) {
106 if (get_for_update
) {
107 s
= txn
->GetForUpdate(read_options
, key
, &value
);
109 s
= txn
->Get(read_options
, key
, &value
);
112 s
= db
->Get(read_options
, key
, &value
);
116 // Found key, parse its value
// NOTE(review): std::stoull throws std::invalid_argument / std::out_of_range
// on a malformed value instead of producing an error Status.
117 *int_value
= std::stoull(value
);
// NOTE(review): ULONG_MAX is a 32-bit value where unsigned long is 32 bits
// (e.g. 64-bit Windows), while *int_value is uint64_t.
// std::numeric_limits<uint64_t>::max() would match the checked type.
118 if (*int_value
== 0 || *int_value
== ULONG_MAX
) {
119 *unexpected_error
= true;
120 fprintf(stderr
, "Get returned unexpected value: %s\n", value
.c_str());
121 s
= Status::Corruption();
123 } else if (s
.IsNotFound()) {
124 // Have not yet written to this key, so assume its value is 0
// Performs one random "increment" round and reports whether it went cleanly.
// A single increment incr in [1, 100] is drawn. Then, for every set (visited
// in shuffled order), one random key is read via DBGet and rewritten as
// value + incr. Every set receives the same increment, so the per-set sums
// stay equal as long as each round is applied atomically; Verify checks this.
//   db            - database the round operates on.
//   txn           - transaction to write through, or nullptr to apply the
//                   writes with db->Write on a batch (the batch setup is
//                   elided in this extract).
//   is_optimistic - true for OptimisticTransactionDB transactions. These are
//                   never prepared and may fail to commit with conflict
//                   errors.
// With a transaction, the round is:
//   - prepared in 9 of 10 rounds (non-optimistic only);
//   - then committed in 19 of 20 rounds, otherwise rolled back.
// Tolerated (expected) errors:
//   - Get, pessimistic only: Busy / TimedOut / TryAgain.
//   - Put without GetForUpdate: Busy / TimedOut.
//   - Optimistic commit: Busy / TimedOut / TryAgain.
//   - Pessimistic commit: Expired.
// Anything else sets unexpected_error.
// Returns true if no unexpected error was observed.
131 bool RandomTransactionInserter::DoInsert(DB
* db
, Transaction
* txn
,
132 bool is_optimistic
) {
136 // pick a random number to use to increment a key in each set
137 uint64_t incr
= (rand_
->Next() % 100) + 1;
138 bool unexpected_error
= false;
140 std::vector
<uint16_t> set_vec(num_sets_
);
141 std::iota(set_vec
.begin(), set_vec
.end(), static_cast<uint16_t>(0));
// NOTE(review): shuffling with std::random_device makes the set order
// independent of rand_, so a failing run cannot be replayed from rand_'s
// seed. A generator seeded from rand_ would keep runs reproducible.
142 std::shuffle(set_vec
.begin(), set_vec
.end(), std::random_device
{});
144 // For each set, pick a key at random and increment it
145 for (uint16_t set_i
: set_vec
) {
146 uint64_t int_value
= 0;
147 std::string full_key
;
148 uint64_t rand_key
= rand_
->Next() % num_keys_
;
// Half of the transactional reads lock the key with GetForUpdate; reads
// without a transaction never do.
149 const bool get_for_update
= txn
? rand_
->OneIn(2) : false;
150 s
= DBGet(db
, txn
, read_options_
, set_i
, rand_key
, get_for_update
,
151 &int_value
, &full_key
, &unexpected_error
);
154 // Optimistic transactions should never return non-ok status here.
155 // Non-optimistic transactions may return write-conflict/timeout errors.
156 if (is_optimistic
|| !(s
.IsBusy() || s
.IsTimedOut() || s
.IsTryAgain())) {
157 fprintf(stderr
, "Get returned an unexpected error: %s\n",
158 s
.ToString().c_str());
159 unexpected_error
= true;
166 std::string sum
= ToString(int_value
+ incr
);
167 if (txn
!= nullptr) {
168 s
= txn
->Put(key
, sum
);
169 if (!get_for_update
&& (s
.IsBusy() || s
.IsTimedOut())) {
170 // If the initial get was not for update, then the key is not locked
171 // before put and put could fail due to concurrent writes.
173 } else if (!s
.ok()) {
174 // Since we did a GetForUpdate, Put should not fail.
175 fprintf(stderr
, "Put returned an unexpected error: %s\n",
176 s
.ToString().c_str());
177 unexpected_error
= true;
182 bytes_inserted_
+= key
.size() + sum
.size();
// NOTE(review): the debug log below dereferences txn->GetSnapshot() without
// a null check. Confirm that a snapshot is always set here, including on the
// optimistic path.
184 if (txn
!= nullptr) {
185 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
186 "Insert (%s) %s snap: %" PRIu64
" key:%s value: %" PRIu64
187 "+%" PRIu64
"=%" PRIu64
,
188 txn
->GetName().c_str(), s
.ToString().c_str(),
189 txn
->GetSnapshot()->GetSequenceNumber(), full_key
.c_str(),
190 int_value
, incr
, int_value
+ incr
);
195 if (txn
!= nullptr) {
196 bool with_prepare
= !is_optimistic
&& !rand_
->OneIn(10);
198 // Also try commit without prepare
201 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
202 "Prepare of %" PRIu64
" %s (%s)", txn
->GetId(),
203 s
.ToString().c_str(), txn
->GetName().c_str());
204 if (rand_
->OneIn(20)) {
205 // This currently only tests the mechanics of writing commit time
206 // write batch so the exact values would not matter.
// NOTE(review): this uses the member txn_ rather than the txn parameter.
// It is presumably reached only on the prepare (non-optimistic) path, where
// TransactionDBInsert passes txn_ as txn. Using txn would be clearer and
// safer.
207 s
= txn_
->GetCommitTimeWriteBatch()->Put("cat", "dog");
210 db
->GetDBOptions().env
->SleepForMicroseconds(
211 static_cast<int>(cmt_delay_ms_
* 1000));
213 if (!rand_
->OneIn(20)) {
215 assert(!with_prepare
|| s
.ok());
216 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
217 "Commit of %" PRIu64
" %s (%s)", txn
->GetId(),
218 s
.ToString().c_str(), txn
->GetName().c_str());
220 // Also try 5% rollback
222 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
223 "Rollback %" PRIu64
" %s %s", txn
->GetId(),
224 txn
->GetName().c_str(), s
.ToString().c_str());
227 assert(is_optimistic
|| s
.ok());
231 // Optimistic transactions can have write-conflict errors on commit.
232 // Any other error is unexpected.
233 if (!(s
.IsBusy() || s
.IsTimedOut() || s
.IsTryAgain())) {
234 unexpected_error
= true;
237 // Non-optimistic transactions should only fail due to expiration
238 // or write failures. For testing purposes, we do not expect any
240 if (!s
.IsExpired()) {
241 unexpected_error
= true;
245 if (unexpected_error
) {
246 fprintf(stderr
, "Commit returned an unexpected error: %s\n",
247 s
.ToString().c_str());
251 s
= db
->Write(write_options_
, &batch
);
253 unexpected_error
= true;
254 fprintf(stderr
, "Write returned an unexpected error: %s\n",
255 s
.ToString().c_str());
259 if (txn
!= nullptr) {
// NOTE(review): Rollback() is evaluated inside assert(), so in NDEBUG builds
// the transaction is never rolled back here. Prefer:
//   Status rs = txn->Rollback(); assert(rs.ok()); (void)rs;
260 assert(txn
->Rollback().ok());
261 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
, "Error %s for txn %s",
262 s
.ToString().c_str(), txn
->GetName().c_str());
274 // return success if we didn't get any unexpected errors
275 return !unexpected_error
;
278 // Verify that the sums of the values in each set are all equal.
// A set is the group of keys sharing one 4-digit prefix ("0001", "0002", ...).
// DoInsert adds the same increment to one key of every set, so equal totals
// indicate that each round was applied atomically.
//   db               - database to read.
//   num_sets         - number of sets to check (visited in shuffled order).
//   num_keys_per_set - keys per set for point lookups; 0 forces the iterator
//                      path.
//   take_snapshot    - presumably controls whether roptions.snapshot is set
//                      (the guarding `if` is elided from this extract).
//   rand             - optional; when null, only the iterator path is used.
//   delay_ms         - sleep between taking the snapshot and reading; only
//                      allowed together with take_snapshot (asserted).
// For each set, the values are summed in one of two ways:
//   - point lookups via DBGet, when num_keys_per_set != 0, rand is non-null
//     and rand->OneIn(10) holds;
//   - otherwise, a prefix-bounded iterator.
// Returns Status::Corruption() for an iterated value of 0 or ULONG_MAX, or
// for a set total that differs from the previous set's total. The success
// return is elided from this extract.
279 Status
RandomTransactionInserter::Verify(DB
* db
, uint16_t num_sets
,
280 uint64_t num_keys_per_set
,
281 bool take_snapshot
, Random64
* rand
,
283 // delay_ms is the delay between taking a snapshot and doing the reads. It
284 // emulates reads from a long-running backup job.
285 assert(delay_ms
== 0 || take_snapshot
);
286 uint64_t prev_total
= 0;
288 bool prev_assigned
= false;
290 ReadOptions roptions
;
292 roptions
.snapshot
= db
->GetSnapshot();
293 db
->GetDBOptions().env
->SleepForMicroseconds(
294 static_cast<int>(delay_ms
* 1000));
297 std::vector
<uint16_t> set_vec(num_sets
);
298 std::iota(set_vec
.begin(), set_vec
.end(), static_cast<uint16_t>(0));
299 std::shuffle(set_vec
.begin(), set_vec
.end(), std::random_device
{});
301 // For each set of keys with the same prefix, sum all the values
302 for (uint16_t set_i
: set_vec
) {
303 // Five digits (since the largest uint16_t is 65535) plus the NUL
// Build the same zero-padded 4-digit prefix that DBGet uses for keys.
306 assert(set_i
+ 1 <= 9999);
307 snprintf(prefix_buf
, sizeof(prefix_buf
), "%.4u", set_i
+ 1);
310 // Use either point lookup or iterator. Point lookups are slower so we use
312 const bool use_point_lookup
=
313 num_keys_per_set
!= 0 && rand
&& rand
->OneIn(10);
314 if (use_point_lookup
) {
// NOTE(review): read_options is never used; DBGet below reads with roptions
// so that the snapshot applies.
315 ReadOptions read_options
;
316 for (uint64_t k
= 0; k
< num_keys_per_set
; k
++) {
317 std::string dont_care
;
318 uint64_t int_value
= 0;
319 bool unexpected_error
= false;
320 const bool FOR_UPDATE
= false;
321 Status s
= DBGet(db
, nullptr, roptions
, set_i
, k
, FOR_UPDATE
,
322 &int_value
, &dont_care
, &unexpected_error
);
// NOTE(review): this check is compiled out under NDEBUG.
324 assert(!unexpected_error
);
327 } else { // user iterators
// NOTE(review): this is a raw owning iterator. The early Corruption return
// inside the loop does not appear to delete it (its delete is not visible
// here), nor does it release roptions.snapshot. std::unique_ptr<Iterator>
// would make every return path safe.
328 Iterator
* iter
= db
->NewIterator(roptions
);
329 for (iter
->Seek(Slice(prefix_buf
, 4)); iter
->Valid(); iter
->Next()) {
330 Slice key
= iter
->key();
331 // stop when we reach a different prefix
332 if (key
.ToString().compare(0, 4, prefix_buf
) != 0) {
335 Slice value
= iter
->value();
336 uint64_t int_value
= std::stoull(value
.ToString());
// NOTE(review): ULONG_MAX is 32-bit where unsigned long is 32 bits, while
// int_value is uint64_t. std::numeric_limits<uint64_t>::max() would match.
337 if (int_value
== 0 || int_value
== ULONG_MAX
) {
338 fprintf(stderr
, "Iter returned unexpected value: %s\n",
339 value
.ToString().c_str());
340 return Status::Corruption();
343 db
->GetDBOptions().info_log
,
344 "VerifyRead at %" PRIu64
" (%" PRIu64
"): %.*s value: %" PRIu64
,
345 roptions
.snapshot
? roptions
.snapshot
->GetSequenceNumber() : 0ul,
347 ? ((SnapshotImpl
*)roptions
.snapshot
)->min_uncommitted_
349 static_cast<int>(key
.size()), key
.data(), int_value
);
// Every set must reach the same total as the previously visited set.
355 if (prev_assigned
&& total
!= prev_total
) {
356 db
->GetDBOptions().info_log
->Flush();
358 "RandomTransactionVerify found inconsistent totals using "
360 "Set[%" PRIu32
"]: %" PRIu64
", Set[%" PRIu32
"]: %" PRIu64
361 " at snapshot %" PRIu64
"\n",
362 use_point_lookup
, prev_i
, prev_total
, set_i
, total
,
363 roptions
.snapshot
? roptions
.snapshot
->GetSequenceNumber() : 0ul);
// NOTE(review): this early return appears to skip the ReleaseSnapshot call
// that follows the loop.
365 return Status::Corruption();
368 db
->GetDBOptions().info_log
,
369 "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
371 use_point_lookup
, total
,
372 roptions
.snapshot
? roptions
.snapshot
->GetSequenceNumber() : 0ul);
376 prev_assigned
= true;
// Release the snapshot acquired before the reads. When take_snapshot is
// false, presumably roptions.snapshot is null here — confirm that
// ReleaseSnapshot accepts nullptr.
379 db
->ReleaseSnapshot(roptions
.snapshot
);
385 } // namespace ROCKSDB_NAMESPACE
387 #endif // ROCKSDB_LITE