]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/test_util/transaction_test_util.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / test_util / transaction_test_util.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #ifndef ROCKSDB_LITE
6
#include "test_util/transaction_test_util.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <memory>
#include <numeric>
#include <random>
#include <string>
#include <thread>

#include "rocksdb/db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

#include "db/dbformat.h"
#include "db/snapshot_impl.h"
#include "logging/logging.h"
#include "util/random.h"
#include "util/string_util.h"
26
27 namespace ROCKSDB_NAMESPACE {
28
29 RandomTransactionInserter::RandomTransactionInserter(
30 Random64* rand, const WriteOptions& write_options,
31 const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets,
32 const uint64_t cmt_delay_ms, const uint64_t first_id)
33 : rand_(rand),
34 write_options_(write_options),
35 read_options_(read_options),
36 num_keys_(num_keys),
37 num_sets_(num_sets),
38 txn_id_(first_id),
39 cmt_delay_ms_(cmt_delay_ms) {}
40
41 RandomTransactionInserter::~RandomTransactionInserter() {
42 if (txn_ != nullptr) {
43 delete txn_;
44 }
45 if (optimistic_txn_ != nullptr) {
46 delete optimistic_txn_;
47 }
48 }
49
50 bool RandomTransactionInserter::TransactionDBInsert(
51 TransactionDB* db, const TransactionOptions& txn_options) {
52 txn_ = db->BeginTransaction(write_options_, txn_options, txn_);
53
54 std::hash<std::thread::id> hasher;
55 char name[64];
56 snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%" PRIu64,
57 hasher(std::this_thread::get_id()), txn_id_++);
58 assert(strlen(name) < 64 - 1);
59 assert(txn_->SetName(name).ok());
60
61 // Take a snapshot if set_snapshot was not set or with 50% change otherwise
62 bool take_snapshot = txn_->GetSnapshot() == nullptr || rand_->OneIn(2);
63 if (take_snapshot) {
64 txn_->SetSnapshot();
65 read_options_.snapshot = txn_->GetSnapshot();
66 }
67 auto res = DoInsert(db, txn_, false);
68 if (take_snapshot) {
69 read_options_.snapshot = nullptr;
70 }
71 return res;
72 }
73
74 bool RandomTransactionInserter::OptimisticTransactionDBInsert(
75 OptimisticTransactionDB* db,
76 const OptimisticTransactionOptions& txn_options) {
77 optimistic_txn_ =
78 db->BeginTransaction(write_options_, txn_options, optimistic_txn_);
79
80 return DoInsert(db, optimistic_txn_, true);
81 }
82
83 bool RandomTransactionInserter::DBInsert(DB* db) {
84 return DoInsert(db, nullptr, false);
85 }
86
87 Status RandomTransactionInserter::DBGet(
88 DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i,
89 uint64_t ikey, bool get_for_update, uint64_t* int_value,
90 std::string* full_key, bool* unexpected_error) {
91 Status s;
92 // Five digits (since the largest uint16_t is 65535) plus the NUL
93 // end char.
94 char prefix_buf[6];
95 // Pad prefix appropriately so we can iterate over each set
96 assert(set_i + 1 <= 9999);
97 snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
98 // key format: [SET#][random#]
99 std::string skey = ToString(ikey);
100 Slice base_key(skey);
101 *full_key = std::string(prefix_buf) + base_key.ToString();
102 Slice key(*full_key);
103
104 std::string value;
105 if (txn != nullptr) {
106 if (get_for_update) {
107 s = txn->GetForUpdate(read_options, key, &value);
108 } else {
109 s = txn->Get(read_options, key, &value);
110 }
111 } else {
112 s = db->Get(read_options, key, &value);
113 }
114
115 if (s.ok()) {
116 // Found key, parse its value
117 *int_value = std::stoull(value);
118 if (*int_value == 0 || *int_value == ULONG_MAX) {
119 *unexpected_error = true;
120 fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str());
121 s = Status::Corruption();
122 }
123 } else if (s.IsNotFound()) {
124 // Have not yet written to this key, so assume its value is 0
125 *int_value = 0;
126 s = Status::OK();
127 }
128 return s;
129 }
130
131 bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
132 bool is_optimistic) {
133 Status s;
134 WriteBatch batch;
135
136 // pick a random number to use to increment a key in each set
137 uint64_t incr = (rand_->Next() % 100) + 1;
138 bool unexpected_error = false;
139
140 std::vector<uint16_t> set_vec(num_sets_);
141 std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
142 std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{});
143
144 // For each set, pick a key at random and increment it
145 for (uint16_t set_i : set_vec) {
146 uint64_t int_value = 0;
147 std::string full_key;
148 uint64_t rand_key = rand_->Next() % num_keys_;
149 const bool get_for_update = txn ? rand_->OneIn(2) : false;
150 s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update,
151 &int_value, &full_key, &unexpected_error);
152 Slice key(full_key);
153 if (!s.ok()) {
154 // Optimistic transactions should never return non-ok status here.
155 // Non-optimistic transactions may return write-coflict/timeout errors.
156 if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
157 fprintf(stderr, "Get returned an unexpected error: %s\n",
158 s.ToString().c_str());
159 unexpected_error = true;
160 }
161 break;
162 }
163
164 if (s.ok()) {
165 // Increment key
166 std::string sum = ToString(int_value + incr);
167 if (txn != nullptr) {
168 s = txn->Put(key, sum);
169 if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) {
170 // If the initial get was not for update, then the key is not locked
171 // before put and put could fail due to concurrent writes.
172 break;
173 } else if (!s.ok()) {
174 // Since we did a GetForUpdate, Put should not fail.
175 fprintf(stderr, "Put returned an unexpected error: %s\n",
176 s.ToString().c_str());
177 unexpected_error = true;
178 }
179 } else {
180 batch.Put(key, sum);
181 }
182 bytes_inserted_ += key.size() + sum.size();
183 }
184 if (txn != nullptr) {
185 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
186 "Insert (%s) %s snap: %" PRIu64 " key:%s value: %" PRIu64
187 "+%" PRIu64 "=%" PRIu64,
188 txn->GetName().c_str(), s.ToString().c_str(),
189 txn->GetSnapshot()->GetSequenceNumber(), full_key.c_str(),
190 int_value, incr, int_value + incr);
191 }
192 }
193
194 if (s.ok()) {
195 if (txn != nullptr) {
196 bool with_prepare = !is_optimistic && !rand_->OneIn(10);
197 if (with_prepare) {
198 // Also try commit without prepare
199 s = txn->Prepare();
200 assert(s.ok());
201 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
202 "Prepare of %" PRIu64 " %s (%s)", txn->GetId(),
203 s.ToString().c_str(), txn->GetName().c_str());
204 if (rand_->OneIn(20)) {
205 // This currently only tests the mechanics of writing commit time
206 // write batch so the exact values would not matter.
207 s = txn_->GetCommitTimeWriteBatch()->Put("cat", "dog");
208 assert(s.ok());
209 }
210 db->GetDBOptions().env->SleepForMicroseconds(
211 static_cast<int>(cmt_delay_ms_ * 1000));
212 }
213 if (!rand_->OneIn(20)) {
214 s = txn->Commit();
215 assert(!with_prepare || s.ok());
216 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
217 "Commit of %" PRIu64 " %s (%s)", txn->GetId(),
218 s.ToString().c_str(), txn->GetName().c_str());
219 } else {
220 // Also try 5% rollback
221 s = txn->Rollback();
222 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log,
223 "Rollback %" PRIu64 " %s %s", txn->GetId(),
224 txn->GetName().c_str(), s.ToString().c_str());
225 assert(s.ok());
226 }
227 assert(is_optimistic || s.ok());
228
229 if (!s.ok()) {
230 if (is_optimistic) {
231 // Optimistic transactions can have write-conflict errors on commit.
232 // Any other error is unexpected.
233 if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
234 unexpected_error = true;
235 }
236 } else {
237 // Non-optimistic transactions should only fail due to expiration
238 // or write failures. For testing purproses, we do not expect any
239 // write failures.
240 if (!s.IsExpired()) {
241 unexpected_error = true;
242 }
243 }
244
245 if (unexpected_error) {
246 fprintf(stderr, "Commit returned an unexpected error: %s\n",
247 s.ToString().c_str());
248 }
249 }
250 } else {
251 s = db->Write(write_options_, &batch);
252 if (!s.ok()) {
253 unexpected_error = true;
254 fprintf(stderr, "Write returned an unexpected error: %s\n",
255 s.ToString().c_str());
256 }
257 }
258 } else {
259 if (txn != nullptr) {
260 assert(txn->Rollback().ok());
261 ROCKS_LOG_DEBUG(db->GetDBOptions().info_log, "Error %s for txn %s",
262 s.ToString().c_str(), txn->GetName().c_str());
263 }
264 }
265
266 if (s.ok()) {
267 success_count_++;
268 } else {
269 failure_count_++;
270 }
271
272 last_status_ = s;
273
274 // return success if we didn't get any unexpected errors
275 return !unexpected_error;
276 }
277
278 // Verify that the sum of the keys in each set are equal
279 Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets,
280 uint64_t num_keys_per_set,
281 bool take_snapshot, Random64* rand,
282 uint64_t delay_ms) {
283 // delay_ms is the delay between taking a snapshot and doing the reads. It
284 // emulates reads from a long-running backup job.
285 assert(delay_ms == 0 || take_snapshot);
286 uint64_t prev_total = 0;
287 uint32_t prev_i = 0;
288 bool prev_assigned = false;
289
290 ReadOptions roptions;
291 if (take_snapshot) {
292 roptions.snapshot = db->GetSnapshot();
293 db->GetDBOptions().env->SleepForMicroseconds(
294 static_cast<int>(delay_ms * 1000));
295 }
296
297 std::vector<uint16_t> set_vec(num_sets);
298 std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0));
299 std::shuffle(set_vec.begin(), set_vec.end(), std::random_device{});
300
301 // For each set of keys with the same prefix, sum all the values
302 for (uint16_t set_i : set_vec) {
303 // Five digits (since the largest uint16_t is 65535) plus the NUL
304 // end char.
305 char prefix_buf[6];
306 assert(set_i + 1 <= 9999);
307 snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1);
308 uint64_t total = 0;
309
310 // Use either point lookup or iterator. Point lookups are slower so we use
311 // it less often.
312 const bool use_point_lookup =
313 num_keys_per_set != 0 && rand && rand->OneIn(10);
314 if (use_point_lookup) {
315 ReadOptions read_options;
316 for (uint64_t k = 0; k < num_keys_per_set; k++) {
317 std::string dont_care;
318 uint64_t int_value = 0;
319 bool unexpected_error = false;
320 const bool FOR_UPDATE = false;
321 Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE,
322 &int_value, &dont_care, &unexpected_error);
323 assert(s.ok());
324 assert(!unexpected_error);
325 total += int_value;
326 }
327 } else { // user iterators
328 Iterator* iter = db->NewIterator(roptions);
329 for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
330 Slice key = iter->key();
331 // stop when we reach a different prefix
332 if (key.ToString().compare(0, 4, prefix_buf) != 0) {
333 break;
334 }
335 Slice value = iter->value();
336 uint64_t int_value = std::stoull(value.ToString());
337 if (int_value == 0 || int_value == ULONG_MAX) {
338 fprintf(stderr, "Iter returned unexpected value: %s\n",
339 value.ToString().c_str());
340 return Status::Corruption();
341 }
342 ROCKS_LOG_DEBUG(
343 db->GetDBOptions().info_log,
344 "VerifyRead at %" PRIu64 " (%" PRIu64 "): %.*s value: %" PRIu64,
345 roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul,
346 roptions.snapshot
347 ? ((SnapshotImpl*)roptions.snapshot)->min_uncommitted_
348 : 0ul,
349 static_cast<int>(key.size()), key.data(), int_value);
350 total += int_value;
351 }
352 delete iter;
353 }
354
355 if (prev_assigned && total != prev_total) {
356 db->GetDBOptions().info_log->Flush();
357 fprintf(stdout,
358 "RandomTransactionVerify found inconsistent totals using "
359 "pointlookup? %d "
360 "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
361 " at snapshot %" PRIu64 "\n",
362 use_point_lookup, prev_i, prev_total, set_i, total,
363 roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
364 fflush(stdout);
365 return Status::Corruption();
366 } else {
367 ROCKS_LOG_DEBUG(
368 db->GetDBOptions().info_log,
369 "RandomTransactionVerify pass pointlookup? %d total: %" PRIu64
370 " snap: %" PRIu64,
371 use_point_lookup, total,
372 roptions.snapshot ? roptions.snapshot->GetSequenceNumber() : 0ul);
373 }
374 prev_total = total;
375 prev_i = set_i;
376 prev_assigned = true;
377 }
378 if (take_snapshot) {
379 db->ReleaseSnapshot(roptions.snapshot);
380 }
381
382 return Status::OK();
383 }
384
385 } // namespace ROCKSDB_NAMESPACE
386
387 #endif // ROCKSDB_LITE