1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
11 #include "util/transaction_test_util.h"
20 #include "rocksdb/db.h"
21 #include "rocksdb/utilities/optimistic_transaction_db.h"
22 #include "rocksdb/utilities/transaction.h"
23 #include "rocksdb/utilities/transaction_db.h"
25 #include "db/dbformat.h"
26 #include "db/snapshot_impl.h"
27 #include "util/logging.h"
28 #include "util/random.h"
29 #include "util/string_util.h"
// Constructs an inserter. In the lines visible here, write_options,
// read_options and cmt_delay_ms are copied into the members write_options_,
// read_options_ and cmt_delay_ms_. The constructor body itself is empty.
// NOTE(review): no initializer is visible in this excerpt for rand, num_keys,
// num_sets or first_id -- confirm against the complete initializer list.
33 RandomTransactionInserter::RandomTransactionInserter(
34 Random64
* rand
, const WriteOptions
& write_options
,
35 const ReadOptions
& read_options
, uint64_t num_keys
, uint16_t num_sets
,
36 const uint64_t cmt_delay_ms
, const uint64_t first_id
)
38 write_options_(write_options
),
39 read_options_(read_options
),
43 cmt_delay_ms_(cmt_delay_ms
) {}
// Destructor: tests txn_ and optimistic_txn_ against nullptr and deletes
// optimistic_txn_ when it is set.
// NOTE(review): the statement guarded by the txn_ check is not visible in this
// excerpt -- presumably it deletes txn_; confirm.
45 RandomTransactionInserter::~RandomTransactionInserter() {
46 if (txn_
!= nullptr) {
49 if (optimistic_txn_
!= nullptr) {
50 delete optimistic_txn_
;
// Runs one randomized insert through a TransactionDB transaction.
//
// Begins a transaction on `db` with write_options_ and `txn_options`, gives it
// a name, and then calls DoInsert(db, txn_, /*is_optimistic=*/false).
// The value produced by DoInsert is captured in `res`.
54 bool RandomTransactionInserter::TransactionDBInsert(
55 TransactionDB
* db
, const TransactionOptions
& txn_options
) {
// The current txn_ is handed back to BeginTransaction as its third argument
// and txn_ is overwritten with the result.
// NOTE(review): presumably this lets the DB reuse the previous transaction
// object -- confirm against TransactionDB::BeginTransaction.
56 txn_
= db
->BeginTransaction(write_options_
, txn_options
, txn_
);
// Transaction name is "txn<hash of the calling thread's id>-<txn_id_>";
// txn_id_ is post-incremented, so each call uses a fresh id.
58 std::hash
<std::thread::id
> hasher
;
60 snprintf(name
, 64, "txn%" ROCKSDB_PRIszt
"-%" PRIu64
,
61 hasher(std::this_thread::get_id()), txn_id_
++);
62 assert(strlen(name
) < 64 - 1);
// NOTE(review): SetName() is invoked inside assert(). When assert is compiled
// out (NDEBUG), the call is not made and the transaction is left unnamed.
63 assert(txn_
->SetName(name
).ok());
65 // Take a snapshot if set_snapshot was not set or with 50% chance otherwise
66 bool take_snapshot
= txn_
->GetSnapshot() == nullptr || rand_
->OneIn(2);
// Reads issued by DoInsert go through read_options_, so it is pointed at the
// transaction's snapshot for the duration of the insert and cleared after.
// NOTE(review): the conditions guarding these two assignments are not visible
// in this excerpt -- presumably `take_snapshot`; confirm.
69 read_options_
.snapshot
= txn_
->GetSnapshot();
71 auto res
= DoInsert(db
, txn_
, false);
73 read_options_
.snapshot
= nullptr;
// Runs one randomized insert through an OptimisticTransactionDB transaction.
//
// Begins a transaction on `db` with write_options_ and `txn_options`, passing
// the current optimistic_txn_ as the third argument, then returns
// DoInsert(db, optimistic_txn_, /*is_optimistic=*/true).
// NOTE(review): the destination of BeginTransaction's return value is not
// visible in this excerpt -- presumably optimistic_txn_; confirm.
78 bool RandomTransactionInserter::OptimisticTransactionDBInsert(
79 OptimisticTransactionDB
* db
,
80 const OptimisticTransactionOptions
& txn_options
) {
82 db
->BeginTransaction(write_options_
, txn_options
, optimistic_txn_
);
84 return DoInsert(db
, optimistic_txn_
, true);
// Runs one randomized insert directly against `db`, without a transaction:
// forwards to DoInsert with txn == nullptr and is_optimistic == false and
// returns its result.
87 bool RandomTransactionInserter::DBInsert(DB
* db
) {
88 return DoInsert(db
, nullptr, false);
// Reads one key of one set and parses its value as an unsigned integer.
//
// Parameters:
//   db, txn         - when txn is non-null the read goes through txn,
//                     otherwise through db.
//   read_options    - passed through to the underlying Get/GetForUpdate.
//   set_i           - set index; the key prefix is set_i + 1.
//   ikey            - numeric key within the set.
//   get_for_update  - with a txn, selects GetForUpdate() instead of Get().
//   int_value       - out: parsed value when the key is found.
//   full_key        - out: the complete key string that was looked up.
//   unexpected_error- out: set to true when the stored value is rejected.
91 Status
RandomTransactionInserter::DBGet(
92 DB
* db
, Transaction
* txn
, ReadOptions
& read_options
, uint16_t set_i
,
93 uint64_t ikey
, bool get_for_update
, uint64_t* int_value
,
94 std::string
* full_key
, bool* unexpected_error
) {
96 // Five digits (since the largest uint16_t is 65535) plus the NUL
99 // Pad prefix appropriately so we can iterate over each set
// "%.4u" prints at least four digits, zero-padded, so every prefix accepted by
// the assert below is exactly four characters wide.
100 assert(set_i
+ 1 <= 9999);
101 snprintf(prefix_buf
, sizeof(prefix_buf
), "%.4u", set_i
+ 1);
102 // key format: [SET#][random#]
103 std::string skey
= ToString(ikey
);
104 Slice
base_key(skey
);
105 *full_key
= std::string(prefix_buf
) + base_key
.ToString();
// `key` refers to the caller-owned *full_key string built just above.
106 Slice
key(*full_key
);
// Pick the read path: txn->GetForUpdate / txn->Get when a transaction is
// supplied, plain db->Get otherwise.
109 if (txn
!= nullptr) {
110 if (get_for_update
) {
111 s
= txn
->GetForUpdate(read_options
, key
, &value
);
113 s
= txn
->Get(read_options
, key
, &value
);
116 s
= db
->Get(read_options
, key
, &value
);
120 // Found key, parse its value
// NOTE(review): std::stoull throws (std::invalid_argument/std::out_of_range)
// on unparsable input; no handler is visible in this excerpt.
121 *int_value
= std::stoull(value
);
// A parsed value of 0 or ULONG_MAX is treated as corruption.
// NOTE(review): std::stoull returns unsigned long long, whose limit is
// ULLONG_MAX; ULONG_MAX is smaller where unsigned long is 32 bits -- confirm
// which sentinel is intended.
122 if (*int_value
== 0 || *int_value
== ULONG_MAX
) {
123 *unexpected_error
= true;
124 fprintf(stderr
, "Get returned unexpected value: %s\n", value
.c_str());
125 s
= Status::Corruption();
127 } else if (s
.IsNotFound()) {
128 // Have not yet written to this key, so assume its value is 0
// Core of every insert variant. Visits the sets in random order; in each set it
// picks a random key, reads it with DBGet, and writes back the value plus a
// random increment that is the same for all sets. With a transaction the
// result is then committed (optionally after a prepare) or rolled back; the
// non-transactional path ends in db->Write().
//
// Parameters:
//   db            - database to read from / write to.
//   txn           - transaction to use, or nullptr for direct DB access.
//   is_optimistic - true when txn is an optimistic transaction; changes which
//                   error statuses are tolerated.
// Returns true when no unexpected error was recorded.
135 bool RandomTransactionInserter::DoInsert(DB
* db
, Transaction
* txn
,
136 bool is_optimistic
) {
140 // pick a random number to use to increment a key in each set
// incr is in the range [1, 100].
141 uint64_t incr
= (rand_
->Next() % 100) + 1;
142 bool unexpected_error
= false;
// Set indices 0 .. num_sets_-1, shuffled with std::random_device (i.e. the
// visiting order is not derived from rand_).
144 std::vector
<uint16_t> set_vec(num_sets_
);
145 std::iota(set_vec
.begin(), set_vec
.end(), static_cast<uint16_t>(0));
146 std::shuffle(set_vec
.begin(), set_vec
.end(), std::random_device
{});
148 // For each set, pick a key at random and increment it
149 for (uint16_t set_i
: set_vec
) {
150 uint64_t int_value
= 0;
151 std::string full_key
;
152 uint64_t rand_key
= rand_
->Next() % num_keys_
;
// GetForUpdate is only meaningful with a transaction; without one the flag is
// always false.
153 const bool get_for_update
= txn
? rand_
->OneIn(2) : false;
154 s
= DBGet(db
, txn
, read_options_
, set_i
, rand_key
, get_for_update
,
155 &int_value
, &full_key
, &unexpected_error
);
158 // Optimistic transactions should never return non-ok status here.
159 // Non-optimistic transactions may return write-conflict/timeout errors.
160 if (is_optimistic
|| !(s
.IsBusy() || s
.IsTimedOut() || s
.IsTryAgain())) {
161 fprintf(stderr
, "Get returned an unexpected error: %s\n",
162 s
.ToString().c_str());
163 unexpected_error
= true;
// New value for the key is the value read plus the shared increment.
170 std::string sum
= ToString(int_value
+ incr
);
171 if (txn
!= nullptr) {
172 s
= txn
->Put(key
, sum
);
173 if (!get_for_update
&& (s
.IsBusy() || s
.IsTimedOut())) {
174 // If the initial get was not for update, then the key is not locked
175 // before put and put could fail due to concurrent writes.
177 } else if (!s
.ok()) {
178 // Since we did a GetForUpdate, Put should not fail.
179 fprintf(stderr
, "Put returned an unexpected error: %s\n",
180 s
.ToString().c_str());
181 unexpected_error
= true;
// Running total of key and value bytes written by this inserter.
186 bytes_inserted_
+= key
.size() + sum
.size();
// NOTE(review): GetSnapshot() is dereferenced below without a visible null
// check, although TransactionDBInsert compares it against nullptr -- confirm a
// snapshot is always set whenever this log statement is evaluated.
188 if (txn
!= nullptr) {
189 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
190 "Insert (%s) %s snap: %" PRIu64
" key:%s value: %" PRIu64
191 "+%" PRIu64
"=%" PRIu64
,
192 txn
->GetName().c_str(), s
.ToString().c_str(),
193 txn
->GetSnapshot()->GetSequenceNumber(), full_key
.c_str(),
194 int_value
, incr
, int_value
+ incr
);
199 if (txn
!= nullptr) {
// Prepare is only considered for non-optimistic transactions, and is skipped
// when OneIn(10) fires.
200 bool with_prepare
= !is_optimistic
&& !rand_
->OneIn(10);
202 // Also try commit without prepare
205 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
206 "Prepare of %" PRIu64
" %s (%s)", txn
->GetId(),
207 s
.ToString().c_str(), txn
->GetName().c_str());
// cmt_delay_ms_ is in milliseconds; multiplied by 1000 for the microsecond
// sleep API.
// NOTE(review): the product is narrowed to int -- large delays could overflow.
208 db
->GetDBOptions().env
->SleepForMicroseconds(
209 static_cast<int>(cmt_delay_ms_
* 1000));
// Commit branch, taken unless OneIn(20) fires.
211 if (!rand_
->OneIn(20)) {
213 assert(!with_prepare
|| s
.ok());
214 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
215 "Commit of %" PRIu64
" %s (%s)", txn
->GetId(),
216 s
.ToString().c_str(), txn
->GetName().c_str());
218 // Also try 5% rollback
220 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
,
221 "Rollback %" PRIu64
" %s %s", txn
->GetId(),
222 txn
->GetName().c_str(), s
.ToString().c_str());
225 assert(is_optimistic
|| s
.ok());
229 // Optimistic transactions can have write-conflict errors on commit.
230 // Any other error is unexpected.
231 if (!(s
.IsBusy() || s
.IsTimedOut() || s
.IsTryAgain())) {
232 unexpected_error
= true;
235 // Non-optimistic transactions should only fail due to expiration
236 // or write failures. For testing purposes, we do not expect any
238 if (!s
.IsExpired()) {
239 unexpected_error
= true;
243 if (unexpected_error
) {
244 fprintf(stderr
, "Commit returned an unexpected error: %s\n",
245 s
.ToString().c_str());
// Non-transactional path: the accumulated batch is written directly.
249 s
= db
->Write(write_options_
, &batch
);
251 unexpected_error
= true;
252 fprintf(stderr
, "Write returned an unexpected error: %s\n",
253 s
.ToString().c_str());
// NOTE(review): Rollback() is invoked inside assert(). When assert is compiled
// out (NDEBUG), the rollback is not performed at all.
257 if (txn
!= nullptr) {
258 assert(txn
->Rollback().ok());
259 ROCKS_LOG_DEBUG(db
->GetDBOptions().info_log
, "Error %s for txn %s",
260 s
.ToString().c_str(), txn
->GetName().c_str());
272 // return success if we didn't get any unexpected errors
273 return !unexpected_error
;
276 // Verify that the sum of the keys in each set are equal
//
// Sums the values stored under each set's four-character prefix and compares
// each set's total with the previous set's. Returns Status::Corruption() when
// a stored value is 0 or ULONG_MAX, or when two totals differ.
//
// Parameters visible in this excerpt:
//   db               - database to read.
//   num_sets         - number of sets (prefixes) to check.
//   num_keys_per_set - keys per set; point lookups are only considered when
//                      this is non-zero.
//   take_snapshot    - must be true whenever delay_ms is non-zero (asserted).
//   rand             - optional RNG; point lookups are only considered when it
//                      is non-null.
// NOTE(review): the declaration of `delay_ms` is not visible in this excerpt.
277 Status
RandomTransactionInserter::Verify(DB
* db
, uint16_t num_sets
,
278 uint64_t num_keys_per_set
,
279 bool take_snapshot
, Random64
* rand
,
281 // delay_ms is the delay between taking a snapshot and doing the reads. It
282 // emulates reads from a long-running backup job.
283 assert(delay_ms
== 0 || take_snapshot
);
284 uint64_t prev_total
= 0;
286 bool prev_assigned
= false;
288 ReadOptions roptions
;
// Take a snapshot for the reads, then sleep delay_ms (converted to
// microseconds) before reading.
// NOTE(review): the product is narrowed to int -- large delays could overflow.
290 roptions
.snapshot
= db
->GetSnapshot();
291 db
->GetDBOptions().env
->SleepForMicroseconds(
292 static_cast<int>(delay_ms
* 1000));
// Set indices 0 .. num_sets-1, visited in an order shuffled with
// std::random_device.
295 std::vector
<uint16_t> set_vec(num_sets
);
296 std::iota(set_vec
.begin(), set_vec
.end(), static_cast<uint16_t>(0));
297 std::shuffle(set_vec
.begin(), set_vec
.end(), std::random_device
{});
299 // For each set of keys with the same prefix, sum all the values
300 for (uint16_t set_i
: set_vec
) {
301 // Five digits (since the largest uint16_t is 65535) plus the NUL
// Same zero-padded, four-digit prefix format that DBGet uses for its keys.
304 assert(set_i
+ 1 <= 9999);
305 snprintf(prefix_buf
, sizeof(prefix_buf
), "%.4u", set_i
+ 1);
308 // Use either point lookup or iterator. Point lookups are slower so we use
310 const bool use_point_lookup
=
311 num_keys_per_set
!= 0 && rand
&& rand
->OneIn(10);
312 if (use_point_lookup
) {
// NOTE(review): `read_options` is declared here, but the DBGet call below is
// given `roptions`; it looks unused in the visible code.
313 ReadOptions read_options
;
314 for (uint64_t k
= 0; k
< num_keys_per_set
; k
++) {
315 std::string dont_care
;
316 uint64_t int_value
= 0;
317 bool unexpected_error
= false;
318 const bool FOR_UPDATE
= false;
319 Status s
= DBGet(db
, nullptr, roptions
, set_i
, k
, FOR_UPDATE
,
320 &int_value
, &dont_care
, &unexpected_error
);
322 assert(!unexpected_error
);
325 } else { // use iterators
// Seek to the four-character prefix and walk forward until the prefix changes.
326 Iterator
* iter
= db
->NewIterator(roptions
);
327 for (iter
->Seek(Slice(prefix_buf
, 4)); iter
->Valid(); iter
->Next()) {
328 Slice key
= iter
->key();
329 // stop when we reach a different prefix
330 if (key
.ToString().compare(0, 4, prefix_buf
) != 0) {
333 Slice value
= iter
->value();
// NOTE(review): std::stoull throws on unparsable input; no handler is visible.
334 uint64_t int_value
= std::stoull(value
.ToString());
// NOTE(review): this early return sits inside the iterator loop; no delete of
// `iter` and no ReleaseSnapshot() are visible on this path -- confirm they are
// not leaked. ULONG_MAX vs. stoull's unsigned long long range also applies.
335 if (int_value
== 0 || int_value
== ULONG_MAX
) {
336 fprintf(stderr
, "Iter returned unexpected value: %s\n",
337 value
.ToString().c_str());
338 return Status::Corruption();
341 db
->GetDBOptions().info_log
,
342 "VerifyRead at %" PRIu64
" (%" PRIu64
"): %.*s value: %" PRIu64
,
343 roptions
.snapshot
? roptions
.snapshot
->GetSequenceNumber() : 0ul,
345 ? ((SnapshotImpl
*)roptions
.snapshot
)->min_uncommitted_
347 static_cast<int>(key
.size()), key
.data(), int_value
);
// Every set must add up to the same total as the set checked before it.
// NOTE(review): no ReleaseSnapshot() is visible before the Corruption return
// in this branch either.
353 if (prev_assigned
&& total
!= prev_total
) {
354 db
->GetDBOptions().info_log
->Flush();
356 "RandomTransactionVerify found inconsistent totals using "
358 "Set[%" PRIu32
"]: %" PRIu64
", Set[%" PRIu32
"]: %" PRIu64
359 " at snapshot %" PRIu64
"\n",
360 use_point_lookup
, prev_i
, prev_total
, set_i
, total
,
361 roptions
.snapshot
? roptions
.snapshot
->GetSequenceNumber() : 0ul);
363 return Status::Corruption();
366 db
->GetDBOptions().info_log
,
367 "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
369 use_point_lookup
, total
,
370 roptions
.snapshot
? roptions
.snapshot
->GetSequenceNumber() : 0ul);
374 prev_assigned
= true;
// Release the snapshot that was stored in roptions.
377 db
->ReleaseSnapshot(roptions
.snapshot
);
383 } // namespace rocksdb
385 #endif // ROCKSDB_LITE