1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
13 #include "db_stress_tool/db_stress_stat.h"
14 #include "util/gflags_compat.h"
17 DECLARE_int64(max_key
);
18 DECLARE_uint64(log2_keys_per_lock
);
19 DECLARE_int32(threads
);
20 DECLARE_int32(column_families
);
21 DECLARE_int32(nooverwritepercent
);
22 DECLARE_string(expected_values_path
);
23 DECLARE_int32(clear_column_family_one_in
);
24 DECLARE_bool(test_batches_snapshots
);
25 DECLARE_int32(compaction_thread_pool_adjust_interval
);
26 DECLARE_int32(continuous_verification_interval
);
28 namespace ROCKSDB_NAMESPACE
{
31 // State shared by all concurrent executions of the same benchmark.
34 // indicates a key may have any value (or not be present) as an operation on
36 static const uint32_t UNKNOWN_SENTINEL
;
37 // indicates a key should definitely be deleted
38 static const uint32_t DELETION_SENTINEL
;
40 SharedState(Env
* env
, StressTest
* stress_test
)
42 seed_(static_cast<uint32_t>(FLAGS_seed
)),
43 max_key_(FLAGS_max_key
),
44 log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock
)),
45 num_threads_(FLAGS_threads
),
53 should_stop_bg_thread_(false),
54 bg_thread_finished_(0),
55 stress_test_(stress_test
),
56 verification_failure_(false),
57 should_stop_test_(false),
58 no_overwrite_ids_(FLAGS_column_families
),
60 printing_verification_results_(false) {
61 // Pick random keys in each column family that will not experience
64 fprintf(stdout
, "Choosing random keys with no overwrite\n");
66 // Start with the identity permutation. Subsequent iterations of
67 // for loop below will start with perm of previous for loop
68 int64_t* permutation
= new int64_t[max_key_
];
69 for (int64_t i
= 0; i
< max_key_
; i
++) {
72 // Now do the Knuth shuffle
73 int64_t num_no_overwrite_keys
= (max_key_
* FLAGS_nooverwritepercent
) / 100;
74 // Only need to figure out first num_no_overwrite_keys of permutation
75 no_overwrite_ids_
.reserve(num_no_overwrite_keys
);
76 for (int64_t i
= 0; i
< num_no_overwrite_keys
; i
++) {
77 int64_t rand_index
= i
+ rnd
.Next() % (max_key_
- i
);
78 // Swap i and rand_index;
79 int64_t temp
= permutation
[i
];
80 permutation
[i
] = permutation
[rand_index
];
81 permutation
[rand_index
] = temp
;
82 // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
84 no_overwrite_ids_
.insert(permutation
[i
]);
88 size_t expected_values_size
=
89 sizeof(std::atomic
<uint32_t>) * FLAGS_column_families
* max_key_
;
90 bool values_init_needed
= false;
92 if (!FLAGS_expected_values_path
.empty()) {
93 if (!std::atomic
<uint32_t>{}.is_lock_free()) {
94 status
= Status::InvalidArgument(
95 "Cannot use --expected_values_path on platforms without lock-free "
96 "std::atomic<uint32_t>");
98 if (status
.ok() && FLAGS_clear_column_family_one_in
> 0) {
99 status
= Status::InvalidArgument(
100 "Cannot use --expected_values_path on when "
101 "--clear_column_family_one_in is greater than zero.");
105 status
= env
->GetFileSize(FLAGS_expected_values_path
, &size
);
107 std::unique_ptr
<WritableFile
> wfile
;
108 if (status
.ok() && size
== 0) {
109 const EnvOptions soptions
;
111 env
->NewWritableFile(FLAGS_expected_values_path
, &wfile
, soptions
);
113 if (status
.ok() && size
== 0) {
114 std::string
buf(expected_values_size
, '\0');
115 status
= wfile
->Append(buf
);
116 values_init_needed
= true;
119 status
= env
->NewMemoryMappedFileBuffer(FLAGS_expected_values_path
,
120 &expected_mmap_buffer_
);
123 assert(expected_mmap_buffer_
->GetLen() == expected_values_size
);
124 values_
= static_cast<std::atomic
<uint32_t>*>(
125 expected_mmap_buffer_
->GetBase());
126 assert(values_
!= nullptr);
128 fprintf(stderr
, "Failed opening shared file '%s' with error: %s\n",
129 FLAGS_expected_values_path
.c_str(), status
.ToString().c_str());
130 assert(values_
== nullptr);
133 if (values_
== nullptr) {
134 values_allocation_
.reset(
135 new std::atomic
<uint32_t>[FLAGS_column_families
* max_key_
]);
136 values_
= &values_allocation_
[0];
137 values_init_needed
= true;
139 assert(values_
!= nullptr);
140 if (values_init_needed
) {
141 for (int i
= 0; i
< FLAGS_column_families
; ++i
) {
142 for (int j
= 0; j
< max_key_
; ++j
) {
143 Delete(i
, j
, false /* pending */);
148 if (FLAGS_test_batches_snapshots
) {
149 fprintf(stdout
, "No lock creation because test_batches_snapshots set\n");
153 long num_locks
= static_cast<long>(max_key_
>> log2_keys_per_lock_
);
154 if (max_key_
& ((1 << log2_keys_per_lock_
) - 1)) {
157 fprintf(stdout
, "Creating %ld locks\n", num_locks
* FLAGS_column_families
);
158 key_locks_
.resize(FLAGS_column_families
);
160 for (int i
= 0; i
< FLAGS_column_families
; ++i
) {
161 key_locks_
[i
].resize(num_locks
);
162 for (auto& ptr
: key_locks_
[i
]) {
163 ptr
.reset(new port::Mutex
);
166 if (FLAGS_compaction_thread_pool_adjust_interval
> 0) {
168 fprintf(stdout
, "Starting compaction_thread_pool_adjust_thread\n");
170 if (FLAGS_continuous_verification_interval
> 0) {
172 fprintf(stdout
, "Starting continuous_verification_thread\n");
178 port::Mutex
* GetMutex() { return &mu_
; }
180 port::CondVar
* GetCondVar() { return &cv_
; }
182 StressTest
* GetStressTest() const { return stress_test_
; }
184 int64_t GetMaxKey() const { return max_key_
; }
186 uint32_t GetNumThreads() const { return num_threads_
; }
188 void IncInitialized() { num_initialized_
++; }
190 void IncOperated() { num_populated_
++; }
192 void IncDone() { num_done_
++; }
194 void IncVotedReopen() { vote_reopen_
= (vote_reopen_
+ 1) % num_threads_
; }
196 bool AllInitialized() const { return num_initialized_
>= num_threads_
; }
198 bool AllOperated() const { return num_populated_
>= num_threads_
; }
200 bool AllDone() const { return num_done_
>= num_threads_
; }
202 bool AllVotedReopen() { return (vote_reopen_
== 0); }
204 void SetStart() { start_
= true; }
206 void SetStartVerify() { start_verify_
= true; }
208 bool Started() const { return start_
; }
210 bool VerifyStarted() const { return start_verify_
; }
212 void SetVerificationFailure() { verification_failure_
.store(true); }
214 bool HasVerificationFailedYet() const { return verification_failure_
.load(); }
216 void SetShouldStopTest() { should_stop_test_
.store(true); }
218 bool ShouldStopTest() const { return should_stop_test_
.load(); }
220 port::Mutex
* GetMutexForKey(int cf
, int64_t key
) {
221 return key_locks_
[cf
][key
>> log2_keys_per_lock_
].get();
224 void LockColumnFamily(int cf
) {
225 for (auto& mutex
: key_locks_
[cf
]) {
230 void UnlockColumnFamily(int cf
) {
231 for (auto& mutex
: key_locks_
[cf
]) {
236 std::atomic
<uint32_t>& Value(int cf
, int64_t key
) const {
237 return values_
[cf
* max_key_
+ key
];
240 void ClearColumnFamily(int cf
) {
241 std::fill(&Value(cf
, 0 /* key */), &Value(cf
+ 1, 0 /* key */),
245 // @param pending True if the update may have started but is not yet
246 // guaranteed finished. This is useful for crash-recovery testing when the
247 // process may crash before updating the expected values array.
248 void Put(int cf
, int64_t key
, uint32_t value_base
, bool pending
) {
250 // prevent expected-value update from reordering before Write
251 std::atomic_thread_fence(std::memory_order_release
);
253 Value(cf
, key
).store(pending
? UNKNOWN_SENTINEL
: value_base
,
254 std::memory_order_relaxed
);
256 // prevent Write from reordering before expected-value update
257 std::atomic_thread_fence(std::memory_order_release
);
261 uint32_t Get(int cf
, int64_t key
) const { return Value(cf
, key
); }
263 // @param pending See comment above Put()
264 // Returns true if the key was not yet deleted.
265 bool Delete(int cf
, int64_t key
, bool pending
) {
266 if (Value(cf
, key
) == DELETION_SENTINEL
) {
269 Put(cf
, key
, DELETION_SENTINEL
, pending
);
273 // @param pending See comment above Put()
274 // Returns true if the key was not yet deleted.
275 bool SingleDelete(int cf
, int64_t key
, bool pending
) {
276 return Delete(cf
, key
, pending
);
279 // @param pending See comment above Put()
280 // Returns number of keys deleted by the call.
281 int DeleteRange(int cf
, int64_t begin_key
, int64_t end_key
, bool pending
) {
283 for (int64_t key
= begin_key
; key
< end_key
; ++key
) {
284 if (Delete(cf
, key
, pending
)) {
291 bool AllowsOverwrite(int64_t key
) {
292 return no_overwrite_ids_
.find(key
) == no_overwrite_ids_
.end();
295 bool Exists(int cf
, int64_t key
) {
296 // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
297 // is disallowed can't be accidentally added a second time, in which case
298 // SingleDelete wouldn't be able to properly delete the key. It does allow
299 // the case where a SingleDelete might be added which covers nothing, but
300 // that's not a correctness issue.
301 uint32_t expected_value
= Value(cf
, key
).load();
302 return expected_value
!= DELETION_SENTINEL
;
305 uint32_t GetSeed() const { return seed_
; }
307 void SetShouldStopBgThread() { should_stop_bg_thread_
= true; }
309 bool ShouldStopBgThread() { return should_stop_bg_thread_
; }
311 void IncBgThreadsFinished() { ++bg_thread_finished_
; }
313 bool BgThreadsFinished() const {
314 return bg_thread_finished_
== num_bg_threads_
;
317 bool ShouldVerifyAtBeginning() const {
318 return expected_mmap_buffer_
.get() != nullptr;
321 bool PrintingVerificationResults() {
323 return !printing_verification_results_
.compare_exchange_strong(
324 tmp
, true, std::memory_order_relaxed
);
327 void FinishPrintingVerificationResults() {
328 printing_verification_results_
.store(false, std::memory_order_relaxed
);
334 const uint32_t seed_
;
335 const int64_t max_key_
;
336 const uint32_t log2_keys_per_lock_
;
337 const int num_threads_
;
338 long num_initialized_
;
345 bool should_stop_bg_thread_
;
346 int bg_thread_finished_
;
347 StressTest
* stress_test_
;
348 std::atomic
<bool> verification_failure_
;
349 std::atomic
<bool> should_stop_test_
;
351 // Keys that should not be overwritten
352 std::unordered_set
<size_t> no_overwrite_ids_
;
354 std::atomic
<uint32_t>* values_
;
355 std::unique_ptr
<std::atomic
<uint32_t>[]> values_allocation_
;
// Each mutex is owned by a smart pointer because port::Mutex is not copyable,
// and storing it directly in the container may require copying, depending on
// the container implementation.
358 std::vector
<std::vector
<std::unique_ptr
<port::Mutex
>>> key_locks_
;
359 std::unique_ptr
<MemoryMappedFileBuffer
> expected_mmap_buffer_
;
360 std::atomic
<bool> printing_verification_results_
;
363 // Per-thread state for concurrent executions of the same benchmark.
365 uint32_t tid
; // 0..n-1
366 Random rand
; // Has different seeds for different threads
369 struct SnapshotState
{
370 const Snapshot
* snapshot
;
371 // The cf from which we did a Get at this snapshot
373 // The name of the cf at the time that we did a read
374 std::string cf_at_name
;
375 // The key with which we did a Get at this snapshot
377 // The status of the Get
379 // The value of the Get
381 // optional state of all keys in the db
382 std::vector
<bool>* key_vec
;
384 std::queue
<std::pair
<uint64_t, SnapshotState
>> snapshot_queue
;
386 ThreadState(uint32_t index
, SharedState
* _shared
)
387 : tid(index
), rand(1000 + index
+ _shared
->GetSeed()), shared(_shared
) {}
389 } // namespace ROCKSDB_NAMESPACE