git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db_stress_tool / no_batched_ops_stress.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12 #include "rocksdb/utilities/transaction_db.h"
13 #include "utilities/fault_injection_fs.h"
14
15 namespace ROCKSDB_NAMESPACE {
16 class NonBatchedOpsStressTest : public StressTest {
17 public:
18 NonBatchedOpsStressTest() {}
19
20 virtual ~NonBatchedOpsStressTest() {}
21
// Verifies this thread's share of the key space against the expected state.
// Thread `tid` checks keys [keys_per_thread * tid, keys_per_thread * tid +
// keys_per_thread); the last thread extends its range to GetMaxKey() so the
// remainder of the integer division is covered too. For every column family
// one read path is chosen at random (iterator scan, Get, MultiGet or
// GetMergeOperands), and each key's result (value + status) is passed to
// VerifyOrSyncValue with strict = true. All loops stop early once
// HasVerificationFailedYet() reports a failure.
22 void VerifyDb(ThreadState* thread) const override {
23 // This `ReadOptions` is for validation purposes. Ignore
24 // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
25 ReadOptions options(FLAGS_verify_checksum, true);
26 std::string ts_str;
27 Slice ts;
28 if (FLAGS_user_timestamp_size > 0) {
// With user timestamps enabled, read as of "now".
29 ts_str = GetNowNanos();
30 ts = ts_str;
31 options.timestamp = &ts;
32 }
33
34 auto shared = thread->shared;
35 const int64_t max_key = shared->GetMaxKey();
36 const int64_t keys_per_thread = max_key / shared->GetNumThreads();
37 int64_t start = keys_per_thread * thread->tid;
38 int64_t end = start + keys_per_thread;
// A negative FLAGS_prefix_size is treated as a 1-byte prefix for the
// iterator reseek logic below.
39 uint64_t prefix_to_use =
40 (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
41
// The last thread also takes the keys left over by the integer division.
42 if (thread->tid == shared->GetNumThreads() - 1) {
43 end = max_key;
44 }
45
46 for (size_t cf = 0; cf < column_families_.size(); ++cf) {
47 if (thread->shared->HasVerificationFailedYet()) {
48 break;
49 }
50
51 enum class VerificationMethod {
52 kIterator,
53 kGet,
54 kMultiGet,
55 kGetMergeOperands,
56 // Add any new items above kNumberOfMethods
57 kNumberOfMethods
58 };
59
60 constexpr int num_methods =
61 static_cast<int>(VerificationMethod::kNumberOfMethods);
62
// kGetMergeOperands is the last real enumerator, so shrinking the range by
// one excludes it whenever user timestamps are enabled.
63 const VerificationMethod method =
64 static_cast<VerificationMethod>(thread->rand.Uniform(
65 (FLAGS_user_timestamp_size > 0) ? num_methods - 1 : num_methods));
66
67 if (method == VerificationMethod::kIterator) {
// Walk the key range with a single iterator. The iterator is only advanced
// when it matches the current key, and is re-seeked when the key prefix
// changes.
68 std::unique_ptr<Iterator> iter(
69 db_->NewIterator(options, column_families_[cf]));
70
71 std::string seek_key = Key(start);
72 iter->Seek(seek_key);
73
// `prefix` views `seek_key`, the target of the most recent Seek().
74 Slice prefix(seek_key.data(), prefix_to_use);
75
76 for (int64_t i = start; i < end; ++i) {
77 if (thread->shared->HasVerificationFailedYet()) {
78 break;
79 }
80
81 const std::string key = Key(i);
82 const Slice k(key);
83 const Slice pfx(key.data(), prefix_to_use);
84
85 // Reseek when the prefix changes
86 if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
87 iter->Seek(k);
88 seek_key = key;
89 prefix = Slice(seek_key.data(), prefix_to_use);
90 }
91
92 Status s = iter->status();
93
94 std::string from_db;
95
96 if (iter->Valid()) {
97 const int diff = iter->key().compare(k);
98
99 if (diff > 0) {
// The iterator is already past key i, so key i is absent.
100 s = Status::NotFound();
101 } else if (diff == 0) {
// Key i is present: its wide-column view must agree with the columns
// derived from its value before the value is recorded.
102 const WideColumns expected_columns = GenerateExpectedWideColumns(
103 GetValueBase(iter->value()), iter->value());
104 if (iter->columns() != expected_columns) {
105 VerificationAbort(shared, static_cast<int>(cf), i,
106 iter->value(), iter->columns(),
107 expected_columns);
108 break;
109 }
110
111 from_db = iter->value().ToString();
112 iter->Next();
113 } else {
// The iterator sits on a key that sorts before key i but was not matched
// by any earlier i.
// NOTE(review): there is no `break` here; presumably VerificationAbort
// sets the flag polled at the top of the loop — confirm.
114 assert(diff < 0);
115
116 VerificationAbort(shared, "An out of range key was found",
117 static_cast<int>(cf), i);
118 }
119 } else {
120 // The iterator found no value for the key in question, so do not
121 // move to the next item in the iterator
122 s = Status::NotFound();
123 }
124
125 VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
126 s, /* strict */ true);
127
128 if (!from_db.empty()) {
129 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
130 from_db.data(), from_db.size());
131 }
132 }
133 } else if (method == VerificationMethod::kGet) {
// One Get() per key.
134 for (int64_t i = start; i < end; ++i) {
135 if (thread->shared->HasVerificationFailedYet()) {
136 break;
137 }
138
139 const std::string key = Key(i);
140 std::string from_db;
141
142 Status s = db_->Get(options, column_families_[cf], key, &from_db);
143
144 VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
145 s, /* strict */ true);
146
147 if (!from_db.empty()) {
148 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
149 from_db.data(), from_db.size());
150 }
151 }
152 } else if (method == VerificationMethod::kMultiGet) {
// MultiGet over consecutive batches of 1..128 keys, clipped to the end of
// this thread's range.
153 for (int64_t i = start; i < end;) {
154 if (thread->shared->HasVerificationFailedYet()) {
155 break;
156 }
157
158 // Keep the batch size to some reasonable value
159 size_t batch_size = thread->rand.Uniform(128) + 1;
160 batch_size = std::min<size_t>(batch_size, end - i);
161
// `keys` are Slices into `keystrs`, which is sized once and never resized.
162 std::vector<std::string> keystrs(batch_size);
163 std::vector<Slice> keys(batch_size);
164 std::vector<PinnableSlice> values(batch_size);
165 std::vector<Status> statuses(batch_size);
166
167 for (size_t j = 0; j < batch_size; ++j) {
168 keystrs[j] = Key(i + j);
169 keys[j] = Slice(keystrs[j].data(), keystrs[j].size());
170 }
171
172 db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
173 values.data(), statuses.data());
174
175 for (size_t j = 0; j < batch_size; ++j) {
176 const std::string from_db = values[j].ToString();
177
178 VerifyOrSyncValue(static_cast<int>(cf), i + j, options, shared,
179 from_db, statuses[j], /* strict */ true);
180
181 if (!from_db.empty()) {
182 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
183 from_db.data(), from_db.size());
184 }
185 }
186
187 i += batch_size;
188 }
189 } else {
190 assert(method == VerificationMethod::kGetMergeOperands);
191
// `values` and `merge_operands_info` are reused for every key in this
// column family. They only grow, to the largest operand count seen so far.
192 // Start off with small size that will be increased later if necessary
193 std::vector<PinnableSlice> values(4);
194
195 GetMergeOperandsOptions merge_operands_info;
196 merge_operands_info.expected_max_number_of_operands =
197 static_cast<int>(values.size());
198
199 for (int64_t i = start; i < end; ++i) {
200 if (thread->shared->HasVerificationFailedYet()) {
201 break;
202 }
203
204 const std::string key = Key(i);
205 const Slice k(key);
206 std::string from_db;
207 int number_of_operands = 0;
208
209 Status s = db_->GetMergeOperands(options, column_families_[cf], k,
210 values.data(), &merge_operands_info,
211 &number_of_operands);
212
213 if (s.IsIncomplete()) {
214 // Need to resize values as there are more than values.size() merge
215 // operands on this key. Should only happen a few times when we
216 // encounter a key that had more merge operands than any key seen so
217 // far
218 values.resize(number_of_operands);
219 merge_operands_info.expected_max_number_of_operands =
220 static_cast<int>(number_of_operands);
// The retry's status is verified as-is; it is not re-checked for Incomplete.
221 s = db_->GetMergeOperands(options, column_families_[cf], k,
222 values.data(), &merge_operands_info,
223 &number_of_operands);
224 }
// The last returned operand is used as the value to verify.
// NOTE(review): this presumes operands come back oldest-to-newest — confirm
// against the GetMergeOperands documentation.
225 // Assumed here that GetMergeOperands always sets number_of_operand
226 if (number_of_operands) {
227 from_db = values[number_of_operands - 1].ToString();
228 }
229
230 VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
231 s, /* strict */ true);
232
233 if (!from_db.empty()) {
234 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
235 from_db.data(), from_db.size());
236 }
237 }
238 }
239 }
240 }
241
242 #ifndef ROCKSDB_LITE
243 void ContinuouslyVerifyDb(ThreadState* thread) const override {
244 if (!cmp_db_) {
245 return;
246 }
247 assert(cmp_db_);
248 assert(!cmp_cfhs_.empty());
249 Status s = cmp_db_->TryCatchUpWithPrimary();
250 if (!s.ok()) {
251 assert(false);
252 exit(1);
253 }
254
255 const auto checksum_column_family = [](Iterator* iter,
256 uint32_t* checksum) -> Status {
257 assert(nullptr != checksum);
258 uint32_t ret = 0;
259 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
260 ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
261 ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
262 }
263 *checksum = ret;
264 return iter->status();
265 };
266
267 auto* shared = thread->shared;
268 assert(shared);
269 const int64_t max_key = shared->GetMaxKey();
270 ReadOptions read_opts(FLAGS_verify_checksum, true);
271 std::string ts_str;
272 Slice ts;
273 if (FLAGS_user_timestamp_size > 0) {
274 ts_str = GetNowNanos();
275 ts = ts_str;
276 read_opts.timestamp = &ts;
277 }
278
279 static Random64 rand64(shared->GetSeed());
280
281 {
282 uint32_t crc = 0;
283 std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
284 s = checksum_column_family(it.get(), &crc);
285 if (!s.ok()) {
286 fprintf(stderr, "Computing checksum of default cf: %s\n",
287 s.ToString().c_str());
288 assert(false);
289 }
290 }
291
292 for (auto* handle : cmp_cfhs_) {
293 if (thread->rand.OneInOpt(3)) {
294 // Use Get()
295 uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
296 std::string key_str = Key(key);
297 std::string value;
298 std::string key_ts;
299 s = cmp_db_->Get(read_opts, handle, key_str, &value,
300 FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
301 s.PermitUncheckedError();
302 } else {
303 // Use range scan
304 std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts, handle));
305 uint32_t rnd = (thread->rand.Next()) % 4;
306 if (0 == rnd) {
307 // SeekToFirst() + Next()*5
308 read_opts.total_order_seek = true;
309 iter->SeekToFirst();
310 for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
311 }
312 } else if (1 == rnd) {
313 // SeekToLast() + Prev()*5
314 read_opts.total_order_seek = true;
315 iter->SeekToLast();
316 for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
317 }
318 } else if (2 == rnd) {
319 // Seek() +Next()*5
320 uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
321 std::string key_str = Key(key);
322 iter->Seek(key_str);
323 for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
324 }
325 } else {
326 // SeekForPrev() + Prev()*5
327 uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
328 std::string key_str = Key(key);
329 iter->SeekForPrev(key_str);
330 for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
331 }
332 }
333 }
334 }
335 }
336 #else
337 void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {}
338 #endif // ROCKSDB_LITE
339
340 void MaybeClearOneColumnFamily(ThreadState* thread) override {
341 if (FLAGS_column_families > 1) {
342 if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
343 // drop column family and then create it again (can't drop default)
344 int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
345 std::string new_name =
346 std::to_string(new_column_family_name_.fetch_add(1));
347 {
348 MutexLock l(thread->shared->GetMutex());
349 fprintf(
350 stdout,
351 "[CF %d] Dropping and recreating column family. new name: %s\n",
352 cf, new_name.c_str());
353 }
354 thread->shared->LockColumnFamily(cf);
355 Status s = db_->DropColumnFamily(column_families_[cf]);
356 delete column_families_[cf];
357 if (!s.ok()) {
358 fprintf(stderr, "dropping column family error: %s\n",
359 s.ToString().c_str());
360 std::terminate();
361 }
362 s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
363 &column_families_[cf]);
364 column_family_names_[cf] = new_name;
365 thread->shared->ClearColumnFamily(cf);
366 if (!s.ok()) {
367 fprintf(stderr, "creating column family error: %s\n",
368 s.ToString().c_str());
369 std::terminate();
370 }
371 thread->shared->UnlockColumnFamily(cf);
372 }
373 }
374 }
375
376 bool ShouldAcquireMutexOnKey() const override { return true; }
377
378 bool IsStateTracked() const override { return true; }
379
380 Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
381 const std::vector<int>& rand_column_families,
382 const std::vector<int64_t>& rand_keys) override {
383 auto cfh = column_families_[rand_column_families[0]];
384 std::string key_str = Key(rand_keys[0]);
385 Slice key = key_str;
386 std::string from_db;
387 int error_count = 0;
388
389 if (fault_fs_guard) {
390 fault_fs_guard->EnableErrorInjection();
391 SharedState::ignore_read_error = false;
392 }
393
394 std::unique_ptr<MutexLock> lock(new MutexLock(
395 thread->shared->GetMutexForKey(rand_column_families[0], rand_keys[0])));
396
397 ReadOptions read_opts_copy = read_opts;
398 std::string read_ts_str;
399 Slice read_ts_slice;
400 bool read_older_ts = MaybeUseOlderTimestampForPointLookup(
401 thread, read_ts_str, read_ts_slice, read_opts_copy);
402
403 Status s = db_->Get(read_opts_copy, cfh, key, &from_db);
404 if (fault_fs_guard) {
405 error_count = fault_fs_guard->GetAndResetErrorCount();
406 }
407 if (s.ok()) {
408 if (fault_fs_guard) {
409 if (error_count && !SharedState::ignore_read_error) {
410 // Grab mutex so multiple thread don't try to print the
411 // stack trace at the same time
412 MutexLock l(thread->shared->GetMutex());
413 fprintf(stderr, "Didn't get expected error from Get\n");
414 fprintf(stderr, "Callstack that injected the fault\n");
415 fault_fs_guard->PrintFaultBacktrace();
416 std::terminate();
417 }
418 }
419 // found case
420 thread->stats.AddGets(1, 1);
421 // we only have the latest expected state
422 if (!FLAGS_skip_verifydb && !read_opts_copy.timestamp &&
423 thread->shared->Get(rand_column_families[0], rand_keys[0]) ==
424 SharedState::DELETION_SENTINEL) {
425 thread->shared->SetVerificationFailure();
426 fprintf(stderr,
427 "error : inconsistent values for key %s: Get returns %s, "
428 "expected state does not have the key.\n",
429 key.ToString(true).c_str(), StringToHex(from_db).c_str());
430 }
431 } else if (s.IsNotFound()) {
432 // not found case
433 thread->stats.AddGets(1, 0);
434 if (!FLAGS_skip_verifydb && !read_older_ts) {
435 auto expected =
436 thread->shared->Get(rand_column_families[0], rand_keys[0]);
437 if (expected != SharedState::DELETION_SENTINEL &&
438 expected != SharedState::UNKNOWN_SENTINEL) {
439 thread->shared->SetVerificationFailure();
440 fprintf(stderr,
441 "error : inconsistent values for key %s: expected state has "
442 "the key, Get() returns NotFound.\n",
443 key.ToString(true).c_str());
444 }
445 }
446 } else {
447 if (error_count == 0) {
448 // errors case
449 thread->stats.AddErrors(1);
450 } else {
451 thread->stats.AddVerifiedErrors(1);
452 }
453 }
454 if (fault_fs_guard) {
455 fault_fs_guard->DisableErrorInjection();
456 }
457 return s;
458 }
459
460 std::vector<Status> TestMultiGet(
461 ThreadState* thread, const ReadOptions& read_opts,
462 const std::vector<int>& rand_column_families,
463 const std::vector<int64_t>& rand_keys) override {
464 size_t num_keys = rand_keys.size();
465 std::vector<std::string> key_str;
466 std::vector<Slice> keys;
467 key_str.reserve(num_keys);
468 keys.reserve(num_keys);
469 std::vector<PinnableSlice> values(num_keys);
470 std::vector<Status> statuses(num_keys);
471 ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
472 int error_count = 0;
473 // Do a consistency check between Get and MultiGet. Don't do it too
474 // often as it will slow db_stress down
475 bool do_consistency_check = thread->rand.OneIn(4);
476
477 ReadOptions readoptionscopy = read_opts;
478 if (do_consistency_check) {
479 readoptionscopy.snapshot = db_->GetSnapshot();
480 }
481
482 std::string read_ts_str;
483 Slice read_ts_slice;
484 MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
485 readoptionscopy);
486
487 readoptionscopy.rate_limiter_priority =
488 FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
489
490 // To appease clang analyzer
491 const bool use_txn = FLAGS_use_txn;
492
493 // Create a transaction in order to write some data. The purpose is to
494 // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
495 // will be rolled back once MultiGet returns.
496 #ifndef ROCKSDB_LITE
497 Transaction* txn = nullptr;
498 if (use_txn) {
499 WriteOptions wo;
500 if (FLAGS_rate_limit_auto_wal_flush) {
501 wo.rate_limiter_priority = Env::IO_USER;
502 }
503 Status s = NewTxn(wo, &txn);
504 if (!s.ok()) {
505 fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
506 std::terminate();
507 }
508 }
509 #endif
510 for (size_t i = 0; i < num_keys; ++i) {
511 key_str.emplace_back(Key(rand_keys[i]));
512 keys.emplace_back(key_str.back());
513 #ifndef ROCKSDB_LITE
514 if (use_txn) {
515 // With a 1 in 10 probability, insert the just added key in the batch
516 // into the transaction. This will create an overlap with the MultiGet
517 // keys and exercise some corner cases in the code
518 if (thread->rand.OneIn(10)) {
519 int op = thread->rand.Uniform(2);
520 Status s;
521 switch (op) {
522 case 0:
523 case 1: {
524 uint32_t value_base =
525 thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
526 char value[100];
527 size_t sz = GenerateValue(value_base, value, sizeof(value));
528 Slice v(value, sz);
529 if (op == 0) {
530 s = txn->Put(cfh, keys.back(), v);
531 } else {
532 s = txn->Merge(cfh, keys.back(), v);
533 }
534 break;
535 }
536 case 2:
537 s = txn->Delete(cfh, keys.back());
538 break;
539 default:
540 assert(false);
541 }
542 if (!s.ok()) {
543 fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
544 std::terminate();
545 }
546 }
547 }
548 #endif
549 }
550
551 if (!use_txn) {
552 if (fault_fs_guard) {
553 fault_fs_guard->EnableErrorInjection();
554 SharedState::ignore_read_error = false;
555 }
556 db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
557 statuses.data());
558 if (fault_fs_guard) {
559 error_count = fault_fs_guard->GetAndResetErrorCount();
560 }
561 } else {
562 #ifndef ROCKSDB_LITE
563 txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
564 statuses.data());
565 #endif
566 }
567
568 if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
569 int stat_nok = 0;
570 for (const auto& s : statuses) {
571 if (!s.ok() && !s.IsNotFound()) {
572 stat_nok++;
573 }
574 }
575
576 if (stat_nok < error_count) {
577 // Grab mutex so multiple thread don't try to print the
578 // stack trace at the same time
579 MutexLock l(thread->shared->GetMutex());
580 fprintf(stderr, "Didn't get expected error from MultiGet. \n");
581 fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
582 error_count, stat_nok);
583 fprintf(stderr, "Callstack that injected the fault\n");
584 fault_fs_guard->PrintFaultBacktrace();
585 std::terminate();
586 }
587 }
588 if (fault_fs_guard) {
589 fault_fs_guard->DisableErrorInjection();
590 }
591
592 for (size_t i = 0; i < statuses.size(); ++i) {
593 Status s = statuses[i];
594 bool is_consistent = true;
595 // Only do the consistency check if no error was injected and MultiGet
596 // didn't return an unexpected error
597 if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
598 Status tmp_s;
599 std::string value;
600
601 if (use_txn) {
602 #ifndef ROCKSDB_LITE
603 tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
604 #endif // ROCKSDB_LITE
605 } else {
606 tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
607 }
608 if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
609 fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
610 is_consistent = false;
611 } else if (!s.ok() && tmp_s.ok()) {
612 fprintf(stderr, "MultiGet returned different results with key %s\n",
613 keys[i].ToString(true).c_str());
614 fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
615 is_consistent = false;
616 } else if (s.ok() && tmp_s.IsNotFound()) {
617 fprintf(stderr, "MultiGet returned different results with key %s\n",
618 keys[i].ToString(true).c_str());
619 fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
620 is_consistent = false;
621 } else if (s.ok() && value != values[i].ToString()) {
622 fprintf(stderr, "MultiGet returned different results with key %s\n",
623 keys[i].ToString(true).c_str());
624 fprintf(stderr, "MultiGet returned value %s\n",
625 values[i].ToString(true).c_str());
626 fprintf(stderr, "Get returned value %s\n",
627 Slice(value).ToString(true /* hex */).c_str());
628 is_consistent = false;
629 }
630 }
631
632 if (!is_consistent) {
633 fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
634 thread->stats.AddErrors(1);
635 // Fail fast to preserve the DB state
636 thread->shared->SetVerificationFailure();
637 break;
638 } else if (s.ok()) {
639 // found case
640 thread->stats.AddGets(1, 1);
641 } else if (s.IsNotFound()) {
642 // not found case
643 thread->stats.AddGets(1, 0);
644 } else if (s.IsMergeInProgress() && use_txn) {
645 // With txn this is sometimes expected.
646 thread->stats.AddGets(1, 1);
647 } else {
648 if (error_count == 0) {
649 // errors case
650 fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
651 thread->stats.AddErrors(1);
652 } else {
653 thread->stats.AddVerifiedErrors(1);
654 }
655 }
656 }
657
658 if (readoptionscopy.snapshot) {
659 db_->ReleaseSnapshot(readoptionscopy.snapshot);
660 }
661 if (use_txn) {
662 #ifndef ROCKSDB_LITE
663 RollbackTxn(txn);
664 #endif
665 }
666 return statuses;
667 }
668
669 Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
670 const std::vector<int>& rand_column_families,
671 const std::vector<int64_t>& rand_keys) override {
672 assert(!rand_column_families.empty());
673 assert(!rand_keys.empty());
674
675 ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
676 assert(cfh);
677
678 const std::string key = Key(rand_keys[0]);
679 const Slice prefix(key.data(), FLAGS_prefix_size);
680
681 std::string upper_bound;
682 Slice ub_slice;
683 ReadOptions ro_copy = read_opts;
684
685 // Get the next prefix first and then see if we want to set upper bound.
686 // We'll use the next prefix in an assertion later on
687 if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
688 // For half of the time, set the upper bound to the next prefix
689 ub_slice = Slice(upper_bound);
690 ro_copy.iterate_upper_bound = &ub_slice;
691 }
692
693 std::string read_ts_str;
694 Slice read_ts_slice;
695 MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice,
696 ro_copy);
697
698 std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
699
700 uint64_t count = 0;
701 Status s;
702
703 for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
704 iter->Next()) {
705 ++count;
706
707 // When iter_start_ts is set, iterator exposes internal keys, including
708 // tombstones; however, we want to perform column validation only for
709 // value-like types.
710 if (ro_copy.iter_start_ts) {
711 const ValueType value_type = ExtractValueType(iter->key());
712 if (value_type != kTypeValue && value_type != kTypeBlobIndex &&
713 value_type != kTypeWideColumnEntity) {
714 continue;
715 }
716 }
717
718 const WideColumns expected_columns = GenerateExpectedWideColumns(
719 GetValueBase(iter->value()), iter->value());
720 if (iter->columns() != expected_columns) {
721 s = Status::Corruption(
722 "Value and columns inconsistent",
723 DebugString(iter->value(), iter->columns(), expected_columns));
724 break;
725 }
726 }
727
728 if (ro_copy.iter_start_ts == nullptr) {
729 assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
730 }
731
732 if (s.ok()) {
733 s = iter->status();
734 }
735
736 if (!s.ok()) {
737 fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
738 thread->stats.AddErrors(1);
739
740 return s;
741 }
742
743 thread->stats.AddPrefixes(1, count);
744
745 return Status::OK();
746 }
747
// Writes one random key under its per-key mutex with a freshly generated
// value. The write is a Merge (FLAGS_use_merge), a PutEntity (when
// FLAGS_use_put_entity_one_in selects this value_base), or a Put, issued
// directly or through a transaction when FLAGS_use_txn is set. The expected
// state is marked pending before the write and finalized afterwards.
// A failed write terminates the process, with one exception: when
// FLAGS_injest_error_severity >= 2, fatal-severity errors are tolerated and
// the first one sets is_db_stopped_. Returns the write status, or the Get
// status when FLAGS_verify_before_write detects a mismatch.
748 Status TestPut(ThreadState* thread, WriteOptions& write_opts,
749 const ReadOptions& read_opts,
750 const std::vector<int>& rand_column_families,
751 const std::vector<int64_t>& rand_keys,
752 char (&value)[100]) override {
753 assert(!rand_column_families.empty());
754 assert(!rand_keys.empty());
755
756 auto shared = thread->shared;
757 assert(shared);
758
759 const int64_t max_key = shared->GetMaxKey();
760
761 int64_t rand_key = rand_keys[0];
762 int rand_column_family = rand_column_families[0];
763 std::string write_ts;
764
// If the chosen key disallows overwrites and either merges are in use or the
// key already exists, re-draw key and column family until a writable
// combination is found. The old lock is released before the new one is
// taken.
765 std::unique_ptr<MutexLock> lock(
766 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
767 while (!shared->AllowsOverwrite(rand_key) &&
768 (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
769 lock.reset();
770
771 rand_key = thread->rand.Next() % max_key;
772 rand_column_family = thread->rand.Next() % FLAGS_column_families;
773
774 lock.reset(
775 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
776 if (FLAGS_user_timestamp_size > 0) {
777 write_ts = GetNowNanos();
778 }
779 }
780
781 if (write_ts.empty() && FLAGS_user_timestamp_size) {
782 write_ts = GetNowNanos();
783 }
784
785 const std::string k = Key(rand_key);
786
787 ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
788 assert(cfh);
789
// Optionally check the key's current value first; on a mismatch, bail out
// with the Get() status without writing.
790 if (FLAGS_verify_before_write) {
791 std::string from_db;
792 Status s = db_->Get(read_opts, cfh, k, &from_db);
793 if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared,
794 from_db, s, /* strict */ true)) {
795 return s;
796 }
797 }
798
// value_base is kept below UNKNOWN_SENTINEL, presumably so a stored value
// can never be mistaken for a sentinel — confirm against SharedState.
799 const uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
800 const size_t sz = GenerateValue(value_base, value, sizeof(value));
801 const Slice v(value, sz);
802
803 shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
804
805 Status s;
806
807 if (FLAGS_use_merge) {
808 if (!FLAGS_use_txn) {
809 if (FLAGS_user_timestamp_size == 0) {
810 s = db_->Merge(write_opts, cfh, k, v);
811 } else {
812 s = db_->Merge(write_opts, cfh, k, write_ts, v);
813 }
814 } else {
815 #ifndef ROCKSDB_LITE
816 Transaction* txn;
817 s = NewTxn(write_opts, &txn);
818 if (s.ok()) {
819 s = txn->Merge(cfh, k, v);
820 if (s.ok()) {
821 s = CommitTxn(txn, thread);
822 }
823 }
824 #endif
825 }
// NOTE(review): this PutEntity call takes no write_ts, so user timestamps are
// not applied on this path.
826 } else if (FLAGS_use_put_entity_one_in > 0 &&
827 (value_base % FLAGS_use_put_entity_one_in) == 0) {
828 s = db_->PutEntity(write_opts, cfh, k,
829 GenerateWideColumns(value_base, v));
830 } else {
831 if (!FLAGS_use_txn) {
832 if (FLAGS_user_timestamp_size == 0) {
833 s = db_->Put(write_opts, cfh, k, v);
834 } else {
835 s = db_->Put(write_opts, cfh, k, write_ts, v);
836 }
837 } else {
838 #ifndef ROCKSDB_LITE
839 Transaction* txn;
840 s = NewTxn(write_opts, &txn);
841 if (s.ok()) {
842 s = txn->Put(cfh, k, v);
843 if (s.ok()) {
844 s = CommitTxn(txn, thread);
845 }
846 }
847 #endif
848 }
849 }
850
// The expected state is finalized even if the write failed; the error
// handling below decides whether the process continues.
851 shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
852
853 if (!s.ok()) {
854 if (FLAGS_injest_error_severity >= 2) {
855 if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
856 is_db_stopped_ = true;
857 } else if (!is_db_stopped_ ||
858 s.severity() < Status::Severity::kFatalError) {
859 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
860 std::terminate();
861 }
862 } else {
863 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
864 std::terminate();
865 }
866 }
867
868 thread->stats.AddBytesForWrites(1, sz);
869 PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
870 sz);
871 return s;
872 }
873
// Deletes one random key under its per-key mutex. Keys that allow overwrites
// use Delete; the rest use SingleDelete. In both cases the expected state is
// marked pending before the DB call and finalized after it, and the call is
// made directly or through a transaction (FLAGS_use_txn). A failed delete
// terminates the process, with one exception: when
// FLAGS_injest_error_severity >= 2, fatal-severity errors are tolerated and
// the first one sets is_db_stopped_. Returns the delete status.
874 Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
875 const std::vector<int>& rand_column_families,
876 const std::vector<int64_t>& rand_keys) override {
877 int64_t rand_key = rand_keys[0];
878 int rand_column_family = rand_column_families[0];
879 auto shared = thread->shared;
880
881 std::unique_ptr<MutexLock> lock(
882 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
883
// write_ts is always generated, but it is only passed to the non-transaction
// calls when FLAGS_user_timestamp_size != 0.
884 // OPERATION delete
885 std::string write_ts_str = GetNowNanos();
886 Slice write_ts = write_ts_str;
887
888 std::string key_str = Key(rand_key);
889 Slice key = key_str;
890 auto cfh = column_families_[rand_column_family];
891
892 // Use delete if the key may be overwritten and a single deletion
893 // otherwise.
894 Status s;
895 if (shared->AllowsOverwrite(rand_key)) {
896 shared->Delete(rand_column_family, rand_key, true /* pending */);
897 if (!FLAGS_use_txn) {
898 if (FLAGS_user_timestamp_size == 0) {
899 s = db_->Delete(write_opts, cfh, key);
900 } else {
901 s = db_->Delete(write_opts, cfh, key, write_ts);
902 }
903 } else {
904 #ifndef ROCKSDB_LITE
905 Transaction* txn;
906 s = NewTxn(write_opts, &txn);
907 if (s.ok()) {
908 s = txn->Delete(cfh, key);
909 if (s.ok()) {
910 s = CommitTxn(txn, thread);
911 }
912 }
913 #endif
914 }
// Finalized regardless of `s`; errors are handled right after.
915 shared->Delete(rand_column_family, rand_key, false /* pending */);
916 thread->stats.AddDeletes(1);
917 if (!s.ok()) {
918 if (FLAGS_injest_error_severity >= 2) {
919 if (!is_db_stopped_ &&
920 s.severity() >= Status::Severity::kFatalError) {
921 is_db_stopped_ = true;
922 } else if (!is_db_stopped_ ||
923 s.severity() < Status::Severity::kFatalError) {
924 fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
925 std::terminate();
926 }
927 } else {
928 fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
929 std::terminate();
930 }
931 }
932 } else {
// Same structure as above, but with SingleDelete.
933 shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
934 if (!FLAGS_use_txn) {
935 if (FLAGS_user_timestamp_size == 0) {
936 s = db_->SingleDelete(write_opts, cfh, key);
937 } else {
938 s = db_->SingleDelete(write_opts, cfh, key, write_ts);
939 }
940 } else {
941 #ifndef ROCKSDB_LITE
942 Transaction* txn;
943 s = NewTxn(write_opts, &txn);
944 if (s.ok()) {
945 s = txn->SingleDelete(cfh, key);
946 if (s.ok()) {
947 s = CommitTxn(txn, thread);
948 }
949 }
950 #endif
951 }
952 shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
953 thread->stats.AddSingleDeletes(1);
954 if (!s.ok()) {
955 if (FLAGS_injest_error_severity >= 2) {
956 if (!is_db_stopped_ &&
957 s.severity() >= Status::Severity::kFatalError) {
958 is_db_stopped_ = true;
959 } else if (!is_db_stopped_ ||
960 s.severity() < Status::Severity::kFatalError) {
961 fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
962 std::terminate();
963 }
964 } else {
965 fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
966 std::terminate();
967 }
968 }
969 }
970 return s;
971 }
972
973 Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
974 const std::vector<int>& rand_column_families,
975 const std::vector<int64_t>& rand_keys) override {
976 // OPERATION delete range
977 std::vector<std::unique_ptr<MutexLock>> range_locks;
978 // delete range does not respect disallowed overwrites. the keys for
979 // which overwrites are disallowed are randomly distributed so it
980 // could be expensive to find a range where each key allows
981 // overwrites.
982 int64_t rand_key = rand_keys[0];
983 int rand_column_family = rand_column_families[0];
984 auto shared = thread->shared;
985 int64_t max_key = shared->GetMaxKey();
986 if (rand_key > max_key - FLAGS_range_deletion_width) {
987 rand_key =
988 thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
989 }
990 for (int j = 0; j < FLAGS_range_deletion_width; ++j) {
991 if (j == 0 ||
992 ((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
993 range_locks.emplace_back(new MutexLock(
994 shared->GetMutexForKey(rand_column_family, rand_key + j)));
995 }
996 }
997 shared->DeleteRange(rand_column_family, rand_key,
998 rand_key + FLAGS_range_deletion_width,
999 true /* pending */);
1000
1001 std::string keystr = Key(rand_key);
1002 Slice key = keystr;
1003 auto cfh = column_families_[rand_column_family];
1004 std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
1005 Slice end_key = end_keystr;
1006 std::string write_ts_str;
1007 Slice write_ts;
1008 Status s;
1009 if (FLAGS_user_timestamp_size) {
1010 write_ts_str = GetNowNanos();
1011 write_ts = write_ts_str;
1012 s = db_->DeleteRange(write_opts, cfh, key, end_key, write_ts);
1013 } else {
1014 s = db_->DeleteRange(write_opts, cfh, key, end_key);
1015 }
1016 if (!s.ok()) {
1017 if (FLAGS_injest_error_severity >= 2) {
1018 if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
1019 is_db_stopped_ = true;
1020 } else if (!is_db_stopped_ ||
1021 s.severity() < Status::Severity::kFatalError) {
1022 fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
1023 std::terminate();
1024 }
1025 } else {
1026 fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
1027 std::terminate();
1028 }
1029 }
1030 int covered = shared->DeleteRange(rand_column_family, rand_key,
1031 rand_key + FLAGS_range_deletion_width,
1032 false /* pending */);
1033 thread->stats.AddRangeDeletions(1);
1034 thread->stats.AddCoveredByRangeDeletions(covered);
1035 return s;
1036 }
1037
1038 #ifdef ROCKSDB_LITE
1039 void TestIngestExternalFile(
1040 ThreadState* /* thread */,
1041 const std::vector<int>& /* rand_column_families */,
1042 const std::vector<int64_t>& /* rand_keys */) override {
1043 assert(false);
1044 fprintf(stderr,
1045 "RocksDB lite does not support "
1046 "TestIngestExternalFile\n");
1047 std::terminate();
1048 }
1049 #else
1050 void TestIngestExternalFile(ThreadState* thread,
1051 const std::vector<int>& rand_column_families,
1052 const std::vector<int64_t>& rand_keys) override {
1053 const std::string sst_filename =
1054 FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
1055 Status s;
1056 if (db_stress_env->FileExists(sst_filename).ok()) {
1057 // Maybe we terminated abnormally before, so cleanup to give this file
1058 // ingestion a clean slate
1059 s = db_stress_env->DeleteFile(sst_filename);
1060 }
1061
1062 SstFileWriter sst_file_writer(EnvOptions(options_), options_);
1063 if (s.ok()) {
1064 s = sst_file_writer.Open(sst_filename);
1065 }
1066 int64_t key_base = rand_keys[0];
1067 int column_family = rand_column_families[0];
1068 std::vector<std::unique_ptr<MutexLock>> range_locks;
1069 range_locks.reserve(FLAGS_ingest_external_file_width);
1070 std::vector<int64_t> keys;
1071 keys.reserve(FLAGS_ingest_external_file_width);
1072 std::vector<uint32_t> values;
1073 values.reserve(FLAGS_ingest_external_file_width);
1074 SharedState* shared = thread->shared;
1075
1076 assert(FLAGS_nooverwritepercent < 100);
1077 // Grab locks, set pending state on expected values, and add keys
1078 for (int64_t key = key_base;
1079 s.ok() && key < shared->GetMaxKey() &&
1080 static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
1081 ++key) {
1082 if (key == key_base ||
1083 (key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
1084 range_locks.emplace_back(
1085 new MutexLock(shared->GetMutexForKey(column_family, key)));
1086 }
1087 if (!shared->AllowsOverwrite(key)) {
1088 // We could alternatively include `key` on the condition its current
1089 // value is `DELETION_SENTINEL`.
1090 continue;
1091 }
1092 keys.push_back(key);
1093
1094 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
1095 values.push_back(value_base);
1096 shared->Put(column_family, key, value_base, true /* pending */);
1097
1098 char value[100];
1099 size_t value_len = GenerateValue(value_base, value, sizeof(value));
1100 auto key_str = Key(key);
1101 s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
1102 }
1103
1104 if (s.ok() && keys.empty()) {
1105 return;
1106 }
1107
1108 if (s.ok()) {
1109 s = sst_file_writer.Finish();
1110 }
1111 if (s.ok()) {
1112 s = db_->IngestExternalFile(column_families_[column_family],
1113 {sst_filename}, IngestExternalFileOptions());
1114 }
1115 if (!s.ok()) {
1116 fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
1117 std::terminate();
1118 }
1119 for (size_t i = 0; i < keys.size(); ++i) {
1120 shared->Put(column_family, keys[i], values[i], false /* pending */);
1121 }
1122 }
1123 #endif // ROCKSDB_LITE
1124
1125 // Given a key K, this creates an iterator which scans the range
1126 // [K, K + FLAGS_num_iterations) forward and backward.
1127 // Then does a random sequence of Next/Prev operations.
1128 Status TestIterateAgainstExpected(
1129 ThreadState* thread, const ReadOptions& read_opts,
1130 const std::vector<int>& rand_column_families,
1131 const std::vector<int64_t>& rand_keys) override {
1132 assert(thread);
1133 assert(!rand_column_families.empty());
1134 assert(!rand_keys.empty());
1135
1136 auto shared = thread->shared;
1137 assert(shared);
1138
1139 int64_t max_key = shared->GetMaxKey();
1140
1141 const int64_t num_iter = static_cast<int64_t>(FLAGS_num_iterations);
1142
1143 int64_t lb = rand_keys[0];
1144 if (lb > max_key - num_iter) {
1145 lb = thread->rand.Next() % (max_key - num_iter + 1);
1146 }
1147
1148 const int64_t ub = lb + num_iter;
1149
1150 // Lock the whole range over which we might iterate to ensure it doesn't
1151 // change under us.
1152 const int rand_column_family = rand_column_families[0];
1153 std::vector<std::unique_ptr<MutexLock>> range_locks =
1154 shared->GetLocksForKeyRange(rand_column_family, lb, ub);
1155
1156 ReadOptions ro(read_opts);
1157 ro.total_order_seek = true;
1158
1159 std::string read_ts_str;
1160 Slice read_ts;
1161 if (FLAGS_user_timestamp_size > 0) {
1162 read_ts_str = GetNowNanos();
1163 read_ts = read_ts_str;
1164 ro.timestamp = &read_ts;
1165 }
1166
1167 std::string max_key_str;
1168 Slice max_key_slice;
1169 if (!FLAGS_destroy_db_initially) {
1170 max_key_str = Key(max_key);
1171 max_key_slice = max_key_str;
1172 // to restrict iterator from reading keys written in batched_op_stress
1173 // that do not have expected state updated and may not be parseable by
1174 // GetIntVal().
1175 ro.iterate_upper_bound = &max_key_slice;
1176 }
1177
1178 ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
1179 assert(cfh);
1180
1181 std::unique_ptr<Iterator> iter(db_->NewIterator(ro, cfh));
1182
1183 std::string op_logs;
1184
1185 auto check_columns = [&]() {
1186 assert(iter);
1187 assert(iter->Valid());
1188
1189 const WideColumns expected_columns = GenerateExpectedWideColumns(
1190 GetValueBase(iter->value()), iter->value());
1191 if (iter->columns() != expected_columns) {
1192 shared->SetVerificationFailure();
1193
1194 fprintf(stderr,
1195 "Verification failed for key %s: "
1196 "Value and columns inconsistent: %s\n",
1197 Slice(iter->key()).ToString(/* hex */ true).c_str(),
1198 DebugString(iter->value(), iter->columns(), expected_columns)
1199 .c_str());
1200 fprintf(stderr, "Column family: %s, op_logs: %s\n",
1201 cfh->GetName().c_str(), op_logs.c_str());
1202
1203 thread->stats.AddErrors(1);
1204
1205 return false;
1206 }
1207
1208 return true;
1209 };
1210
1211 auto check_no_key_in_range = [&](int64_t start, int64_t end) {
1212 for (auto j = std::max(start, lb); j < std::min(end, ub); ++j) {
1213 auto expected_value =
1214 shared->Get(rand_column_family, static_cast<int64_t>(j));
1215 if (expected_value != shared->DELETION_SENTINEL &&
1216 expected_value != shared->UNKNOWN_SENTINEL) {
1217 // Fail fast to preserve the DB state.
1218 thread->shared->SetVerificationFailure();
1219 if (iter->Valid()) {
1220 fprintf(stderr,
1221 "Expected state has key %s, iterator is at key %s\n",
1222 Slice(Key(j)).ToString(true).c_str(),
1223 iter->key().ToString(true).c_str());
1224 } else {
1225 fprintf(stderr, "Expected state has key %s, iterator is invalid\n",
1226 Slice(Key(j)).ToString(true).c_str());
1227 }
1228 fprintf(stderr, "Column family: %s, op_logs: %s\n",
1229 cfh->GetName().c_str(), op_logs.c_str());
1230 thread->stats.AddErrors(1);
1231 return false;
1232 }
1233 }
1234 return true;
1235 };
1236
1237 // Forward and backward scan to ensure we cover the entire range [lb, ub).
1238 // The random sequence Next and Prev test below tends to be very short
1239 // ranged.
1240 int64_t last_key = lb - 1;
1241
1242 std::string key_str = Key(lb);
1243 iter->Seek(key_str);
1244
1245 op_logs += "S " + Slice(key_str).ToString(true) + " ";
1246
1247 uint64_t curr = 0;
1248 while (true) {
1249 if (!iter->Valid()) {
1250 if (!iter->status().ok()) {
1251 thread->shared->SetVerificationFailure();
1252 fprintf(stderr, "TestIterate against expected state error: %s\n",
1253 iter->status().ToString().c_str());
1254 fprintf(stderr, "Column family: %s, op_logs: %s\n",
1255 cfh->GetName().c_str(), op_logs.c_str());
1256 thread->stats.AddErrors(1);
1257 return iter->status();
1258 }
1259 if (!check_no_key_in_range(last_key + 1, ub)) {
1260 return Status::OK();
1261 }
1262 break;
1263 }
1264
1265 if (!check_columns()) {
1266 return Status::OK();
1267 }
1268
1269 // iter is valid, the range (last_key, current key) was skipped
1270 GetIntVal(iter->key().ToString(), &curr);
1271 if (!check_no_key_in_range(last_key + 1, static_cast<int64_t>(curr))) {
1272 return Status::OK();
1273 }
1274
1275 last_key = static_cast<int64_t>(curr);
1276 if (last_key >= ub - 1) {
1277 break;
1278 }
1279
1280 iter->Next();
1281
1282 op_logs += "N";
1283 }
1284
1285 // backward scan
1286 key_str = Key(ub - 1);
1287 iter->SeekForPrev(key_str);
1288
1289 op_logs += " SFP " + Slice(key_str).ToString(true) + " ";
1290
1291 last_key = ub;
1292 while (true) {
1293 if (!iter->Valid()) {
1294 if (!iter->status().ok()) {
1295 thread->shared->SetVerificationFailure();
1296 fprintf(stderr, "TestIterate against expected state error: %s\n",
1297 iter->status().ToString().c_str());
1298 fprintf(stderr, "Column family: %s, op_logs: %s\n",
1299 cfh->GetName().c_str(), op_logs.c_str());
1300 thread->stats.AddErrors(1);
1301 return iter->status();
1302 }
1303 if (!check_no_key_in_range(lb, last_key)) {
1304 return Status::OK();
1305 }
1306 break;
1307 }
1308
1309 if (!check_columns()) {
1310 return Status::OK();
1311 }
1312
1313 // the range (current key, last key) was skipped
1314 GetIntVal(iter->key().ToString(), &curr);
1315 if (!check_no_key_in_range(static_cast<int64_t>(curr + 1), last_key)) {
1316 return Status::OK();
1317 }
1318
1319 last_key = static_cast<int64_t>(curr);
1320 if (last_key <= lb) {
1321 break;
1322 }
1323
1324 iter->Prev();
1325
1326 op_logs += "P";
1327 }
1328
1329 if (thread->rand.OneIn(2)) {
1330 // Refresh after forward/backward scan to allow higher chance of SV
1331 // change. It is safe to refresh since the testing key range is locked.
1332 iter->Refresh();
1333 }
1334
1335 // start from middle of [lb, ub) otherwise it is easy to iterate out of
1336 // locked range
1337 const int64_t mid = lb + num_iter / 2;
1338
1339 key_str = Key(mid);
1340 const Slice key(key_str);
1341
1342 if (thread->rand.OneIn(2)) {
1343 iter->Seek(key);
1344 op_logs += " S " + key.ToString(true) + " ";
1345 if (!iter->Valid() && iter->status().ok()) {
1346 if (!check_no_key_in_range(mid, ub)) {
1347 return Status::OK();
1348 }
1349 }
1350 } else {
1351 iter->SeekForPrev(key);
1352 op_logs += " SFP " + key.ToString(true) + " ";
1353 if (!iter->Valid() && iter->status().ok()) {
1354 // iterator says nothing <= mid
1355 if (!check_no_key_in_range(lb, mid + 1)) {
1356 return Status::OK();
1357 }
1358 }
1359 }
1360
1361 for (int64_t i = 0; i < num_iter && iter->Valid(); ++i) {
1362 if (!check_columns()) {
1363 return Status::OK();
1364 }
1365
1366 GetIntVal(iter->key().ToString(), &curr);
1367 if (static_cast<int64_t>(curr) < lb) {
1368 iter->Next();
1369 op_logs += "N";
1370 } else if (static_cast<int64_t>(curr) >= ub) {
1371 iter->Prev();
1372 op_logs += "P";
1373 } else {
1374 const uint32_t expected_value =
1375 shared->Get(rand_column_family, static_cast<int64_t>(curr));
1376 if (expected_value == shared->DELETION_SENTINEL) {
1377 // Fail fast to preserve the DB state.
1378 thread->shared->SetVerificationFailure();
1379 fprintf(stderr, "Iterator has key %s, but expected state does not.\n",
1380 iter->key().ToString(true).c_str());
1381 fprintf(stderr, "Column family: %s, op_logs: %s\n",
1382 cfh->GetName().c_str(), op_logs.c_str());
1383 thread->stats.AddErrors(1);
1384 break;
1385 }
1386
1387 if (thread->rand.OneIn(2)) {
1388 iter->Next();
1389 op_logs += "N";
1390 if (!iter->Valid()) {
1391 break;
1392 }
1393 uint64_t next = 0;
1394 GetIntVal(iter->key().ToString(), &next);
1395 if (!check_no_key_in_range(static_cast<int64_t>(curr + 1),
1396 static_cast<int64_t>(next))) {
1397 return Status::OK();
1398 }
1399 } else {
1400 iter->Prev();
1401 op_logs += "P";
1402 if (!iter->Valid()) {
1403 break;
1404 }
1405 uint64_t prev = 0;
1406 GetIntVal(iter->key().ToString(), &prev);
1407 if (!check_no_key_in_range(static_cast<int64_t>(prev + 1),
1408 static_cast<int64_t>(curr))) {
1409 return Status::OK();
1410 }
1411 }
1412 }
1413 }
1414
1415 if (!iter->status().ok()) {
1416 thread->shared->SetVerificationFailure();
1417 fprintf(stderr, "TestIterate against expected state error: %s\n",
1418 iter->status().ToString().c_str());
1419 fprintf(stderr, "Column family: %s, op_logs: %s\n",
1420 cfh->GetName().c_str(), op_logs.c_str());
1421 thread->stats.AddErrors(1);
1422 return iter->status();
1423 }
1424
1425 thread->stats.AddIterations(1);
1426
1427 return Status::OK();
1428 }
1429
1430 bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& /*opts*/,
1431 SharedState* shared, const std::string& value_from_db,
1432 const Status& s, bool strict = false) const {
1433 if (shared->HasVerificationFailedYet()) {
1434 return false;
1435 }
1436 // compare value_from_db with the value in the shared state
1437 uint32_t value_base = shared->Get(cf, key);
1438 if (value_base == SharedState::UNKNOWN_SENTINEL) {
1439 if (s.ok()) {
1440 // Value exists in db, update state to reflect that
1441 Slice slice(value_from_db);
1442 value_base = GetValueBase(slice);
1443 shared->Put(cf, key, value_base, false);
1444 } else if (s.IsNotFound()) {
1445 // Value doesn't exist in db, update state to reflect that
1446 shared->SingleDelete(cf, key, false);
1447 }
1448 return true;
1449 }
1450 if (value_base == SharedState::DELETION_SENTINEL && !strict) {
1451 return true;
1452 }
1453
1454 if (s.ok()) {
1455 char value[kValueMaxLen];
1456 if (value_base == SharedState::DELETION_SENTINEL) {
1457 VerificationAbort(shared, "Unexpected value found", cf, key,
1458 value_from_db, "");
1459 return false;
1460 }
1461 size_t sz = GenerateValue(value_base, value, sizeof(value));
1462 if (value_from_db.length() != sz) {
1463 VerificationAbort(shared, "Length of value read is not equal", cf, key,
1464 value_from_db, Slice(value, sz));
1465 return false;
1466 }
1467 if (memcmp(value_from_db.data(), value, sz) != 0) {
1468 VerificationAbort(shared, "Contents of value read don't match", cf, key,
1469 value_from_db, Slice(value, sz));
1470 return false;
1471 }
1472 } else {
1473 if (value_base != SharedState::DELETION_SENTINEL) {
1474 char value[kValueMaxLen];
1475 size_t sz = GenerateValue(value_base, value, sizeof(value));
1476 VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key,
1477 "", Slice(value, sz));
1478 return false;
1479 }
1480 }
1481 return true;
1482 }
1483
1484 #ifndef ROCKSDB_LITE
1485 void PrepareTxnDbOptions(SharedState* shared,
1486 TransactionDBOptions& txn_db_opts) override {
1487 txn_db_opts.rollback_deletion_type_callback =
1488 [shared](TransactionDB*, ColumnFamilyHandle*, const Slice& key) {
1489 assert(shared);
1490 uint64_t key_num = 0;
1491 bool ok = GetIntVal(key.ToString(), &key_num);
1492 assert(ok);
1493 (void)ok;
1494 return !shared->AllowsOverwrite(key_num);
1495 };
1496 }
1497 #endif // ROCKSDB_LITE
1498 };
1499
1500 StressTest* CreateNonBatchedOpsStressTest() {
1501 return new NonBatchedOpsStressTest();
1502 }
1503
1504 } // namespace ROCKSDB_NAMESPACE
1505 #endif // GFLAGS