1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
13 #include "db_stress_tool/db_stress_stat.h"
14 // SyncPoint is not supported in Released Windows Mode.
15 #if !(defined NDEBUG) || !defined(OS_WIN)
16 #include "test_util/sync_point.h"
17 #endif // !(defined NDEBUG) || !defined(OS_WIN)
18 #include "util/gflags_compat.h"
// Declarations (gflags-style DECLARE_* macros from util/gflags_compat.h) of the
// command-line flags read by SharedState; the matching definitions live in
// another translation unit.
// NOTE(review): the SharedState constructor also reads FLAGS_seed, which is
// not declared in this list — presumably it is declared in another header;
// confirm.
DECLARE_int64(max_key);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_int32(threads);
DECLARE_int32(column_families);
DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_path);
DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);
33 namespace ROCKSDB_NAMESPACE
{
36 // State shared by all concurrent executions of the same benchmark.
// Indicates a key may have any value (or not be present) because an
// operation on it may be incomplete; Put() stores it when `pending` is true.
// (Its numeric value is defined out of line, not in this fragment.)
static const uint32_t UNKNOWN_SENTINEL;
// Indicates a key should definitely be deleted. Delete() stores it, and
// Exists() treats every other stored value as "present".
static const uint32_t DELETION_SENTINEL;
// Errors when reading filter blocks are ignored, so we use a thread
// local variable updated via sync points to keep track of errors injected
// while reading filter blocks in order to ignore the Get/MultiGet result
// of the affected calls.
//
// Exactly one declaration below survives preprocessing: `__thread` on
// Solaris, `thread_local` elsewhere when thread-local storage is supported,
// and a plain static otherwise. The #else/#endif branches keep the three
// variants mutually exclusive (without them the member would be redeclared
// and the outer #if left unterminated).
#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
#if defined(OS_SOLARIS)
static __thread bool ignore_read_error;
#else
static thread_local bool ignore_read_error;
#endif  // OS_SOLARIS
#else
static bool ignore_read_error;
#endif  // ROCKSDB_SUPPORT_THREAD_LOCAL
// Builds the state shared by all stress-test threads from the command-line
// flags. In order, the visible code:
//   1. picks the keys that must never be overwritten (no_overwrite_ids_) via
//      a partial Knuth shuffle of [0, max_key_),
//   2. sets up the expected-values array of
//      FLAGS_column_families * max_key_ std::atomic<uint32_t> slots:
//      memory-mapped from --expected_values_path when that flag is set and
//      the checks pass, otherwise heap-allocated via values_allocation_,
//   3. when values_init_needed is set (freshly created file or heap array),
//      marks every (cf, key) deleted via Delete(),
//   4. creates the striped per-key mutexes unless --test_batches_snapshots,
//   5. registers IgnoreReadErrorCallback on the "FaultInjectionIgnoreError"
//      sync point when --read_fault_one_in is non-zero.
// @param env          used for all expected-values file I/O.
// @param stress_test  stored in stress_test_ (returned by GetStressTest()).
//
// NOTE(review): this constructor appears to have lost lines — e.g. the ':'
// opening the initializer list and several initializers (num_initialized_
// is declared but not initialized here; values_ is compared against nullptr
// below without visible initialization), the declarations of `rnd`,
// `status` and `size`, loop bodies, else-branches and closing braces.
// Presumably these exist in version control — confirm before building.
SharedState(Env* env, StressTest* stress_test)
    seed_(static_cast<uint32_t>(FLAGS_seed)),
    max_key_(FLAGS_max_key),
    log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
    num_threads_(FLAGS_threads),
    should_stop_bg_thread_(false),
    bg_thread_finished_(0),
    stress_test_(stress_test),
    verification_failure_(false),
    should_stop_test_(false),
    no_overwrite_ids_(FLAGS_column_families),
    printing_verification_results_(false) {
// Pick random keys in each column family that will not experience
// overwrite.
fprintf(stdout, "Choosing random keys with no overwrite\n");
// Start with the identity permutation. Subsequent iterations of
// for loop below will start with perm of previous for loop
// NOTE(review): a std::vector<int64_t> would avoid the manual new[]/delete[]
// pair below.
int64_t* permutation = new int64_t[max_key_];
// NOTE(review): the body of this loop (presumably `permutation[i] = i;`,
// per the comment above) and its closing brace are not visible.
for (int64_t i = 0; i < max_key_; i++) {
// Now do the Knuth shuffle
int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
// Only need to figure out first num_no_overwrite_keys of permutation
no_overwrite_ids_.reserve(num_no_overwrite_keys);
for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
// Pick a random index in [i, max_key_).
int64_t rand_index = i + rnd.Next() % (max_key_ - i);
// Swap i and rand_index;
int64_t temp = permutation[i];
permutation[i] = permutation[rand_index];
permutation[rand_index] = temp;
// Fill no_overwrite_ids_ with the first num_no_overwrite_keys of the
// shuffled permutation.
no_overwrite_ids_.insert(permutation[i]);
delete[] permutation;
// Byte size of one slot per (column family, key) pair.
size_t expected_values_size =
    sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
// Set when the expected values start out as garbage/zeros and must be
// initialized to DELETION_SENTINEL below.
bool values_init_needed = false;
if (!FLAGS_expected_values_path.empty()) {
// The mapped file is accessed through std::atomic<uint32_t>*, which is only
// meaningful if that atomic is lock-free (no hidden lock state in memory).
if (!std::atomic<uint32_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
    "Cannot use --expected_values_path on platforms without lock-free "
    "std::atomic<uint32_t>");
// NOTE(review): the message below reads "on when" — likely a typo in the
// runtime string; left unchanged here.
if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
    "Cannot use --expected_values_path on when "
    "--clear_column_family_one_in is greater than zero.");
status = env->GetFileSize(FLAGS_expected_values_path, &size);
std::unique_ptr<WritableFile> wfile;
// An empty file is created/extended to expected_values_size zero bytes and
// then initialized below.
if (status.ok() && size == 0) {
const EnvOptions soptions;
// NOTE(review): the Status returned here is discarded, so a failure would
// leave `wfile` null and the Append() below would dereference it — the
// result presumably should be assigned to `status`; confirm.
env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions);
if (status.ok() && size == 0) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
values_init_needed = true;
status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path,
                                        &expected_mmap_buffer_);
assert(expected_mmap_buffer_->GetLen() == expected_values_size);
// values_ aliases the mapped buffer; it is not owned by values_allocation_.
values_ = static_cast<std::atomic<uint32_t>*>(
    expected_mmap_buffer_->GetBase());
assert(values_ != nullptr);
fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
        FLAGS_expected_values_path.c_str(), status.ToString().c_str());
assert(values_ == nullptr);
// Fallback: no mapped file, so keep the expected values on the heap.
if (values_ == nullptr) {
values_allocation_.reset(
    new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
values_ = &values_allocation_[0];
values_init_needed = true;
assert(values_ != nullptr);
if (values_init_needed) {
for (int i = 0; i < FLAGS_column_families; ++i) {
// NOTE(review): `j` is `int` while max_key_ is int64_t; a max_key_ above
// INT_MAX would overflow `j` — int64_t would match the other key loops.
for (int j = 0; j < max_key_; ++j) {
Delete(i, j, false /* pending */);
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
// One mutex per stripe of 2^log2_keys_per_lock_ consecutive keys.
// NOTE(review): `long` is 32 bits on LLP64 platforms (e.g. Windows).
long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
// NOTE(review): `1 << log2_keys_per_lock_` is an int shift, undefined for
// log2_keys_per_lock_ >= 31; `int64_t{1} << ...` would be safer. The body of
// this if (presumably one extra lock for a trailing partial stripe) is not
// visible.
if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
key_locks_.resize(FLAGS_column_families);
for (int i = 0; i < FLAGS_column_families; ++i) {
key_locks_[i].resize(num_locks);
for (auto& ptr : key_locks_[i]) {
ptr.reset(new port::Mutex);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
if (FLAGS_continuous_verification_interval > 0) {
fprintf(stdout, "Starting continuous_verification_thread\n");
// Route injected read faults to IgnoreReadErrorCallback so the affected
// reads can be ignored.
if (FLAGS_read_fault_one_in) {
SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
                                      IgnoreReadErrorCallback);
SyncPoint::GetInstance()->EnableProcessing();
// Teardown counterpart of the constructor's sync-point setup: when
// --read_fault_one_in is set, clear all sync-point callbacks (not only
// IgnoreReadErrorCallback) and stop sync-point processing.
// NOTE(review): the enclosing function header (presumably ~SharedState())
// and the closing braces are not visible in this fragment; confirm.
if (FLAGS_read_fault_one_in) {
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
211 port::Mutex
* GetMutex() { return &mu_
; }
213 port::CondVar
* GetCondVar() { return &cv_
; }
215 StressTest
* GetStressTest() const { return stress_test_
; }
217 int64_t GetMaxKey() const { return max_key_
; }
219 uint32_t GetNumThreads() const { return num_threads_
; }
221 void IncInitialized() { num_initialized_
++; }
223 void IncOperated() { num_populated_
++; }
225 void IncDone() { num_done_
++; }
227 void IncVotedReopen() { vote_reopen_
= (vote_reopen_
+ 1) % num_threads_
; }
229 bool AllInitialized() const { return num_initialized_
>= num_threads_
; }
231 bool AllOperated() const { return num_populated_
>= num_threads_
; }
233 bool AllDone() const { return num_done_
>= num_threads_
; }
235 bool AllVotedReopen() { return (vote_reopen_
== 0); }
237 void SetStart() { start_
= true; }
239 void SetStartVerify() { start_verify_
= true; }
241 bool Started() const { return start_
; }
243 bool VerifyStarted() const { return start_verify_
; }
245 void SetVerificationFailure() { verification_failure_
.store(true); }
247 bool HasVerificationFailedYet() const { return verification_failure_
.load(); }
249 void SetShouldStopTest() { should_stop_test_
.store(true); }
251 bool ShouldStopTest() const { return should_stop_test_
.load(); }
253 port::Mutex
* GetMutexForKey(int cf
, int64_t key
) {
254 return key_locks_
[cf
][key
>> log2_keys_per_lock_
].get();
257 void LockColumnFamily(int cf
) {
258 for (auto& mutex
: key_locks_
[cf
]) {
263 void UnlockColumnFamily(int cf
) {
264 for (auto& mutex
: key_locks_
[cf
]) {
269 std::atomic
<uint32_t>& Value(int cf
, int64_t key
) const {
270 return values_
[cf
* max_key_
+ key
];
273 void ClearColumnFamily(int cf
) {
274 std::fill(&Value(cf
, 0 /* key */), &Value(cf
+ 1, 0 /* key */),
278 // @param pending True if the update may have started but is not yet
279 // guaranteed finished. This is useful for crash-recovery testing when the
280 // process may crash before updating the expected values array.
281 void Put(int cf
, int64_t key
, uint32_t value_base
, bool pending
) {
283 // prevent expected-value update from reordering before Write
284 std::atomic_thread_fence(std::memory_order_release
);
286 Value(cf
, key
).store(pending
? UNKNOWN_SENTINEL
: value_base
,
287 std::memory_order_relaxed
);
289 // prevent Write from reordering before expected-value update
290 std::atomic_thread_fence(std::memory_order_release
);
294 uint32_t Get(int cf
, int64_t key
) const { return Value(cf
, key
); }
296 // @param pending See comment above Put()
297 // Returns true if the key was not yet deleted.
298 bool Delete(int cf
, int64_t key
, bool pending
) {
299 if (Value(cf
, key
) == DELETION_SENTINEL
) {
302 Put(cf
, key
, DELETION_SENTINEL
, pending
);
306 // @param pending See comment above Put()
307 // Returns true if the key was not yet deleted.
308 bool SingleDelete(int cf
, int64_t key
, bool pending
) {
309 return Delete(cf
, key
, pending
);
312 // @param pending See comment above Put()
313 // Returns number of keys deleted by the call.
314 int DeleteRange(int cf
, int64_t begin_key
, int64_t end_key
, bool pending
) {
316 for (int64_t key
= begin_key
; key
< end_key
; ++key
) {
317 if (Delete(cf
, key
, pending
)) {
324 bool AllowsOverwrite(int64_t key
) {
325 return no_overwrite_ids_
.find(key
) == no_overwrite_ids_
.end();
328 bool Exists(int cf
, int64_t key
) {
329 // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
330 // is disallowed can't be accidentally added a second time, in which case
331 // SingleDelete wouldn't be able to properly delete the key. It does allow
332 // the case where a SingleDelete might be added which covers nothing, but
333 // that's not a correctness issue.
334 uint32_t expected_value
= Value(cf
, key
).load();
335 return expected_value
!= DELETION_SENTINEL
;
338 uint32_t GetSeed() const { return seed_
; }
340 void SetShouldStopBgThread() { should_stop_bg_thread_
= true; }
342 bool ShouldStopBgThread() { return should_stop_bg_thread_
; }
344 void IncBgThreadsFinished() { ++bg_thread_finished_
; }
346 bool BgThreadsFinished() const {
347 return bg_thread_finished_
== num_bg_threads_
;
350 bool ShouldVerifyAtBeginning() const {
351 return expected_mmap_buffer_
.get() != nullptr;
354 bool PrintingVerificationResults() {
356 return !printing_verification_results_
.compare_exchange_strong(
357 tmp
, true, std::memory_order_relaxed
);
360 void FinishPrintingVerificationResults() {
361 printing_verification_results_
.store(false, std::memory_order_relaxed
);
365 static void IgnoreReadErrorCallback(void*) {
366 ignore_read_error
= true;
// Configuration captured from flags at construction (immutable).
const uint32_t seed_;
// Keys per column family.
const int64_t max_key_;
// Each key-lock stripe covers 2^log2_keys_per_lock_ consecutive keys.
const uint32_t log2_keys_per_lock_;
const int num_threads_;
// Number of IncInitialized() calls so far.
long num_initialized_;
// NOTE(review): mu_, cv_, num_populated_, vote_reopen_, num_done_, start_,
// start_verify_ and num_bg_threads_ are used by SharedState's methods but are
// not declared in this run of members; they appear to have been dropped —
// confirm against the full file.
bool should_stop_bg_thread_;
int bg_thread_finished_;
StressTest* stress_test_;
std::atomic<bool> verification_failure_;
std::atomic<bool> should_stop_test_;

// Keys that should not be overwritten
std::unordered_set<size_t> no_overwrite_ids_;

// Expected value of every (cf, key) pair, column-family-major. Points either
// into the memory-mapped expected_mmap_buffer_ or into values_allocation_.
std::atomic<uint32_t>* values_;
// Owns the expected-values array when it is heap-allocated instead of mapped.
std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
// Has to make it owned by a smart ptr as port::Mutex is not copyable
// and storing it in the container may require copying depending on the impl.
std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
// Set only when the expected values are memory-mapped from
// --expected_values_path.
std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
// Claim flag that keeps only one caller printing verification results at a
// time (see PrintingVerificationResults()).
std::atomic<bool> printing_verification_results_;
// Per-thread state for concurrent executions of the same benchmark.
// NOTE(review): the `struct ThreadState {` header, the `shared` member
// (initialized by the constructor below) and the closing braces are not
// visible in this fragment; confirm against the full file.
uint32_t tid;  // 0..n-1
Random rand;   // Has different seeds for different threads
// A snapshot taken by this thread together with what was read at it.
struct SnapshotState {
const Snapshot* snapshot;
// The cf from which we did a Get at this snapshot
// The name of the cf at the time that we did a read
std::string cf_at_name;
// The key with which we did a Get at this snapshot
// The status of the Get
// The value of the Get
// optional state of all keys in the db
std::vector<bool>* key_vec;
// NOTE(review): the members described by the "cf", "key", "status" and
// "value" comments above are not declared in this fragment, and the
// SnapshotState closing brace is not visible either.
// Outstanding snapshots, each paired with a uint64_t (presumably the point at
// which it should be released — TODO confirm).
std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

// @param index    thread id, stored in `tid`.
// @param _shared  shared state; `rand` is seeded with
//                 1000 + index + _shared->GetSeed().
ThreadState(uint32_t index, SharedState* _shared)
    : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
426 } // namespace ROCKSDB_NAMESPACE