]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db_stress_tool/no_batched_ops_stress.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db_stress_tool / no_batched_ops_stress.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifdef GFLAGS
11 #include "db_stress_tool/db_stress_common.h"
12 #ifndef NDEBUG
13 #include "utilities/fault_injection_fs.h"
14 #endif // NDEBUG
15
16 namespace ROCKSDB_NAMESPACE {
17 class NonBatchedOpsStressTest : public StressTest {
18 public:
19 NonBatchedOpsStressTest() {}
20
21 virtual ~NonBatchedOpsStressTest() {}
22
23 void VerifyDb(ThreadState* thread) const override {
24 ReadOptions options(FLAGS_verify_checksum, true);
25 auto shared = thread->shared;
26 const int64_t max_key = shared->GetMaxKey();
27 const int64_t keys_per_thread = max_key / shared->GetNumThreads();
28 int64_t start = keys_per_thread * thread->tid;
29 int64_t end = start + keys_per_thread;
30 uint64_t prefix_to_use =
31 (FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
32 if (thread->tid == shared->GetNumThreads() - 1) {
33 end = max_key;
34 }
35 for (size_t cf = 0; cf < column_families_.size(); ++cf) {
36 if (thread->shared->HasVerificationFailedYet()) {
37 break;
38 }
39 if (thread->rand.OneIn(3)) {
40 // 1/3 chance use iterator to verify this range
41 Slice prefix;
42 std::string seek_key = Key(start);
43 std::unique_ptr<Iterator> iter(
44 db_->NewIterator(options, column_families_[cf]));
45 iter->Seek(seek_key);
46 prefix = Slice(seek_key.data(), prefix_to_use);
47 for (auto i = start; i < end; i++) {
48 if (thread->shared->HasVerificationFailedYet()) {
49 break;
50 }
51 std::string from_db;
52 std::string keystr = Key(i);
53 Slice k = keystr;
54 Slice pfx = Slice(keystr.data(), prefix_to_use);
55 // Reseek when the prefix changes
56 if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
57 iter->Seek(k);
58 seek_key = keystr;
59 prefix = Slice(seek_key.data(), prefix_to_use);
60 }
61 Status s = iter->status();
62 if (iter->Valid()) {
63 Slice iter_key = iter->key();
64 if (iter->key().compare(k) > 0) {
65 s = Status::NotFound(Slice());
66 } else if (iter->key().compare(k) == 0) {
67 from_db = iter->value().ToString();
68 iter->Next();
69 } else if (iter_key.compare(k) < 0) {
70 VerificationAbort(shared, "An out of range key was found",
71 static_cast<int>(cf), i);
72 }
73 } else {
74 // The iterator found no value for the key in question, so do not
75 // move to the next item in the iterator
76 s = Status::NotFound();
77 }
78 VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
79 true);
80 if (from_db.length()) {
81 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
82 from_db.data(), from_db.length());
83 }
84 }
85 } else if (thread->rand.OneIn(2)) {
86 // 1/3 chance use Get to verify this range
87 for (auto i = start; i < end; i++) {
88 if (thread->shared->HasVerificationFailedYet()) {
89 break;
90 }
91 std::string from_db;
92 std::string keystr = Key(i);
93 Slice k = keystr;
94 Status s = db_->Get(options, column_families_[cf], k, &from_db);
95 VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
96 true);
97 if (from_db.length()) {
98 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
99 from_db.data(), from_db.length());
100 }
101 }
102 } else {
103 // 1/3 chance use MultiGet to verify this range
104 for (auto i = start; i < end;) {
105 if (thread->shared->HasVerificationFailedYet()) {
106 break;
107 }
108 // Keep the batch size to some reasonable value
109 size_t batch_size = thread->rand.Uniform(128) + 1;
110 batch_size = std::min<size_t>(batch_size, end - i);
111 std::vector<std::string> keystrs(batch_size);
112 std::vector<Slice> keys(batch_size);
113 std::vector<PinnableSlice> values(batch_size);
114 std::vector<Status> statuses(batch_size);
115 for (size_t j = 0; j < batch_size; ++j) {
116 keystrs[j] = Key(i + j);
117 keys[j] = Slice(keystrs[j].data(), keystrs[j].length());
118 }
119 db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
120 values.data(), statuses.data());
121 for (size_t j = 0; j < batch_size; ++j) {
122 Status s = statuses[j];
123 std::string from_db = values[j].ToString();
124 VerifyValue(static_cast<int>(cf), i + j, options, shared, from_db,
125 s, true);
126 if (from_db.length()) {
127 PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
128 from_db.data(), from_db.length());
129 }
130 }
131
132 i += batch_size;
133 }
134 }
135 }
136 }
137
138 void MaybeClearOneColumnFamily(ThreadState* thread) override {
139 if (FLAGS_column_families > 1) {
140 if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
141 // drop column family and then create it again (can't drop default)
142 int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
143 std::string new_name = ToString(new_column_family_name_.fetch_add(1));
144 {
145 MutexLock l(thread->shared->GetMutex());
146 fprintf(
147 stdout,
148 "[CF %d] Dropping and recreating column family. new name: %s\n",
149 cf, new_name.c_str());
150 }
151 thread->shared->LockColumnFamily(cf);
152 Status s = db_->DropColumnFamily(column_families_[cf]);
153 delete column_families_[cf];
154 if (!s.ok()) {
155 fprintf(stderr, "dropping column family error: %s\n",
156 s.ToString().c_str());
157 std::terminate();
158 }
159 s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
160 &column_families_[cf]);
161 column_family_names_[cf] = new_name;
162 thread->shared->ClearColumnFamily(cf);
163 if (!s.ok()) {
164 fprintf(stderr, "creating column family error: %s\n",
165 s.ToString().c_str());
166 std::terminate();
167 }
168 thread->shared->UnlockColumnFamily(cf);
169 }
170 }
171 }
172
173 bool ShouldAcquireMutexOnKey() const override { return true; }
174
175 Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
176 const std::vector<int>& rand_column_families,
177 const std::vector<int64_t>& rand_keys) override {
178 auto cfh = column_families_[rand_column_families[0]];
179 std::string key_str = Key(rand_keys[0]);
180 Slice key = key_str;
181 std::string from_db;
182 int error_count = 0;
183
184 #ifndef NDEBUG
185 if (fault_fs_guard) {
186 fault_fs_guard->EnableErrorInjection();
187 SharedState::ignore_read_error = false;
188 }
189 #endif // NDEBUG
190 Status s = db_->Get(read_opts, cfh, key, &from_db);
191 #ifndef NDEBUG
192 if (fault_fs_guard) {
193 error_count = fault_fs_guard->GetAndResetErrorCount();
194 }
195 #endif // NDEBUG
196 if (s.ok()) {
197 #ifndef NDEBUG
198 if (fault_fs_guard) {
199 if (error_count && !SharedState::ignore_read_error) {
200 // Grab mutex so multiple thread don't try to print the
201 // stack trace at the same time
202 MutexLock l(thread->shared->GetMutex());
203 fprintf(stderr, "Didn't get expected error from Get\n");
204 fprintf(stderr, "Callstack that injected the fault\n");
205 fault_fs_guard->PrintFaultBacktrace();
206 std::terminate();
207 }
208 }
209 #endif // NDEBUG
210 // found case
211 thread->stats.AddGets(1, 1);
212 } else if (s.IsNotFound()) {
213 // not found case
214 thread->stats.AddGets(1, 0);
215 } else {
216 if (error_count == 0) {
217 // errors case
218 thread->stats.AddErrors(1);
219 } else {
220 thread->stats.AddVerifiedErrors(1);
221 }
222 }
223 #ifndef NDEBUG
224 if (fault_fs_guard) {
225 fault_fs_guard->DisableErrorInjection();
226 }
227 #endif // NDEBUG
228 return s;
229 }
230
231 std::vector<Status> TestMultiGet(
232 ThreadState* thread, const ReadOptions& read_opts,
233 const std::vector<int>& rand_column_families,
234 const std::vector<int64_t>& rand_keys) override {
235 size_t num_keys = rand_keys.size();
236 std::vector<std::string> key_str;
237 std::vector<Slice> keys;
238 key_str.reserve(num_keys);
239 keys.reserve(num_keys);
240 std::vector<PinnableSlice> values(num_keys);
241 std::vector<Status> statuses(num_keys);
242 ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
243 int error_count = 0;
244 // Do a consistency check between Get and MultiGet. Don't do it too
245 // often as it will slow db_stress down
246 bool do_consistency_check = thread->rand.OneIn(4);
247
248 ReadOptions readoptionscopy = read_opts;
249 if (do_consistency_check) {
250 readoptionscopy.snapshot = db_->GetSnapshot();
251 }
252
253 // To appease clang analyzer
254 const bool use_txn = FLAGS_use_txn;
255
256 // Create a transaction in order to write some data. The purpose is to
257 // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
258 // will be rolled back once MultiGet returns.
259 #ifndef ROCKSDB_LITE
260 Transaction* txn = nullptr;
261 if (use_txn) {
262 WriteOptions wo;
263 Status s = NewTxn(wo, &txn);
264 if (!s.ok()) {
265 fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
266 std::terminate();
267 }
268 }
269 #endif
270 for (size_t i = 0; i < num_keys; ++i) {
271 key_str.emplace_back(Key(rand_keys[i]));
272 keys.emplace_back(key_str.back());
273 #ifndef ROCKSDB_LITE
274 if (use_txn) {
275 // With a 1 in 10 probability, insert the just added key in the batch
276 // into the transaction. This will create an overlap with the MultiGet
277 // keys and exercise some corner cases in the code
278 if (thread->rand.OneIn(10)) {
279 int op = thread->rand.Uniform(2);
280 Status s;
281 switch (op) {
282 case 0:
283 case 1: {
284 uint32_t value_base =
285 thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
286 char value[100];
287 size_t sz = GenerateValue(value_base, value, sizeof(value));
288 Slice v(value, sz);
289 if (op == 0) {
290 s = txn->Put(cfh, keys.back(), v);
291 } else {
292 s = txn->Merge(cfh, keys.back(), v);
293 }
294 break;
295 }
296 case 2:
297 s = txn->Delete(cfh, keys.back());
298 break;
299 default:
300 assert(false);
301 }
302 if (!s.ok()) {
303 fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
304 std::terminate();
305 }
306 }
307 }
308 #endif
309 }
310
311 if (!use_txn) {
312 #ifndef NDEBUG
313 if (fault_fs_guard) {
314 fault_fs_guard->EnableErrorInjection();
315 SharedState::ignore_read_error = false;
316 }
317 #endif // NDEBUG
318 db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
319 statuses.data());
320 #ifndef NDEBUG
321 if (fault_fs_guard) {
322 error_count = fault_fs_guard->GetAndResetErrorCount();
323 }
324 #endif // NDEBUG
325 } else {
326 #ifndef ROCKSDB_LITE
327 txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
328 statuses.data());
329 #endif
330 }
331
332 #ifndef NDEBUG
333 if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
334 int stat_nok = 0;
335 for (const auto& s : statuses) {
336 if (!s.ok() && !s.IsNotFound()) {
337 stat_nok++;
338 }
339 }
340
341 if (stat_nok < error_count) {
342 // Grab mutex so multiple thread don't try to print the
343 // stack trace at the same time
344 MutexLock l(thread->shared->GetMutex());
345 fprintf(stderr, "Didn't get expected error from MultiGet\n");
346 fprintf(stderr, "Callstack that injected the fault\n");
347 fault_fs_guard->PrintFaultBacktrace();
348 std::terminate();
349 }
350 }
351 if (fault_fs_guard) {
352 fault_fs_guard->DisableErrorInjection();
353 }
354 #endif // NDEBUG
355
356 for (size_t i = 0; i < statuses.size(); ++i) {
357 Status s = statuses[i];
358 bool is_consistent = true;
359 // Only do the consistency check if no error was injected and MultiGet
360 // didn't return an unexpected error
361 if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
362 Status tmp_s;
363 std::string value;
364
365 if (use_txn) {
366 #ifndef ROCKSDB_LITE
367 tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
368 #endif // ROCKSDB_LITE
369 } else {
370 tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
371 }
372 if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
373 fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
374 is_consistent = false;
375 } else if (!s.ok() && tmp_s.ok()) {
376 fprintf(stderr, "MultiGet returned different results with key %s\n",
377 keys[i].ToString(true).c_str());
378 fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
379 is_consistent = false;
380 } else if (s.ok() && tmp_s.IsNotFound()) {
381 fprintf(stderr, "MultiGet returned different results with key %s\n",
382 keys[i].ToString(true).c_str());
383 fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
384 is_consistent = false;
385 } else if (s.ok() && value != values[i].ToString()) {
386 fprintf(stderr, "MultiGet returned different results with key %s\n",
387 keys[i].ToString(true).c_str());
388 fprintf(stderr, "MultiGet returned value %s\n",
389 values[i].ToString(true).c_str());
390 fprintf(stderr, "Get returned value %s\n", value.c_str());
391 is_consistent = false;
392 }
393 }
394
395 if (!is_consistent) {
396 fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
397 thread->stats.AddErrors(1);
398 // Fail fast to preserve the DB state
399 thread->shared->SetVerificationFailure();
400 break;
401 } else if (s.ok()) {
402 // found case
403 thread->stats.AddGets(1, 1);
404 } else if (s.IsNotFound()) {
405 // not found case
406 thread->stats.AddGets(1, 0);
407 } else if (s.IsMergeInProgress() && use_txn) {
408 // With txn this is sometimes expected.
409 thread->stats.AddGets(1, 1);
410 } else {
411 if (error_count == 0) {
412 // errors case
413 fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
414 thread->stats.AddErrors(1);
415 } else {
416 thread->stats.AddVerifiedErrors(1);
417 }
418 }
419 }
420
421 if (readoptionscopy.snapshot) {
422 db_->ReleaseSnapshot(readoptionscopy.snapshot);
423 }
424 if (use_txn) {
425 #ifndef ROCKSDB_LITE
426 RollbackTxn(txn);
427 #endif
428 }
429 return statuses;
430 }
431
432 Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
433 const std::vector<int>& rand_column_families,
434 const std::vector<int64_t>& rand_keys) override {
435 auto cfh = column_families_[rand_column_families[0]];
436 std::string key_str = Key(rand_keys[0]);
437 Slice key = key_str;
438 Slice prefix = Slice(key.data(), FLAGS_prefix_size);
439
440 std::string upper_bound;
441 Slice ub_slice;
442 ReadOptions ro_copy = read_opts;
443 // Get the next prefix first and then see if we want to set upper bound.
444 // We'll use the next prefix in an assertion later on
445 if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
446 // For half of the time, set the upper bound to the next prefix
447 ub_slice = Slice(upper_bound);
448 ro_copy.iterate_upper_bound = &ub_slice;
449 }
450
451 Iterator* iter = db_->NewIterator(ro_copy, cfh);
452 unsigned long count = 0;
453 for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
454 iter->Next()) {
455 ++count;
456 }
457
458 assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
459
460 Status s = iter->status();
461 if (iter->status().ok()) {
462 thread->stats.AddPrefixes(1, count);
463 } else {
464 fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
465 thread->stats.AddErrors(1);
466 }
467 delete iter;
468 return s;
469 }
470
471 Status TestPut(ThreadState* thread, WriteOptions& write_opts,
472 const ReadOptions& read_opts,
473 const std::vector<int>& rand_column_families,
474 const std::vector<int64_t>& rand_keys, char (&value)[100],
475 std::unique_ptr<MutexLock>& lock) override {
476 auto shared = thread->shared;
477 int64_t max_key = shared->GetMaxKey();
478 int64_t rand_key = rand_keys[0];
479 int rand_column_family = rand_column_families[0];
480 while (!shared->AllowsOverwrite(rand_key) &&
481 (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
482 lock.reset();
483 rand_key = thread->rand.Next() % max_key;
484 rand_column_family = thread->rand.Next() % FLAGS_column_families;
485 lock.reset(
486 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
487 }
488
489 std::string key_str = Key(rand_key);
490 Slice key = key_str;
491 ColumnFamilyHandle* cfh = column_families_[rand_column_family];
492
493 if (FLAGS_verify_before_write) {
494 std::string key_str2 = Key(rand_key);
495 Slice k = key_str2;
496 std::string from_db;
497 Status s = db_->Get(read_opts, cfh, k, &from_db);
498 if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, from_db,
499 s, true)) {
500 return s;
501 }
502 }
503 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
504 size_t sz = GenerateValue(value_base, value, sizeof(value));
505 Slice v(value, sz);
506 shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
507 Status s;
508 if (FLAGS_use_merge) {
509 if (!FLAGS_use_txn) {
510 s = db_->Merge(write_opts, cfh, key, v);
511 } else {
512 #ifndef ROCKSDB_LITE
513 Transaction* txn;
514 s = NewTxn(write_opts, &txn);
515 if (s.ok()) {
516 s = txn->Merge(cfh, key, v);
517 if (s.ok()) {
518 s = CommitTxn(txn);
519 }
520 }
521 #endif
522 }
523 } else {
524 if (!FLAGS_use_txn) {
525 s = db_->Put(write_opts, cfh, key, v);
526 } else {
527 #ifndef ROCKSDB_LITE
528 Transaction* txn;
529 s = NewTxn(write_opts, &txn);
530 if (s.ok()) {
531 s = txn->Put(cfh, key, v);
532 if (s.ok()) {
533 s = CommitTxn(txn);
534 }
535 }
536 #endif
537 }
538 }
539 shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
540 if (!s.ok()) {
541 fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
542 std::terminate();
543 }
544 thread->stats.AddBytesForWrites(1, sz);
545 PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
546 sz);
547 return s;
548 }
549
550 Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
551 const std::vector<int>& rand_column_families,
552 const std::vector<int64_t>& rand_keys,
553 std::unique_ptr<MutexLock>& lock) override {
554 int64_t rand_key = rand_keys[0];
555 int rand_column_family = rand_column_families[0];
556 auto shared = thread->shared;
557 int64_t max_key = shared->GetMaxKey();
558
559 // OPERATION delete
560 // If the chosen key does not allow overwrite and it does not exist,
561 // choose another key.
562 while (!shared->AllowsOverwrite(rand_key) &&
563 !shared->Exists(rand_column_family, rand_key)) {
564 lock.reset();
565 rand_key = thread->rand.Next() % max_key;
566 rand_column_family = thread->rand.Next() % FLAGS_column_families;
567 lock.reset(
568 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
569 }
570
571 std::string key_str = Key(rand_key);
572 Slice key = key_str;
573 auto cfh = column_families_[rand_column_family];
574
575 // Use delete if the key may be overwritten and a single deletion
576 // otherwise.
577 Status s;
578 if (shared->AllowsOverwrite(rand_key)) {
579 shared->Delete(rand_column_family, rand_key, true /* pending */);
580 if (!FLAGS_use_txn) {
581 s = db_->Delete(write_opts, cfh, key);
582 } else {
583 #ifndef ROCKSDB_LITE
584 Transaction* txn;
585 s = NewTxn(write_opts, &txn);
586 if (s.ok()) {
587 s = txn->Delete(cfh, key);
588 if (s.ok()) {
589 s = CommitTxn(txn);
590 }
591 }
592 #endif
593 }
594 shared->Delete(rand_column_family, rand_key, false /* pending */);
595 thread->stats.AddDeletes(1);
596 if (!s.ok()) {
597 fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
598 std::terminate();
599 }
600 } else {
601 shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
602 if (!FLAGS_use_txn) {
603 s = db_->SingleDelete(write_opts, cfh, key);
604 } else {
605 #ifndef ROCKSDB_LITE
606 Transaction* txn;
607 s = NewTxn(write_opts, &txn);
608 if (s.ok()) {
609 s = txn->SingleDelete(cfh, key);
610 if (s.ok()) {
611 s = CommitTxn(txn);
612 }
613 }
614 #endif
615 }
616 shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
617 thread->stats.AddSingleDeletes(1);
618 if (!s.ok()) {
619 fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
620 std::terminate();
621 }
622 }
623 return s;
624 }
625
626 Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
627 const std::vector<int>& rand_column_families,
628 const std::vector<int64_t>& rand_keys,
629 std::unique_ptr<MutexLock>& lock) override {
630 // OPERATION delete range
631 std::vector<std::unique_ptr<MutexLock>> range_locks;
632 // delete range does not respect disallowed overwrites. the keys for
633 // which overwrites are disallowed are randomly distributed so it
634 // could be expensive to find a range where each key allows
635 // overwrites.
636 int64_t rand_key = rand_keys[0];
637 int rand_column_family = rand_column_families[0];
638 auto shared = thread->shared;
639 int64_t max_key = shared->GetMaxKey();
640 if (rand_key > max_key - FLAGS_range_deletion_width) {
641 lock.reset();
642 rand_key =
643 thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
644 range_locks.emplace_back(
645 new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
646 } else {
647 range_locks.emplace_back(std::move(lock));
648 }
649 for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
650 if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
651 range_locks.emplace_back(new MutexLock(
652 shared->GetMutexForKey(rand_column_family, rand_key + j)));
653 }
654 }
655 shared->DeleteRange(rand_column_family, rand_key,
656 rand_key + FLAGS_range_deletion_width,
657 true /* pending */);
658
659 std::string keystr = Key(rand_key);
660 Slice key = keystr;
661 auto cfh = column_families_[rand_column_family];
662 std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
663 Slice end_key = end_keystr;
664 Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
665 if (!s.ok()) {
666 fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
667 std::terminate();
668 }
669 int covered = shared->DeleteRange(rand_column_family, rand_key,
670 rand_key + FLAGS_range_deletion_width,
671 false /* pending */);
672 thread->stats.AddRangeDeletions(1);
673 thread->stats.AddCoveredByRangeDeletions(covered);
674 return s;
675 }
676
677 #ifdef ROCKSDB_LITE
678 void TestIngestExternalFile(
679 ThreadState* /* thread */,
680 const std::vector<int>& /* rand_column_families */,
681 const std::vector<int64_t>& /* rand_keys */,
682 std::unique_ptr<MutexLock>& /* lock */) override {
683 assert(false);
684 fprintf(stderr,
685 "RocksDB lite does not support "
686 "TestIngestExternalFile\n");
687 std::terminate();
688 }
689 #else
690 void TestIngestExternalFile(ThreadState* thread,
691 const std::vector<int>& rand_column_families,
692 const std::vector<int64_t>& rand_keys,
693 std::unique_ptr<MutexLock>& lock) override {
694 const std::string sst_filename =
695 FLAGS_db + "/." + ToString(thread->tid) + ".sst";
696 Status s;
697 if (db_stress_env->FileExists(sst_filename).ok()) {
698 // Maybe we terminated abnormally before, so cleanup to give this file
699 // ingestion a clean slate
700 s = db_stress_env->DeleteFile(sst_filename);
701 }
702
703 SstFileWriter sst_file_writer(EnvOptions(options_), options_);
704 if (s.ok()) {
705 s = sst_file_writer.Open(sst_filename);
706 }
707 int64_t key_base = rand_keys[0];
708 int column_family = rand_column_families[0];
709 std::vector<std::unique_ptr<MutexLock>> range_locks;
710 std::vector<uint32_t> values;
711 SharedState* shared = thread->shared;
712
713 // Grab locks, set pending state on expected values, and add keys
714 for (int64_t key = key_base;
715 s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width,
716 shared->GetMaxKey());
717 ++key) {
718 if (key == key_base) {
719 range_locks.emplace_back(std::move(lock));
720 } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
721 range_locks.emplace_back(
722 new MutexLock(shared->GetMutexForKey(column_family, key)));
723 }
724
725 uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
726 values.push_back(value_base);
727 shared->Put(column_family, key, value_base, true /* pending */);
728
729 char value[100];
730 size_t value_len = GenerateValue(value_base, value, sizeof(value));
731 auto key_str = Key(key);
732 s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
733 }
734
735 if (s.ok()) {
736 s = sst_file_writer.Finish();
737 }
738 if (s.ok()) {
739 s = db_->IngestExternalFile(column_families_[column_family],
740 {sst_filename}, IngestExternalFileOptions());
741 }
742 if (!s.ok()) {
743 fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
744 std::terminate();
745 }
746 int64_t key = key_base;
747 for (int32_t value : values) {
748 shared->Put(column_family, key, value, false /* pending */);
749 ++key;
750 }
751 }
752 #endif // ROCKSDB_LITE
753
754 bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
755 SharedState* shared, const std::string& value_from_db,
756 const Status& s, bool strict = false) const {
757 if (shared->HasVerificationFailedYet()) {
758 return false;
759 }
760 // compare value_from_db with the value in the shared state
761 char value[kValueMaxLen];
762 uint32_t value_base = shared->Get(cf, key);
763 if (value_base == SharedState::UNKNOWN_SENTINEL) {
764 return true;
765 }
766 if (value_base == SharedState::DELETION_SENTINEL && !strict) {
767 return true;
768 }
769
770 if (s.ok()) {
771 if (value_base == SharedState::DELETION_SENTINEL) {
772 VerificationAbort(shared, "Unexpected value found", cf, key);
773 return false;
774 }
775 size_t sz = GenerateValue(value_base, value, sizeof(value));
776 if (value_from_db.length() != sz) {
777 VerificationAbort(shared, "Length of value read is not equal", cf, key);
778 return false;
779 }
780 if (memcmp(value_from_db.data(), value, sz) != 0) {
781 VerificationAbort(shared, "Contents of value read don't match", cf,
782 key);
783 return false;
784 }
785 } else {
786 if (value_base != SharedState::DELETION_SENTINEL) {
787 VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
788 return false;
789 }
790 }
791 return true;
792 }
793 };
794
795 StressTest* CreateNonBatchedOpsStressTest() {
796 return new NonBatchedOpsStressTest();
797 }
798
799 } // namespace ROCKSDB_NAMESPACE
800 #endif // GFLAGS