1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
11 #include "db_stress_tool/db_stress_common.h"
13 #include "utilities/fault_injection_fs.h"
16 namespace ROCKSDB_NAMESPACE
{
17 class NonBatchedOpsStressTest
: public StressTest
{
19 NonBatchedOpsStressTest() {}
21 virtual ~NonBatchedOpsStressTest() {}
23 void VerifyDb(ThreadState
* thread
) const override
{
24 ReadOptions
options(FLAGS_verify_checksum
, true);
25 auto shared
= thread
->shared
;
26 const int64_t max_key
= shared
->GetMaxKey();
27 const int64_t keys_per_thread
= max_key
/ shared
->GetNumThreads();
28 int64_t start
= keys_per_thread
* thread
->tid
;
29 int64_t end
= start
+ keys_per_thread
;
30 uint64_t prefix_to_use
=
31 (FLAGS_prefix_size
< 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size
);
32 if (thread
->tid
== shared
->GetNumThreads() - 1) {
35 for (size_t cf
= 0; cf
< column_families_
.size(); ++cf
) {
36 if (thread
->shared
->HasVerificationFailedYet()) {
39 if (thread
->rand
.OneIn(3)) {
40 // 1/3 chance use iterator to verify this range
42 std::string seek_key
= Key(start
);
43 std::unique_ptr
<Iterator
> iter(
44 db_
->NewIterator(options
, column_families_
[cf
]));
46 prefix
= Slice(seek_key
.data(), prefix_to_use
);
47 for (auto i
= start
; i
< end
; i
++) {
48 if (thread
->shared
->HasVerificationFailedYet()) {
52 std::string keystr
= Key(i
);
54 Slice pfx
= Slice(keystr
.data(), prefix_to_use
);
55 // Reseek when the prefix changes
56 if (prefix_to_use
> 0 && prefix
.compare(pfx
) != 0) {
59 prefix
= Slice(seek_key
.data(), prefix_to_use
);
61 Status s
= iter
->status();
63 Slice iter_key
= iter
->key();
64 if (iter
->key().compare(k
) > 0) {
65 s
= Status::NotFound(Slice());
66 } else if (iter
->key().compare(k
) == 0) {
67 from_db
= iter
->value().ToString();
69 } else if (iter_key
.compare(k
) < 0) {
70 VerificationAbort(shared
, "An out of range key was found",
71 static_cast<int>(cf
), i
);
74 // The iterator found no value for the key in question, so do not
75 // move to the next item in the iterator
76 s
= Status::NotFound();
78 VerifyValue(static_cast<int>(cf
), i
, options
, shared
, from_db
, s
,
80 if (from_db
.length()) {
81 PrintKeyValue(static_cast<int>(cf
), static_cast<uint32_t>(i
),
82 from_db
.data(), from_db
.length());
85 } else if (thread
->rand
.OneIn(2)) {
86 // 1/3 chance use Get to verify this range
87 for (auto i
= start
; i
< end
; i
++) {
88 if (thread
->shared
->HasVerificationFailedYet()) {
92 std::string keystr
= Key(i
);
94 Status s
= db_
->Get(options
, column_families_
[cf
], k
, &from_db
);
95 VerifyValue(static_cast<int>(cf
), i
, options
, shared
, from_db
, s
,
97 if (from_db
.length()) {
98 PrintKeyValue(static_cast<int>(cf
), static_cast<uint32_t>(i
),
99 from_db
.data(), from_db
.length());
103 // 1/3 chance use MultiGet to verify this range
104 for (auto i
= start
; i
< end
;) {
105 if (thread
->shared
->HasVerificationFailedYet()) {
108 // Keep the batch size to some reasonable value
109 size_t batch_size
= thread
->rand
.Uniform(128) + 1;
110 batch_size
= std::min
<size_t>(batch_size
, end
- i
);
111 std::vector
<std::string
> keystrs(batch_size
);
112 std::vector
<Slice
> keys(batch_size
);
113 std::vector
<PinnableSlice
> values(batch_size
);
114 std::vector
<Status
> statuses(batch_size
);
115 for (size_t j
= 0; j
< batch_size
; ++j
) {
116 keystrs
[j
] = Key(i
+ j
);
117 keys
[j
] = Slice(keystrs
[j
].data(), keystrs
[j
].length());
119 db_
->MultiGet(options
, column_families_
[cf
], batch_size
, keys
.data(),
120 values
.data(), statuses
.data());
121 for (size_t j
= 0; j
< batch_size
; ++j
) {
122 Status s
= statuses
[j
];
123 std::string from_db
= values
[j
].ToString();
124 VerifyValue(static_cast<int>(cf
), i
+ j
, options
, shared
, from_db
,
126 if (from_db
.length()) {
127 PrintKeyValue(static_cast<int>(cf
), static_cast<uint32_t>(i
+ j
),
128 from_db
.data(), from_db
.length());
138 void MaybeClearOneColumnFamily(ThreadState
* thread
) override
{
139 if (FLAGS_column_families
> 1) {
140 if (thread
->rand
.OneInOpt(FLAGS_clear_column_family_one_in
)) {
141 // drop column family and then create it again (can't drop default)
142 int cf
= thread
->rand
.Next() % (FLAGS_column_families
- 1) + 1;
143 std::string new_name
= ToString(new_column_family_name_
.fetch_add(1));
145 MutexLock
l(thread
->shared
->GetMutex());
148 "[CF %d] Dropping and recreating column family. new name: %s\n",
149 cf
, new_name
.c_str());
151 thread
->shared
->LockColumnFamily(cf
);
152 Status s
= db_
->DropColumnFamily(column_families_
[cf
]);
153 delete column_families_
[cf
];
155 fprintf(stderr
, "dropping column family error: %s\n",
156 s
.ToString().c_str());
159 s
= db_
->CreateColumnFamily(ColumnFamilyOptions(options_
), new_name
,
160 &column_families_
[cf
]);
161 column_family_names_
[cf
] = new_name
;
162 thread
->shared
->ClearColumnFamily(cf
);
164 fprintf(stderr
, "creating column family error: %s\n",
165 s
.ToString().c_str());
168 thread
->shared
->UnlockColumnFamily(cf
);
173 bool ShouldAcquireMutexOnKey() const override
{ return true; }
175 Status
TestGet(ThreadState
* thread
, const ReadOptions
& read_opts
,
176 const std::vector
<int>& rand_column_families
,
177 const std::vector
<int64_t>& rand_keys
) override
{
178 auto cfh
= column_families_
[rand_column_families
[0]];
179 std::string key_str
= Key(rand_keys
[0]);
185 if (fault_fs_guard
) {
186 fault_fs_guard
->EnableErrorInjection();
187 SharedState::ignore_read_error
= false;
190 Status s
= db_
->Get(read_opts
, cfh
, key
, &from_db
);
192 if (fault_fs_guard
) {
193 error_count
= fault_fs_guard
->GetAndResetErrorCount();
198 if (fault_fs_guard
) {
199 if (error_count
&& !SharedState::ignore_read_error
) {
200 // Grab mutex so multiple thread don't try to print the
201 // stack trace at the same time
202 MutexLock
l(thread
->shared
->GetMutex());
203 fprintf(stderr
, "Didn't get expected error from Get\n");
204 fprintf(stderr
, "Callstack that injected the fault\n");
205 fault_fs_guard
->PrintFaultBacktrace();
211 thread
->stats
.AddGets(1, 1);
212 } else if (s
.IsNotFound()) {
214 thread
->stats
.AddGets(1, 0);
216 if (error_count
== 0) {
218 thread
->stats
.AddErrors(1);
220 thread
->stats
.AddVerifiedErrors(1);
224 if (fault_fs_guard
) {
225 fault_fs_guard
->DisableErrorInjection();
231 std::vector
<Status
> TestMultiGet(
232 ThreadState
* thread
, const ReadOptions
& read_opts
,
233 const std::vector
<int>& rand_column_families
,
234 const std::vector
<int64_t>& rand_keys
) override
{
235 size_t num_keys
= rand_keys
.size();
236 std::vector
<std::string
> key_str
;
237 std::vector
<Slice
> keys
;
238 key_str
.reserve(num_keys
);
239 keys
.reserve(num_keys
);
240 std::vector
<PinnableSlice
> values(num_keys
);
241 std::vector
<Status
> statuses(num_keys
);
242 ColumnFamilyHandle
* cfh
= column_families_
[rand_column_families
[0]];
244 // Do a consistency check between Get and MultiGet. Don't do it too
245 // often as it will slow db_stress down
246 bool do_consistency_check
= thread
->rand
.OneIn(4);
248 ReadOptions readoptionscopy
= read_opts
;
249 if (do_consistency_check
) {
250 readoptionscopy
.snapshot
= db_
->GetSnapshot();
253 // To appease clang analyzer
254 const bool use_txn
= FLAGS_use_txn
;
256 // Create a transaction in order to write some data. The purpose is to
257 // exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
258 // will be rolled back once MultiGet returns.
260 Transaction
* txn
= nullptr;
263 Status s
= NewTxn(wo
, &txn
);
265 fprintf(stderr
, "NewTxn: %s\n", s
.ToString().c_str());
270 for (size_t i
= 0; i
< num_keys
; ++i
) {
271 key_str
.emplace_back(Key(rand_keys
[i
]));
272 keys
.emplace_back(key_str
.back());
275 // With a 1 in 10 probability, insert the just added key in the batch
276 // into the transaction. This will create an overlap with the MultiGet
277 // keys and exercise some corner cases in the code
278 if (thread
->rand
.OneIn(10)) {
279 int op
= thread
->rand
.Uniform(2);
284 uint32_t value_base
=
285 thread
->rand
.Next() % thread
->shared
->UNKNOWN_SENTINEL
;
287 size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
290 s
= txn
->Put(cfh
, keys
.back(), v
);
292 s
= txn
->Merge(cfh
, keys
.back(), v
);
297 s
= txn
->Delete(cfh
, keys
.back());
303 fprintf(stderr
, "Transaction put: %s\n", s
.ToString().c_str());
313 if (fault_fs_guard
) {
314 fault_fs_guard
->EnableErrorInjection();
315 SharedState::ignore_read_error
= false;
318 db_
->MultiGet(readoptionscopy
, cfh
, num_keys
, keys
.data(), values
.data(),
321 if (fault_fs_guard
) {
322 error_count
= fault_fs_guard
->GetAndResetErrorCount();
327 txn
->MultiGet(readoptionscopy
, cfh
, num_keys
, keys
.data(), values
.data(),
333 if (fault_fs_guard
&& error_count
&& !SharedState::ignore_read_error
) {
335 for (const auto& s
: statuses
) {
336 if (!s
.ok() && !s
.IsNotFound()) {
341 if (stat_nok
< error_count
) {
342 // Grab mutex so multiple thread don't try to print the
343 // stack trace at the same time
344 MutexLock
l(thread
->shared
->GetMutex());
345 fprintf(stderr
, "Didn't get expected error from MultiGet\n");
346 fprintf(stderr
, "Callstack that injected the fault\n");
347 fault_fs_guard
->PrintFaultBacktrace();
351 if (fault_fs_guard
) {
352 fault_fs_guard
->DisableErrorInjection();
356 for (size_t i
= 0; i
< statuses
.size(); ++i
) {
357 Status s
= statuses
[i
];
358 bool is_consistent
= true;
359 // Only do the consistency check if no error was injected and MultiGet
360 // didn't return an unexpected error
361 if (do_consistency_check
&& !error_count
&& (s
.ok() || s
.IsNotFound())) {
367 tmp_s
= txn
->Get(readoptionscopy
, cfh
, keys
[i
], &value
);
368 #endif // ROCKSDB_LITE
370 tmp_s
= db_
->Get(readoptionscopy
, cfh
, keys
[i
], &value
);
372 if (!tmp_s
.ok() && !tmp_s
.IsNotFound()) {
373 fprintf(stderr
, "Get error: %s\n", s
.ToString().c_str());
374 is_consistent
= false;
375 } else if (!s
.ok() && tmp_s
.ok()) {
376 fprintf(stderr
, "MultiGet returned different results with key %s\n",
377 keys
[i
].ToString(true).c_str());
378 fprintf(stderr
, "Get returned ok, MultiGet returned not found\n");
379 is_consistent
= false;
380 } else if (s
.ok() && tmp_s
.IsNotFound()) {
381 fprintf(stderr
, "MultiGet returned different results with key %s\n",
382 keys
[i
].ToString(true).c_str());
383 fprintf(stderr
, "MultiGet returned ok, Get returned not found\n");
384 is_consistent
= false;
385 } else if (s
.ok() && value
!= values
[i
].ToString()) {
386 fprintf(stderr
, "MultiGet returned different results with key %s\n",
387 keys
[i
].ToString(true).c_str());
388 fprintf(stderr
, "MultiGet returned value %s\n",
389 values
[i
].ToString(true).c_str());
390 fprintf(stderr
, "Get returned value %s\n", value
.c_str());
391 is_consistent
= false;
395 if (!is_consistent
) {
396 fprintf(stderr
, "TestMultiGet error: is_consistent is false\n");
397 thread
->stats
.AddErrors(1);
398 // Fail fast to preserve the DB state
399 thread
->shared
->SetVerificationFailure();
403 thread
->stats
.AddGets(1, 1);
404 } else if (s
.IsNotFound()) {
406 thread
->stats
.AddGets(1, 0);
407 } else if (s
.IsMergeInProgress() && use_txn
) {
408 // With txn this is sometimes expected.
409 thread
->stats
.AddGets(1, 1);
411 if (error_count
== 0) {
413 fprintf(stderr
, "MultiGet error: %s\n", s
.ToString().c_str());
414 thread
->stats
.AddErrors(1);
416 thread
->stats
.AddVerifiedErrors(1);
421 if (readoptionscopy
.snapshot
) {
422 db_
->ReleaseSnapshot(readoptionscopy
.snapshot
);
432 Status
TestPrefixScan(ThreadState
* thread
, const ReadOptions
& read_opts
,
433 const std::vector
<int>& rand_column_families
,
434 const std::vector
<int64_t>& rand_keys
) override
{
435 auto cfh
= column_families_
[rand_column_families
[0]];
436 std::string key_str
= Key(rand_keys
[0]);
438 Slice prefix
= Slice(key
.data(), FLAGS_prefix_size
);
440 std::string upper_bound
;
442 ReadOptions ro_copy
= read_opts
;
443 // Get the next prefix first and then see if we want to set upper bound.
444 // We'll use the next prefix in an assertion later on
445 if (GetNextPrefix(prefix
, &upper_bound
) && thread
->rand
.OneIn(2)) {
446 // For half of the time, set the upper bound to the next prefix
447 ub_slice
= Slice(upper_bound
);
448 ro_copy
.iterate_upper_bound
= &ub_slice
;
451 Iterator
* iter
= db_
->NewIterator(ro_copy
, cfh
);
452 unsigned long count
= 0;
453 for (iter
->Seek(prefix
); iter
->Valid() && iter
->key().starts_with(prefix
);
458 assert(count
<= GetPrefixKeyCount(prefix
.ToString(), upper_bound
));
460 Status s
= iter
->status();
461 if (iter
->status().ok()) {
462 thread
->stats
.AddPrefixes(1, count
);
464 fprintf(stderr
, "TestPrefixScan error: %s\n", s
.ToString().c_str());
465 thread
->stats
.AddErrors(1);
471 Status
TestPut(ThreadState
* thread
, WriteOptions
& write_opts
,
472 const ReadOptions
& read_opts
,
473 const std::vector
<int>& rand_column_families
,
474 const std::vector
<int64_t>& rand_keys
, char (&value
)[100],
475 std::unique_ptr
<MutexLock
>& lock
) override
{
476 auto shared
= thread
->shared
;
477 int64_t max_key
= shared
->GetMaxKey();
478 int64_t rand_key
= rand_keys
[0];
479 int rand_column_family
= rand_column_families
[0];
480 while (!shared
->AllowsOverwrite(rand_key
) &&
481 (FLAGS_use_merge
|| shared
->Exists(rand_column_family
, rand_key
))) {
483 rand_key
= thread
->rand
.Next() % max_key
;
484 rand_column_family
= thread
->rand
.Next() % FLAGS_column_families
;
486 new MutexLock(shared
->GetMutexForKey(rand_column_family
, rand_key
)));
489 std::string key_str
= Key(rand_key
);
491 ColumnFamilyHandle
* cfh
= column_families_
[rand_column_family
];
493 if (FLAGS_verify_before_write
) {
494 std::string key_str2
= Key(rand_key
);
497 Status s
= db_
->Get(read_opts
, cfh
, k
, &from_db
);
498 if (!VerifyValue(rand_column_family
, rand_key
, read_opts
, shared
, from_db
,
503 uint32_t value_base
= thread
->rand
.Next() % shared
->UNKNOWN_SENTINEL
;
504 size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
506 shared
->Put(rand_column_family
, rand_key
, value_base
, true /* pending */);
508 if (FLAGS_use_merge
) {
509 if (!FLAGS_use_txn
) {
510 s
= db_
->Merge(write_opts
, cfh
, key
, v
);
514 s
= NewTxn(write_opts
, &txn
);
516 s
= txn
->Merge(cfh
, key
, v
);
524 if (!FLAGS_use_txn
) {
525 s
= db_
->Put(write_opts
, cfh
, key
, v
);
529 s
= NewTxn(write_opts
, &txn
);
531 s
= txn
->Put(cfh
, key
, v
);
539 shared
->Put(rand_column_family
, rand_key
, value_base
, false /* pending */);
541 fprintf(stderr
, "put or merge error: %s\n", s
.ToString().c_str());
544 thread
->stats
.AddBytesForWrites(1, sz
);
545 PrintKeyValue(rand_column_family
, static_cast<uint32_t>(rand_key
), value
,
550 Status
TestDelete(ThreadState
* thread
, WriteOptions
& write_opts
,
551 const std::vector
<int>& rand_column_families
,
552 const std::vector
<int64_t>& rand_keys
,
553 std::unique_ptr
<MutexLock
>& lock
) override
{
554 int64_t rand_key
= rand_keys
[0];
555 int rand_column_family
= rand_column_families
[0];
556 auto shared
= thread
->shared
;
557 int64_t max_key
= shared
->GetMaxKey();
560 // If the chosen key does not allow overwrite and it does not exist,
561 // choose another key.
562 while (!shared
->AllowsOverwrite(rand_key
) &&
563 !shared
->Exists(rand_column_family
, rand_key
)) {
565 rand_key
= thread
->rand
.Next() % max_key
;
566 rand_column_family
= thread
->rand
.Next() % FLAGS_column_families
;
568 new MutexLock(shared
->GetMutexForKey(rand_column_family
, rand_key
)));
571 std::string key_str
= Key(rand_key
);
573 auto cfh
= column_families_
[rand_column_family
];
575 // Use delete if the key may be overwritten and a single deletion
578 if (shared
->AllowsOverwrite(rand_key
)) {
579 shared
->Delete(rand_column_family
, rand_key
, true /* pending */);
580 if (!FLAGS_use_txn
) {
581 s
= db_
->Delete(write_opts
, cfh
, key
);
585 s
= NewTxn(write_opts
, &txn
);
587 s
= txn
->Delete(cfh
, key
);
594 shared
->Delete(rand_column_family
, rand_key
, false /* pending */);
595 thread
->stats
.AddDeletes(1);
597 fprintf(stderr
, "delete error: %s\n", s
.ToString().c_str());
601 shared
->SingleDelete(rand_column_family
, rand_key
, true /* pending */);
602 if (!FLAGS_use_txn
) {
603 s
= db_
->SingleDelete(write_opts
, cfh
, key
);
607 s
= NewTxn(write_opts
, &txn
);
609 s
= txn
->SingleDelete(cfh
, key
);
616 shared
->SingleDelete(rand_column_family
, rand_key
, false /* pending */);
617 thread
->stats
.AddSingleDeletes(1);
619 fprintf(stderr
, "single delete error: %s\n", s
.ToString().c_str());
626 Status
TestDeleteRange(ThreadState
* thread
, WriteOptions
& write_opts
,
627 const std::vector
<int>& rand_column_families
,
628 const std::vector
<int64_t>& rand_keys
,
629 std::unique_ptr
<MutexLock
>& lock
) override
{
630 // OPERATION delete range
631 std::vector
<std::unique_ptr
<MutexLock
>> range_locks
;
632 // delete range does not respect disallowed overwrites. the keys for
633 // which overwrites are disallowed are randomly distributed so it
634 // could be expensive to find a range where each key allows
636 int64_t rand_key
= rand_keys
[0];
637 int rand_column_family
= rand_column_families
[0];
638 auto shared
= thread
->shared
;
639 int64_t max_key
= shared
->GetMaxKey();
640 if (rand_key
> max_key
- FLAGS_range_deletion_width
) {
643 thread
->rand
.Next() % (max_key
- FLAGS_range_deletion_width
+ 1);
644 range_locks
.emplace_back(
645 new MutexLock(shared
->GetMutexForKey(rand_column_family
, rand_key
)));
647 range_locks
.emplace_back(std::move(lock
));
649 for (int j
= 1; j
< FLAGS_range_deletion_width
; ++j
) {
650 if (((rand_key
+ j
) & ((1 << FLAGS_log2_keys_per_lock
) - 1)) == 0) {
651 range_locks
.emplace_back(new MutexLock(
652 shared
->GetMutexForKey(rand_column_family
, rand_key
+ j
)));
655 shared
->DeleteRange(rand_column_family
, rand_key
,
656 rand_key
+ FLAGS_range_deletion_width
,
659 std::string keystr
= Key(rand_key
);
661 auto cfh
= column_families_
[rand_column_family
];
662 std::string end_keystr
= Key(rand_key
+ FLAGS_range_deletion_width
);
663 Slice end_key
= end_keystr
;
664 Status s
= db_
->DeleteRange(write_opts
, cfh
, key
, end_key
);
666 fprintf(stderr
, "delete range error: %s\n", s
.ToString().c_str());
669 int covered
= shared
->DeleteRange(rand_column_family
, rand_key
,
670 rand_key
+ FLAGS_range_deletion_width
,
671 false /* pending */);
672 thread
->stats
.AddRangeDeletions(1);
673 thread
->stats
.AddCoveredByRangeDeletions(covered
);
678 void TestIngestExternalFile(
679 ThreadState
* /* thread */,
680 const std::vector
<int>& /* rand_column_families */,
681 const std::vector
<int64_t>& /* rand_keys */,
682 std::unique_ptr
<MutexLock
>& /* lock */) override
{
685 "RocksDB lite does not support "
686 "TestIngestExternalFile\n");
690 void TestIngestExternalFile(ThreadState
* thread
,
691 const std::vector
<int>& rand_column_families
,
692 const std::vector
<int64_t>& rand_keys
,
693 std::unique_ptr
<MutexLock
>& lock
) override
{
694 const std::string sst_filename
=
695 FLAGS_db
+ "/." + ToString(thread
->tid
) + ".sst";
697 if (db_stress_env
->FileExists(sst_filename
).ok()) {
698 // Maybe we terminated abnormally before, so cleanup to give this file
699 // ingestion a clean slate
700 s
= db_stress_env
->DeleteFile(sst_filename
);
703 SstFileWriter
sst_file_writer(EnvOptions(options_
), options_
);
705 s
= sst_file_writer
.Open(sst_filename
);
707 int64_t key_base
= rand_keys
[0];
708 int column_family
= rand_column_families
[0];
709 std::vector
<std::unique_ptr
<MutexLock
>> range_locks
;
710 std::vector
<uint32_t> values
;
711 SharedState
* shared
= thread
->shared
;
713 // Grab locks, set pending state on expected values, and add keys
714 for (int64_t key
= key_base
;
715 s
.ok() && key
< std::min(key_base
+ FLAGS_ingest_external_file_width
,
716 shared
->GetMaxKey());
718 if (key
== key_base
) {
719 range_locks
.emplace_back(std::move(lock
));
720 } else if ((key
& ((1 << FLAGS_log2_keys_per_lock
) - 1)) == 0) {
721 range_locks
.emplace_back(
722 new MutexLock(shared
->GetMutexForKey(column_family
, key
)));
725 uint32_t value_base
= thread
->rand
.Next() % shared
->UNKNOWN_SENTINEL
;
726 values
.push_back(value_base
);
727 shared
->Put(column_family
, key
, value_base
, true /* pending */);
730 size_t value_len
= GenerateValue(value_base
, value
, sizeof(value
));
731 auto key_str
= Key(key
);
732 s
= sst_file_writer
.Put(Slice(key_str
), Slice(value
, value_len
));
736 s
= sst_file_writer
.Finish();
739 s
= db_
->IngestExternalFile(column_families_
[column_family
],
740 {sst_filename
}, IngestExternalFileOptions());
743 fprintf(stderr
, "file ingestion error: %s\n", s
.ToString().c_str());
746 int64_t key
= key_base
;
747 for (int32_t value
: values
) {
748 shared
->Put(column_family
, key
, value
, false /* pending */);
752 #endif // ROCKSDB_LITE
754 bool VerifyValue(int cf
, int64_t key
, const ReadOptions
& /*opts*/,
755 SharedState
* shared
, const std::string
& value_from_db
,
756 const Status
& s
, bool strict
= false) const {
757 if (shared
->HasVerificationFailedYet()) {
760 // compare value_from_db with the value in the shared state
761 char value
[kValueMaxLen
];
762 uint32_t value_base
= shared
->Get(cf
, key
);
763 if (value_base
== SharedState::UNKNOWN_SENTINEL
) {
766 if (value_base
== SharedState::DELETION_SENTINEL
&& !strict
) {
771 if (value_base
== SharedState::DELETION_SENTINEL
) {
772 VerificationAbort(shared
, "Unexpected value found", cf
, key
);
775 size_t sz
= GenerateValue(value_base
, value
, sizeof(value
));
776 if (value_from_db
.length() != sz
) {
777 VerificationAbort(shared
, "Length of value read is not equal", cf
, key
);
780 if (memcmp(value_from_db
.data(), value
, sz
) != 0) {
781 VerificationAbort(shared
, "Contents of value read don't match", cf
,
786 if (value_base
!= SharedState::DELETION_SENTINEL
) {
787 VerificationAbort(shared
, "Value not found: " + s
.ToString(), cf
, key
);
795 StressTest
* CreateNonBatchedOpsStressTest() {
796 return new NonBatchedOpsStressTest();
799 } // namespace ROCKSDB_NAMESPACE