]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/transaction_test_util.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rocksdb / util / transaction_test_util.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #ifndef ROCKSDB_LITE
6
7 #ifndef __STDC_FORMAT_MACROS
8 #define __STDC_FORMAT_MACROS
9 #endif
10
11 #include "util/transaction_test_util.h"
12
#include <inttypes.h>
#include <algorithm>
#include <limits>
#include <memory>
#include <numeric>
#include <random>
#include <string>
#include <thread>
#include <vector>
19
20 #include "rocksdb/db.h"
21 #include "rocksdb/utilities/optimistic_transaction_db.h"
22 #include "rocksdb/utilities/transaction.h"
23 #include "rocksdb/utilities/transaction_db.h"
24
25 #include "db/dbformat.h"
26 #include "db/snapshot_impl.h"
27 #include "util/logging.h"
28 #include "util/random.h"
29 #include "util/string_util.h"
30
31 namespace rocksdb {
32
33 RandomTransactionInserter::RandomTransactionInserter(
34 Random64* rand, const WriteOptions& write_options,
35 const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets,
36 const uint64_t cmt_delay_ms, const uint64_t first_id)
37 : rand_(rand),
38 write_options_(write_options),
39 read_options_(read_options),
40 num_keys_(num_keys),
41 num_sets_(num_sets),
42 txn_id_(first_id),
43 cmt_delay_ms_(cmt_delay_ms) {}
44
45 RandomTransactionInserter::~RandomTransactionInserter() {
46 if (txn_ != nullptr) {
47 delete txn_;
48 }
49 if (optimistic_txn_ != nullptr) {
50 delete optimistic_txn_;
51 }
52 }
53
54 bool RandomTransactionInserter::TransactionDBInsert(
55 TransactionDB* db, const TransactionOptions& txn_options) {
56 txn_ = db->BeginTransaction(write_options_, txn_options, txn_);
57
58 std::hash<std::thread::id> hasher;
59 char name[64];
60 snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64,
61 hasher(std::this_thread::get_id()), txn_id_++);
62 assert(strlen(name) < 64 - 1);
63 assert(txn_->SetName(name).ok());
64
65 // Take a snapshot if set_snapshot was not set or with 50% change otherwise
66 bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2);
67 if (take_snapshot) {
68 txn_->SetSnapshot();
69 read_options_.snapshot = txn_->GetSnapshot();
70 }
71 auto res = DoInsert(db, txn_, false);
72 if (take_snapshot) {
73 read_options_.snapshot = nullptr;
74 }
75 return res;
76 }
77
78 bool RandomTransactionInserter::OptimisticTransactionDBInsert(
79 OptimisticTransactionDB* db,
80 const OptimisticTransactionOptions& txn_options) {
81 optimistic_txn_ =
82 db->BeginTransaction(write_options_, txn_options, optimistic_txn_);
83
84 return DoInsert(db, optimistic_txn_, true);
85 }
86
87 bool RandomTransactionInserter::DBInsert(DB* db) {
88 return DoInsert(db, nullptr, false);
89 }
90
91 Status RandomTransactionInserter::DBGet(
92 DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i,
93 uint64_t ikey, bool get_for_update, uint64_t* int_value,
94 std::string* full_key, bool* unexpected_error) {
95 Status s;
96 // Five digits (since the largest uint16_t is 65535) plus the NUL
97 // end char.
98 char prefix_buf[6];
99 // Pad prefix appropriately so we can iterate over each set
100 assert(set_i + 1 <= 9999);
101 snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
102 // key format: [SET#][random#]
103 std::string skey = ToString(ikey);
104 Slice base_key(skey);
105 *full_key = std::string(prefix_buf) + base_key.ToString();
106 Slice key(*full_key);
107
108 std::string value;
109 if (txn != nullptr) {
110 if (get_for_update) {
111 s = txn->GetForUpdate(read_options, key, &value);
112 } else {
113 s = txn->Get(read_options, key, &value);
114 }
115 } else {
116 s = db->Get(read_options, key, &value);
117 }
118
119 if (s.ok()) {
120 // Found key, parse its value
121 *int_value = std::stoull(value);
122 if (*int_value == 0 || *int_value == ULONG_MAX) {
123 *unexpected_error = true;
124 fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str());
125 s = Status::Corruption();
126 }
127 } else if (s.IsNotFound()) {
128 // Have not yet written to this key, so assume its value is 0
129 *int_value = 0;
130 s = Status::OK();
131 }
132 return s;
133 }
134
135 bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
136 bool is_optimistic) {
137 Status s;
138 WriteBatch batch;
139
140 // pick a random number to use to increment a key in each set
141 uint64_t incr = (rand_->Next() % 100) + 1;
142 bool unexpected_error = false;
143
144 std::vector<uint16_t> set_vec(num_sets_);
145 std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
146 std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{});
147
148 // For each set, pick a key at random and increment it
149 for (uint16_t set_i : set_vec) {
150 uint64_t int_value = 0;
151 std::string full_key;
152 uint64_t rand_key = rand_->Next() % num_keys_;
153 const bool get_for_update = txn ? rand_->OneIn(2) : false;
154 s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update,
155 &int_value, &full_key, &unexpected_error);
156 Slice key(full_key);
157 if (!s.ok()) {
158 // Optimistic transactions should never return non-ok status here.
159 // Non-optimistic transactions may return write-coflict/timeout errors.
160 if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
161 fprintf(stderr, "Get returned an unexpected error: %s\n",
162 s.ToString().c_str());
163 unexpected_error = true;
164 }
165 break;
166 }
167
168 if (s.ok()) {
169 // Increment key
170 std::string sum = ToString(int_value + incr);
171 if (txn != nullptr) {
172 s = txn->Put(key, sum);
173 if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) {
174 // If the initial get was not for update, then the key is not locked
175 // before put and put could fail due to concurrent writes.
176 break;
177 } else if (!s.ok()) {
178 // Since we did a GetForUpdate, Put should not fail.
179 fprintf(stderr, "Put returned an unexpected error: %s\n",
180 s.ToString().c_str());
181 unexpected_error = true;
182 }
183 } else {
184 batch.Put(key, sum);
185 }
186 bytes_inserted_ += key.size() + sum.size();
187 }
188 if (txn != nullptr) {
189 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
190 "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64
191 "+%" PRIu64 "=%" PRIu64,
192 txn->GetName().c_str(), s.ToString().c_str(),
193 txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(),
194 int_value, incr, int_value + incr);
195 }
196 }
197
198 if (s.ok()) {
199 if (txn != nullptr) {
200 bool with_prepare = !is_optimistic && !rand_->OneIn(10);
201 if (with_prepare) {
202 // Also try commit without prepare
203 s = txn->Prepare();
204 assert(s.ok());
205 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
206 "Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
207 s.ToString().c_str(), txn->GetName().c_str());
208 db->GetDBOptions().env->SleepForMicroseconds(
209 static_cast<int>(cmt_delay_ms_ * 1000));
210 }
211 if (!rand_->OneIn(20)) {
212 s = txn->Commit();
213 assert(!with_prepare || s.ok());
214 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
215 "Commit of %" PRIu64 " %s (%s)", txn->GetId(),
216 s.ToString().c_str(), txn->GetName().c_str());
217 } else {
218 // Also try 5% rollback
219 s = txn->Rollback();
220 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
221 "Rollback %" PRIu64 " %s %s", txn->GetId(),
222 txn->GetName().c_str(), s.ToString().c_str());
223 assert(s.ok());
224 }
225 assert(is_optimistic || s.ok());
226
227 if (!s.ok()) {
228 if (is_optimistic) {
229 // Optimistic transactions can have write-conflict errors on commit.
230 // Any other error is unexpected.
231 if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
232 unexpected_error = true;
233 }
234 } else {
235 // Non-optimistic transactions should only fail due to expiration
236 // or write failures. For testing purproses, we do not expect any
237 // write failures.
238 if (!s.IsExpired()) {
239 unexpected_error = true;
240 }
241 }
242
243 if (unexpected_error) {
244 fprintf(stderr, "Commit returned an unexpected error: %s\n",
245 s.ToString().c_str());
246 }
247 }
248 } else {
249 s = db->Write(write_options_, &batch);
250 if (!s.ok()) {
251 unexpected_error = true;
252 fprintf(stderr, "Write returned an unexpected error: %s\n",
253 s.ToString().c_str());
254 }
255 }
256 } else {
257 if (txn != nullptr) {
258 assert(txn->Rollback().ok());
259 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s",
260 s.ToString().c_str(), txn->GetName().c_str());
261 }
262 }
263
264 if (s.ok()) {
265 success_count_++;
266 } else {
267 failure_count_++;
268 }
269
270 last_status_ = s;
271
272 // return success if we didn't get any unexpected errors
273 return !unexpected_error;
274 }
275
276 // Verify that the sum of the keys in each set are equal
277 Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
278 uint64_t num_keys_per_set,
279 bool take_snapshot, Random64* rand,
280 uint64_t delay_ms) {
281 // delay_ms is the delay between taking a snapshot and doing the reads. It
282 // emulates reads from a long-running backup job.
283 assert(delay_ms == 0 || take_snapshot);
284 uint64_t prev_total = 0;
285 uint32_t prev_i = 0;
286 bool prev_assigned = false;
287
288 ReadOptions roptions;
289 if (take_snapshot) {
290 roptions.snapshot = db->GetSnapshot();
291 db->GetDBOptions().env->SleepForMicroseconds(
292 static_cast<int>(delay_ms * 1000));
293 }
294
295 std::vector<uint16_t> set_vec(num_sets);
296 std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
297 std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{});
298
299 // For each set of keys with the same prefix, sum all the values
300 for (uint16_t set_i : set_vec) {
301 // Five digits (since the largest uint16_t is 65535) plus the NUL
302 // end char.
303 char prefix_buf[6];
304 assert(set_i + 1 <= 9999);
305 snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
306 uint64_t total = 0;
307
308 // Use either point lookup or iterator. Point lookups are slower so we use
309 // it less often.
310 const bool use_point_lookup =
311 num_keys_per_set != 0 && rand && rand->OneIn(10);
312 if (use_point_lookup) {
313 ReadOptions read_options;
314 for (uint64_t k = 0; k < num_keys_per_set; k++) {
315 std::string dont_care;
316 uint64_t int_value = 0;
317 bool unexpected_error = false;
318 const bool FOR_UPDATE = false;
319 Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE,
320 &int_value, &dont_care, &unexpected_error);
321 assert(s.ok());
322 assert(!unexpected_error);
323 total += int_value;
324 }
325 } else { // user iterators
326 Iterator* iter = db->NewIterator(roptions);
327 for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
328 Slice key = iter->key();
329 // stop when we reach a different prefix
330 if (key.ToString().compare(0, 4, prefix_buf) != 0) {
331 break;
332 }
333 Slice value = iter->value();
334 uint64_t int_value = std::stoull(value.ToString());
335 if (int_value == 0 || int_value == ULONG_MAX) {
336 fprintf(stderr, "Iter returned unexpected value: %s\n",
337 value.ToString().c_str());
338 return Status::Corruption();
339 }
340 ROCKS_LOG_DEBUG(
341 db->GetDBOptions().info_log,
342 "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64,
343 roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul,
344 roptions.snapshot
345 ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_
346 : 0ul,
347 static_cast<int>(key.size()), key.data(), int_value);
348 total += int_value;
349 }
350 delete iter;
351 }
352
353 if (prev_assigned && total != prev_total) {
354 db->GetDBOptions().info_log->Flush();
355 fprintf(stdout,
356 "RandomTransactionVerify found inconsistent totals using "
357 "pointlookup? %d "
358 "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
359 " at snapshot %" PRIu64 "\n",
360 use_point_lookup, prev_i, prev_total, set_i, total,
361 roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
362 fflush(stdout);
363 return Status::Corruption();
364 } else {
365 ROCKS_LOG_DEBUG(
366 db->GetDBOptions().info_log,
367 "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
368 " snap: %" PRIu64,
369 use_point_lookup, total,
370 roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
371 }
372 prev_total = total;
373 prev_i = set_i;
374 prev_assigned = true;
375 }
376 if (take_snapshot) {
377 db->ReleaseSnapshot(roptions.snapshot);
378 }
379
380 return Status::OK();
381 }
382
383 } // namespace rocksdb
384
385 #endif // ROCKSDB_LITE