]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | #ifndef ROCKSDB_LITE |
6 | ||
7 | #ifndef __STDC_FORMAT_MACROS | |
8 | #define __STDC_FORMAT_MACROS | |
9 | #endif | |
10 | ||
#include "util/transaction_test_util.h"

#include <inttypes.h>
#include <algorithm>
#include <climits>
#include <exception>
#include <memory>
#include <numeric>
#include <string>
#include <thread>

#include "rocksdb/db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/random.h"
#include "util/string_util.h"
25 | ||
26 | namespace rocksdb { | |
27 | ||
28 | RandomTransactionInserter::RandomTransactionInserter( | |
29 | Random64* rand, const WriteOptions& write_options, | |
30 | const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets) | |
31 | : rand_(rand), | |
32 | write_options_(write_options), | |
33 | read_options_(read_options), | |
34 | num_keys_(num_keys), | |
11fdf7f2 TL |
35 | num_sets_(num_sets), |
36 | txn_id_(0) {} | |
7c673cae FG |
37 | |
38 | RandomTransactionInserter::~RandomTransactionInserter() { | |
39 | if (txn_ != nullptr) { | |
40 | delete txn_; | |
41 | } | |
42 | if (optimistic_txn_ != nullptr) { | |
43 | delete optimistic_txn_; | |
44 | } | |
45 | } | |
46 | ||
47 | bool RandomTransactionInserter::TransactionDBInsert( | |
48 | TransactionDB* db, const TransactionOptions& txn_options) { | |
49 | txn_ = db->BeginTransaction(write_options_, txn_options, txn_); | |
50 | ||
11fdf7f2 TL |
51 | std::hash<std::thread::id> hasher; |
52 | char name[64]; | |
53 | snprintf(name, 64, "txn%" ROCKSDB_PRIszt "-%d", | |
54 | hasher(std::this_thread::get_id()), txn_id_++); | |
55 | assert(strlen(name) < 64 - 1); | |
56 | txn_->SetName(name); | |
57 | ||
58 | bool take_snapshot = rand_->OneIn(2); | |
59 | if (take_snapshot) { | |
60 | txn_->SetSnapshot(); | |
61 | read_options_.snapshot = txn_->GetSnapshot(); | |
62 | } | |
63 | auto res = DoInsert(nullptr, txn_, false); | |
64 | if (take_snapshot) { | |
65 | read_options_.snapshot = nullptr; | |
66 | } | |
67 | return res; | |
7c673cae FG |
68 | } |
69 | ||
70 | bool RandomTransactionInserter::OptimisticTransactionDBInsert( | |
71 | OptimisticTransactionDB* db, | |
72 | const OptimisticTransactionOptions& txn_options) { | |
73 | optimistic_txn_ = | |
74 | db->BeginTransaction(write_options_, txn_options, optimistic_txn_); | |
75 | ||
76 | return DoInsert(nullptr, optimistic_txn_, true); | |
77 | } | |
78 | ||
79 | bool RandomTransactionInserter::DBInsert(DB* db) { | |
80 | return DoInsert(db, nullptr, false); | |
81 | } | |
82 | ||
11fdf7f2 TL |
83 | Status RandomTransactionInserter::DBGet( |
84 | DB* db, Transaction* txn, ReadOptions& read_options, uint16_t set_i, | |
85 | uint64_t ikey, bool get_for_update, uint64_t* int_value, | |
86 | std::string* full_key, bool* unexpected_error) { | |
87 | Status s; | |
88 | // Five digits (since the largest uint16_t is 65535) plus the NUL | |
89 | // end char. | |
90 | char prefix_buf[6]; | |
91 | // Pad prefix appropriately so we can iterate over each set | |
92 | assert(set_i + 1 <= 9999); | |
93 | snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); | |
94 | // key format: [SET#][random#] | |
95 | std::string skey = ToString(ikey); | |
96 | Slice base_key(skey); | |
97 | *full_key = std::string(prefix_buf) + base_key.ToString(); | |
98 | Slice key(*full_key); | |
99 | ||
100 | std::string value; | |
101 | if (txn != nullptr) { | |
102 | if (get_for_update) { | |
103 | s = txn->GetForUpdate(read_options, key, &value); | |
104 | } else { | |
105 | s = txn->Get(read_options, key, &value); | |
106 | } | |
107 | } else { | |
108 | s = db->Get(read_options, key, &value); | |
109 | } | |
110 | ||
111 | if (s.ok()) { | |
112 | // Found key, parse its value | |
113 | *int_value = std::stoull(value); | |
114 | if (*int_value == 0 || *int_value == ULONG_MAX) { | |
115 | *unexpected_error = true; | |
116 | fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); | |
117 | s = Status::Corruption(); | |
118 | } | |
119 | } else if (s.IsNotFound()) { | |
120 | // Have not yet written to this key, so assume its value is 0 | |
121 | *int_value = 0; | |
122 | s = Status::OK(); | |
123 | } | |
124 | return s; | |
125 | } | |
126 | ||
7c673cae FG |
127 | bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, |
128 | bool is_optimistic) { | |
129 | Status s; | |
130 | WriteBatch batch; | |
7c673cae FG |
131 | |
132 | // pick a random number to use to increment a key in each set | |
133 | uint64_t incr = (rand_->Next() % 100) + 1; | |
7c673cae FG |
134 | bool unexpected_error = false; |
135 | ||
11fdf7f2 TL |
136 | std::vector<uint16_t> set_vec(num_sets_); |
137 | std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0)); | |
138 | std::random_shuffle(set_vec.begin(), set_vec.end(), | |
139 | [&](uint64_t r) { return rand_->Uniform(r); }); | |
140 | ||
7c673cae | 141 | // For each set, pick a key at random and increment it |
11fdf7f2 | 142 | for (uint16_t set_i : set_vec) { |
7c673cae | 143 | uint64_t int_value = 0; |
11fdf7f2 TL |
144 | std::string full_key; |
145 | uint64_t rand_key = rand_->Next() % num_keys_; | |
146 | const bool get_for_update = txn ? rand_->OneIn(2) : false; | |
147 | s = DBGet(db, txn, read_options_, set_i, rand_key, get_for_update, | |
148 | &int_value, &full_key, &unexpected_error); | |
7c673cae | 149 | Slice key(full_key); |
11fdf7f2 | 150 | if (!s.ok()) { |
7c673cae FG |
151 | // Optimistic transactions should never return non-ok status here. |
152 | // Non-optimistic transactions may return write-coflict/timeout errors. | |
153 | if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { | |
154 | fprintf(stderr, "Get returned an unexpected error: %s\n", | |
155 | s.ToString().c_str()); | |
156 | unexpected_error = true; | |
157 | } | |
158 | break; | |
159 | } | |
160 | ||
161 | if (s.ok()) { | |
162 | // Increment key | |
163 | std::string sum = ToString(int_value + incr); | |
164 | if (txn != nullptr) { | |
165 | s = txn->Put(key, sum); | |
11fdf7f2 TL |
166 | if (!get_for_update && (s.IsBusy() || s.IsTimedOut())) { |
167 | // If the initial get was not for update, then the key is not locked | |
168 | // before put and put could fail due to concurrent writes. | |
169 | break; | |
170 | } else if (!s.ok()) { | |
7c673cae FG |
171 | // Since we did a GetForUpdate, Put should not fail. |
172 | fprintf(stderr, "Put returned an unexpected error: %s\n", | |
173 | s.ToString().c_str()); | |
174 | unexpected_error = true; | |
175 | } | |
176 | } else { | |
177 | batch.Put(key, sum); | |
178 | } | |
11fdf7f2 | 179 | bytes_inserted_ += key.size() + sum.size(); |
7c673cae FG |
180 | } |
181 | } | |
182 | ||
183 | if (s.ok()) { | |
184 | if (txn != nullptr) { | |
11fdf7f2 TL |
185 | if (!is_optimistic && !rand_->OneIn(10)) { |
186 | // also try commit without prpare | |
187 | s = txn->Prepare(); | |
188 | assert(s.ok()); | |
189 | } | |
190 | if (!rand_->OneIn(20)) { | |
191 | s = txn->Commit(); | |
192 | } else { | |
193 | // Also try 5% rollback | |
194 | s = txn->Rollback(); | |
195 | assert(s.ok()); | |
196 | } | |
197 | assert(is_optimistic || s.ok()); | |
7c673cae FG |
198 | |
199 | if (!s.ok()) { | |
200 | if (is_optimistic) { | |
201 | // Optimistic transactions can have write-conflict errors on commit. | |
202 | // Any other error is unexpected. | |
203 | if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { | |
204 | unexpected_error = true; | |
205 | } | |
206 | } else { | |
207 | // Non-optimistic transactions should only fail due to expiration | |
208 | // or write failures. For testing purproses, we do not expect any | |
209 | // write failures. | |
210 | if (!s.IsExpired()) { | |
211 | unexpected_error = true; | |
212 | } | |
213 | } | |
214 | ||
215 | if (unexpected_error) { | |
216 | fprintf(stderr, "Commit returned an unexpected error: %s\n", | |
217 | s.ToString().c_str()); | |
218 | } | |
219 | } | |
7c673cae FG |
220 | } else { |
221 | s = db->Write(write_options_, &batch); | |
222 | if (!s.ok()) { | |
223 | unexpected_error = true; | |
224 | fprintf(stderr, "Write returned an unexpected error: %s\n", | |
225 | s.ToString().c_str()); | |
226 | } | |
227 | } | |
228 | } else { | |
229 | if (txn != nullptr) { | |
11fdf7f2 | 230 | assert(txn->Rollback().ok()); |
7c673cae FG |
231 | } |
232 | } | |
233 | ||
234 | if (s.ok()) { | |
235 | success_count_++; | |
236 | } else { | |
237 | failure_count_++; | |
238 | } | |
239 | ||
240 | last_status_ = s; | |
241 | ||
242 | // return success if we didn't get any unexpected errors | |
243 | return !unexpected_error; | |
244 | } | |
245 | ||
11fdf7f2 TL |
246 | // Verify that the sum of the keys in each set are equal |
247 | Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets, | |
248 | uint64_t num_keys_per_set, | |
249 | bool take_snapshot, Random64* rand) { | |
7c673cae | 250 | uint64_t prev_total = 0; |
11fdf7f2 TL |
251 | uint32_t prev_i = 0; |
252 | bool prev_assigned = false; | |
7c673cae | 253 | |
11fdf7f2 TL |
254 | ReadOptions roptions; |
255 | if (take_snapshot) { | |
256 | roptions.snapshot = db->GetSnapshot(); | |
257 | } | |
258 | ||
259 | std::vector<uint16_t> set_vec(num_sets); | |
260 | std::iota(set_vec.begin(), set_vec.end(), static_cast<uint16_t>(0)); | |
261 | if (rand) { | |
262 | std::random_shuffle(set_vec.begin(), set_vec.end(), | |
263 | [&](uint64_t r) { return rand->Uniform(r); }); | |
264 | } | |
7c673cae | 265 | // For each set of keys with the same prefix, sum all the values |
11fdf7f2 TL |
266 | for (uint16_t set_i : set_vec) { |
267 | // Five digits (since the largest uint16_t is 65535) plus the NUL | |
268 | // end char. | |
7c673cae | 269 | char prefix_buf[6]; |
11fdf7f2 TL |
270 | assert(set_i + 1 <= 9999); |
271 | snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", set_i + 1); | |
7c673cae FG |
272 | uint64_t total = 0; |
273 | ||
11fdf7f2 TL |
274 | // Use either point lookup or iterator. Point lookups are slower so we use |
275 | // it less often. | |
276 | if (num_keys_per_set != 0 && rand && rand->OneIn(10)) { // use point lookup | |
277 | ReadOptions read_options; | |
278 | for (uint64_t k = 0; k < num_keys_per_set; k++) { | |
279 | std::string dont_care; | |
280 | uint64_t int_value = 0; | |
281 | bool unexpected_error = false; | |
282 | const bool FOR_UPDATE = false; | |
283 | Status s = DBGet(db, nullptr, roptions, set_i, k, FOR_UPDATE, | |
284 | &int_value, &dont_care, &unexpected_error); | |
285 | assert(s.ok()); | |
286 | assert(!unexpected_error); | |
287 | total += int_value; | |
7c673cae | 288 | } |
11fdf7f2 TL |
289 | } else { // user iterators |
290 | Iterator* iter = db->NewIterator(roptions); | |
291 | for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { | |
292 | Slice key = iter->key(); | |
293 | // stop when we reach a different prefix | |
294 | if (key.ToString().compare(0, 4, prefix_buf) != 0) { | |
295 | break; | |
296 | } | |
297 | Slice value = iter->value(); | |
298 | uint64_t int_value = std::stoull(value.ToString()); | |
299 | if (int_value == 0 || int_value == ULONG_MAX) { | |
300 | fprintf(stderr, "Iter returned unexpected value: %s\n", | |
301 | value.ToString().c_str()); | |
302 | return Status::Corruption(); | |
303 | } | |
304 | total += int_value; | |
7c673cae | 305 | } |
11fdf7f2 | 306 | delete iter; |
7c673cae | 307 | } |
11fdf7f2 TL |
308 | |
309 | if (prev_assigned && total != prev_total) { | |
310 | fprintf(stdout, | |
311 | "RandomTransactionVerify found inconsistent totals. " | |
312 | "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 " \n", | |
313 | prev_i, prev_total, set_i, total); | |
314 | return Status::Corruption(); | |
7c673cae FG |
315 | } |
316 | prev_total = total; | |
11fdf7f2 TL |
317 | prev_i = set_i; |
318 | prev_assigned = true; | |
319 | } | |
320 | if (take_snapshot) { | |
321 | db->ReleaseSnapshot(roptions.snapshot); | |
7c673cae FG |
322 | } |
323 | ||
324 | return Status::OK(); | |
325 | } | |
326 | ||
327 | } // namespace rocksdb | |
328 | ||
329 | #endif // ROCKSDB_LITE |