]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/util/transaction_test_util.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / util / transaction_test_util.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5#ifndef ROCKSDB_LITE
6
7#ifndef __STDC_FORMAT_MACROS
8#define __STDC_FORMAT_MACROS
9#endif
10
11#include "util/transaction_test_util.h"
12
13#include <inttypes.h>
11fdf7f2
TL
14#include <algorithm>
15#include <numeric>
7c673cae 16#include <string>
11fdf7f2 17#include <thread>
7c673cae
FG
18
19#include "rocksdb/db.h"
20#include "rocksdb/utilities/optimistic_transaction_db.h"
21#include "rocksdb/utilities/transaction.h"
22#include "rocksdb/utilities/transaction_db.h"
23#include "util/random.h"
24#include "util/string_util.h"
25
26namespace rocksdb {
27
28RandomTransactionInserter::RandomTransactionInserter(
29 Random64* rand, const WriteOptions& write_options,
30 const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets)
31 : rand_(rand),
32 write_options_(write_options),
33 read_options_(read_options),
34 num_keys_(num_keys),
11fdf7f2
TL
35 num_sets_(num_sets),
36 txn_id_(0) {}
7c673cae
FG
37
38RandomTransactionInserter::~RandomTransactionInserter() {
39 if (txn_ != nullptr) {
40 delete txn_;
41 }
42 if (optimistic_txn_ != nullptr) {
43 delete optimistic_txn_;
44 }
45}
46
47bool RandomTransactionInserter::TransactionDBInsert(
48 TransactionDB* db, const TransactionOptions& txn_options) {
49 txn_ = db->BeginTransaction(write_options_, txn_options, txn_);
50
11fdf7f2
TL
51 std::hash<std::thread::id> hasher;
52 char name[64];
53 snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d",
54 hasher(std::this_thread::get_id()), txn_id_++);
55 assert(strlen(name) < 64 - 1);
56 txn_->SetName(name);
57
58 bool take_snapshot = rand_->OneIn(2);
59 if (take_snapshot) {
60 txn_->SetSnapshot();
61 read_options_.snapshot = txn_->GetSnapshot();
62 }
63 auto res = DoInsert(nullptr, txn_, false);
64 if (take_snapshot) {
65 read_options_.snapshot = nullptr;
66 }
67 return res;
7c673cae
FG
68}
69
70bool RandomTransactionInserter::OptimisticTransactionDBInsert(
71 OptimisticTransactionDB* db,
72 const OptimisticTransactionOptions& txn_options) {
73 optimistic_txn_ =
74 db->BeginTransaction(write_options_, txn_options, optimistic_txn_);
75
76 return DoInsert(nullptr, optimistic_txn_, true);
77}
78
79bool RandomTransactionInserter::DBInsert(DB* db) {
80 return DoInsert(db, nullptr, false);
81}
82
11fdf7f2
TL
83Status RandomTransactionInserter::DBGet(
84 DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i,
85 uint64_t ikey, bool get_for_update, uint64_t* int_value,
86 std::string* full_key, bool* unexpected_error) {
87 Status s;
88 // Five digits (since the largest uint16_t is 65535) plus the NUL
89 // end char.
90 char prefix_buf[6];
91 // Pad prefix appropriately so we can iterate over each set
92 assert(set_i + 1 <= 9999);
93 snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
94 // key format: [SET#][random#]
95 std::string skey = ToString(ikey);
96 Slice base_key(skey);
97 *full_key = std::string(prefix_buf) + base_key.ToString();
98 Slice key(*full_key);
99
100 std::string value;
101 if (txn != nullptr) {
102 if (get_for_update) {
103 s = txn->GetForUpdate(read_options, key, &value);
104 } else {
105 s = txn->Get(read_options, key, &value);
106 }
107 } else {
108 s = db->Get(read_options, key, &value);
109 }
110
111 if (s.ok()) {
112 // Found key, parse its value
113 *int_value = std::stoull(value);
114 if (*int_value == 0 || *int_value == ULONG_MAX) {
115 *unexpected_error = true;
116 fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str());
117 s = Status::Corruption();
118 }
119 } else if (s.IsNotFound()) {
120 // Have not yet written to this key, so assume its value is 0
121 *int_value = 0;
122 s = Status::OK();
123 }
124 return s;
125}
126
7c673cae
FG
127bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
128 bool is_optimistic) {
129 Status s;
130 WriteBatch batch;
7c673cae
FG
131
132 // pick a random number to use to increment a key in each set
133 uint64_t incr = (rand_->Next() % 100) + 1;
7c673cae
FG
134 bool unexpected_error = false;
135
11fdf7f2
TL
136 std::vector<uint16_t> set_vec(num_sets_);
137 std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
138 std::random_shuffle(set_vec.begin(), set_vec.end(),
139 [&](uint64_t r) { return rand_->Uniform(r); });
140
7c673cae 141 // For each set, pick a key at random and increment it
11fdf7f2 142 for (uint16_t set_i : set_vec) {
7c673cae 143 uint64_t int_value = 0;
11fdf7f2
TL
144 std::string full_key;
145 uint64_t rand_key = rand_->Next() % num_keys_;
146 const bool get_for_update = txn ? rand_->OneIn(2) : false;
147 s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update,
148 &int_value, &full_key, &unexpected_error);
7c673cae 149 Slice key(full_key);
11fdf7f2 150 if (!s.ok()) {
7c673cae
FG
151 // Optimistic transactions should never return non-ok status here.
152 // Non-optimistic transactions may return write-coflict/timeout errors.
153 if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
154 fprintf(stderr, "Get returned an unexpected error: %s\n",
155 s.ToString().c_str());
156 unexpected_error = true;
157 }
158 break;
159 }
160
161 if (s.ok()) {
162 // Increment key
163 std::string sum = ToString(int_value + incr);
164 if (txn != nullptr) {
165 s = txn->Put(key, sum);
11fdf7f2
TL
166 if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) {
167 // If the initial get was not for update, then the key is not locked
168 // before put and put could fail due to concurrent writes.
169 break;
170 } else if (!s.ok()) {
7c673cae
FG
171 // Since we did a GetForUpdate, Put should not fail.
172 fprintf(stderr, "Put returned an unexpected error: %s\n",
173 s.ToString().c_str());
174 unexpected_error = true;
175 }
176 } else {
177 batch.Put(key, sum);
178 }
11fdf7f2 179 bytes_inserted_ += key.size() + sum.size();
7c673cae
FG
180 }
181 }
182
183 if (s.ok()) {
184 if (txn != nullptr) {
11fdf7f2
TL
185 if (!is_optimistic && !rand_->OneIn(10)) {
186 // also try commit without prpare
187 s = txn->Prepare();
188 assert(s.ok());
189 }
190 if (!rand_->OneIn(20)) {
191 s = txn->Commit();
192 } else {
193 // Also try 5% rollback
194 s = txn->Rollback();
195 assert(s.ok());
196 }
197 assert(is_optimistic || s.ok());
7c673cae
FG
198
199 if (!s.ok()) {
200 if (is_optimistic) {
201 // Optimistic transactions can have write-conflict errors on commit.
202 // Any other error is unexpected.
203 if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
204 unexpected_error = true;
205 }
206 } else {
207 // Non-optimistic transactions should only fail due to expiration
208 // or write failures. For testing purproses, we do not expect any
209 // write failures.
210 if (!s.IsExpired()) {
211 unexpected_error = true;
212 }
213 }
214
215 if (unexpected_error) {
216 fprintf(stderr, "Commit returned an unexpected error: %s\n",
217 s.ToString().c_str());
218 }
219 }
7c673cae
FG
220 } else {
221 s = db->Write(write_options_, &batch);
222 if (!s.ok()) {
223 unexpected_error = true;
224 fprintf(stderr, "Write returned an unexpected error: %s\n",
225 s.ToString().c_str());
226 }
227 }
228 } else {
229 if (txn != nullptr) {
11fdf7f2 230 assert(txn->Rollback().ok());
7c673cae
FG
231 }
232 }
233
234 if (s.ok()) {
235 success_count_++;
236 } else {
237 failure_count_++;
238 }
239
240 last_status_ = s;
241
242 // return success if we didn't get any unexpected errors
243 return !unexpected_error;
244}
245
11fdf7f2
TL
246// Verify that the sum of the keys in each set are equal
247Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
248 uint64_t num_keys_per_set,
249 bool take_snapshot, Random64* rand) {
7c673cae 250 uint64_t prev_total = 0;
11fdf7f2
TL
251 uint32_t prev_i = 0;
252 bool prev_assigned = false;
7c673cae 253
11fdf7f2
TL
254 ReadOptions roptions;
255 if (take_snapshot) {
256 roptions.snapshot = db->GetSnapshot();
257 }
258
259 std::vector<uint16_t> set_vec(num_sets);
260 std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
261 if (rand) {
262 std::random_shuffle(set_vec.begin(), set_vec.end(),
263 [&](uint64_t r) { return rand->Uniform(r); });
264 }
7c673cae 265 // For each set of keys with the same prefix, sum all the values
11fdf7f2
TL
266 for (uint16_t set_i : set_vec) {
267 // Five digits (since the largest uint16_t is 65535) plus the NUL
268 // end char.
7c673cae 269 char prefix_buf[6];
11fdf7f2
TL
270 assert(set_i + 1 <= 9999);
271 snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
7c673cae
FG
272 uint64_t total = 0;
273
11fdf7f2
TL
274 // Use either point lookup or iterator. Point lookups are slower so we use
275 // it less often.
276 if (num_keys_per_set != 0 && rand && rand->OneIn(10)) { // use point lookup
277 ReadOptions read_options;
278 for (uint64_t k = 0; k < num_keys_per_set; k++) {
279 std::string dont_care;
280 uint64_t int_value = 0;
281 bool unexpected_error = false;
282 const bool FOR_UPDATE = false;
283 Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE,
284 &int_value, &dont_care, &unexpected_error);
285 assert(s.ok());
286 assert(!unexpected_error);
287 total += int_value;
7c673cae 288 }
11fdf7f2
TL
289 } else { // user iterators
290 Iterator* iter = db->NewIterator(roptions);
291 for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
292 Slice key = iter->key();
293 // stop when we reach a different prefix
294 if (key.ToString().compare(0, 4, prefix_buf) != 0) {
295 break;
296 }
297 Slice value = iter->value();
298 uint64_t int_value = std::stoull(value.ToString());
299 if (int_value == 0 || int_value == ULONG_MAX) {
300 fprintf(stderr, "Iter returned unexpected value: %s\n",
301 value.ToString().c_str());
302 return Status::Corruption();
303 }
304 total += int_value;
7c673cae 305 }
11fdf7f2 306 delete iter;
7c673cae 307 }
11fdf7f2
TL
308
309 if (prev_assigned && total != prev_total) {
310 fprintf(stdout,
311 "RandomTransactionVerify found inconsistent totals. "
312 "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 " \n",
313 prev_i, prev_total, set_i, total);
314 return Status::Corruption();
7c673cae
FG
315 }
316 prev_total = total;
11fdf7f2
TL
317 prev_i = set_i;
318 prev_assigned = true;
319 }
320 if (take_snapshot) {
321 db->ReleaseSnapshot(roptions.snapshot);
7c673cae
FG
322 }
323
324 return Status::OK();
325}
326
327} // namespace rocksdb
328
329#endif // ROCKSDB_LITE