]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db_stress_tool/db_stress_shared_state.h
b68670b580efe0c70198f64fd355fbbe1c9447fc
[ceph.git] / ceph / src / rocksdb / db_stress_tool / db_stress_shared_state.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
9
10 #ifdef GFLAGS
11 #pragma once
12
13 #include "db_stress_tool/db_stress_stat.h"
14 #include "util/gflags_compat.h"
15
16 DECLARE_uint64(seed);
17 DECLARE_int64(max_key);
18 DECLARE_uint64(log2_keys_per_lock);
19 DECLARE_int32(threads);
20 DECLARE_int32(column_families);
21 DECLARE_int32(nooverwritepercent);
22 DECLARE_string(expected_values_path);
23 DECLARE_int32(clear_column_family_one_in);
24 DECLARE_bool(test_batches_snapshots);
25 DECLARE_int32(compaction_thread_pool_adjust_interval);
26 DECLARE_int32(continuous_verification_interval);
27
28 namespace ROCKSDB_NAMESPACE {
29 class StressTest;
30
31 // State shared by all concurrent executions of the same benchmark.
32 class SharedState {
33 public:
34 // indicates a key may have any value (or not be present) as an operation on
35 // it is incomplete.
36 static const uint32_t UNKNOWN_SENTINEL;
37 // indicates a key should definitely be deleted
38 static const uint32_t DELETION_SENTINEL;
39
40 SharedState(Env* env, StressTest* stress_test)
41 : cv_(&mu_),
42 seed_(static_cast<uint32_t>(FLAGS_seed)),
43 max_key_(FLAGS_max_key),
44 log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
45 num_threads_(FLAGS_threads),
46 num_initialized_(0),
47 num_populated_(0),
48 vote_reopen_(0),
49 num_done_(0),
50 start_(false),
51 start_verify_(false),
52 num_bg_threads_(0),
53 should_stop_bg_thread_(false),
54 bg_thread_finished_(0),
55 stress_test_(stress_test),
56 verification_failure_(false),
57 should_stop_test_(false),
58 no_overwrite_ids_(FLAGS_column_families),
59 values_(nullptr),
60 printing_verification_results_(false) {
61 // Pick random keys in each column family that will not experience
62 // overwrite
63
64 fprintf(stdout, "Choosing random keys with no overwrite\n");
65 Random64 rnd(seed_);
66 // Start with the identity permutation. Subsequent iterations of
67 // for loop below will start with perm of previous for loop
68 int64_t* permutation = new int64_t[max_key_];
69 for (int64_t i = 0; i < max_key_; i++) {
70 permutation[i] = i;
71 }
72 // Now do the Knuth shuffle
73 int64_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
74 // Only need to figure out first num_no_overwrite_keys of permutation
75 no_overwrite_ids_.reserve(num_no_overwrite_keys);
76 for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
77 int64_t rand_index = i + rnd.Next() % (max_key_ - i);
78 // Swap i and rand_index;
79 int64_t temp = permutation[i];
80 permutation[i] = permutation[rand_index];
81 permutation[rand_index] = temp;
82 // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
83 // permutation
84 no_overwrite_ids_.insert(permutation[i]);
85 }
86 delete[] permutation;
87
88 size_t expected_values_size =
89 sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
90 bool values_init_needed = false;
91 Status status;
92 if (!FLAGS_expected_values_path.empty()) {
93 if (!std::atomic<uint32_t>{}.is_lock_free()) {
94 status = Status::InvalidArgument(
95 "Cannot use --expected_values_path on platforms without lock-free "
96 "std::atomic<uint32_t>");
97 }
98 if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
99 status = Status::InvalidArgument(
100 "Cannot use --expected_values_path on when "
101 "--clear_column_family_one_in is greater than zero.");
102 }
103 uint64_t size = 0;
104 if (status.ok()) {
105 status = env->GetFileSize(FLAGS_expected_values_path, &size);
106 }
107 std::unique_ptr<WritableFile> wfile;
108 if (status.ok() && size == 0) {
109 const EnvOptions soptions;
110 status =
111 env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions);
112 }
113 if (status.ok() && size == 0) {
114 std::string buf(expected_values_size, '\0');
115 status = wfile->Append(buf);
116 values_init_needed = true;
117 }
118 if (status.ok()) {
119 status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path,
120 &expected_mmap_buffer_);
121 }
122 if (status.ok()) {
123 assert(expected_mmap_buffer_->GetLen() == expected_values_size);
124 values_ = static_cast<std::atomic<uint32_t>*>(
125 expected_mmap_buffer_->GetBase());
126 assert(values_ != nullptr);
127 } else {
128 fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
129 FLAGS_expected_values_path.c_str(), status.ToString().c_str());
130 assert(values_ == nullptr);
131 }
132 }
133 if (values_ == nullptr) {
134 values_allocation_.reset(
135 new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
136 values_ = &values_allocation_[0];
137 values_init_needed = true;
138 }
139 assert(values_ != nullptr);
140 if (values_init_needed) {
141 for (int i = 0; i < FLAGS_column_families; ++i) {
142 for (int j = 0; j < max_key_; ++j) {
143 Delete(i, j, false /* pending */);
144 }
145 }
146 }
147
148 if (FLAGS_test_batches_snapshots) {
149 fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
150 return;
151 }
152
153 long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
154 if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
155 num_locks++;
156 }
157 fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
158 key_locks_.resize(FLAGS_column_families);
159
160 for (int i = 0; i < FLAGS_column_families; ++i) {
161 key_locks_[i].resize(num_locks);
162 for (auto& ptr : key_locks_[i]) {
163 ptr.reset(new port::Mutex);
164 }
165 }
166 if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
167 ++num_bg_threads_;
168 fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
169 }
170 if (FLAGS_continuous_verification_interval > 0) {
171 ++num_bg_threads_;
172 fprintf(stdout, "Starting continuous_verification_thread\n");
173 }
174 }
175
176 ~SharedState() {}
177
178 port::Mutex* GetMutex() { return &mu_; }
179
180 port::CondVar* GetCondVar() { return &cv_; }
181
182 StressTest* GetStressTest() const { return stress_test_; }
183
184 int64_t GetMaxKey() const { return max_key_; }
185
186 uint32_t GetNumThreads() const { return num_threads_; }
187
188 void IncInitialized() { num_initialized_++; }
189
190 void IncOperated() { num_populated_++; }
191
192 void IncDone() { num_done_++; }
193
194 void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }
195
196 bool AllInitialized() const { return num_initialized_ >= num_threads_; }
197
198 bool AllOperated() const { return num_populated_ >= num_threads_; }
199
200 bool AllDone() const { return num_done_ >= num_threads_; }
201
202 bool AllVotedReopen() { return (vote_reopen_ == 0); }
203
204 void SetStart() { start_ = true; }
205
206 void SetStartVerify() { start_verify_ = true; }
207
208 bool Started() const { return start_; }
209
210 bool VerifyStarted() const { return start_verify_; }
211
212 void SetVerificationFailure() { verification_failure_.store(true); }
213
214 bool HasVerificationFailedYet() const { return verification_failure_.load(); }
215
216 void SetShouldStopTest() { should_stop_test_.store(true); }
217
218 bool ShouldStopTest() const { return should_stop_test_.load(); }
219
220 port::Mutex* GetMutexForKey(int cf, int64_t key) {
221 return key_locks_[cf][key >> log2_keys_per_lock_].get();
222 }
223
224 void LockColumnFamily(int cf) {
225 for (auto& mutex : key_locks_[cf]) {
226 mutex->Lock();
227 }
228 }
229
230 void UnlockColumnFamily(int cf) {
231 for (auto& mutex : key_locks_[cf]) {
232 mutex->Unlock();
233 }
234 }
235
236 std::atomic<uint32_t>& Value(int cf, int64_t key) const {
237 return values_[cf * max_key_ + key];
238 }
239
240 void ClearColumnFamily(int cf) {
241 std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
242 DELETION_SENTINEL);
243 }
244
245 // @param pending True if the update may have started but is not yet
246 // guaranteed finished. This is useful for crash-recovery testing when the
247 // process may crash before updating the expected values array.
248 void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
249 if (!pending) {
250 // prevent expected-value update from reordering before Write
251 std::atomic_thread_fence(std::memory_order_release);
252 }
253 Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
254 std::memory_order_relaxed);
255 if (pending) {
256 // prevent Write from reordering before expected-value update
257 std::atomic_thread_fence(std::memory_order_release);
258 }
259 }
260
261 uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }
262
263 // @param pending See comment above Put()
264 // Returns true if the key was not yet deleted.
265 bool Delete(int cf, int64_t key, bool pending) {
266 if (Value(cf, key) == DELETION_SENTINEL) {
267 return false;
268 }
269 Put(cf, key, DELETION_SENTINEL, pending);
270 return true;
271 }
272
273 // @param pending See comment above Put()
274 // Returns true if the key was not yet deleted.
275 bool SingleDelete(int cf, int64_t key, bool pending) {
276 return Delete(cf, key, pending);
277 }
278
279 // @param pending See comment above Put()
280 // Returns number of keys deleted by the call.
281 int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
282 int covered = 0;
283 for (int64_t key = begin_key; key < end_key; ++key) {
284 if (Delete(cf, key, pending)) {
285 ++covered;
286 }
287 }
288 return covered;
289 }
290
291 bool AllowsOverwrite(int64_t key) {
292 return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
293 }
294
295 bool Exists(int cf, int64_t key) {
296 // UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
297 // is disallowed can't be accidentally added a second time, in which case
298 // SingleDelete wouldn't be able to properly delete the key. It does allow
299 // the case where a SingleDelete might be added which covers nothing, but
300 // that's not a correctness issue.
301 uint32_t expected_value = Value(cf, key).load();
302 return expected_value != DELETION_SENTINEL;
303 }
304
305 uint32_t GetSeed() const { return seed_; }
306
307 void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
308
309 bool ShouldStopBgThread() { return should_stop_bg_thread_; }
310
311 void IncBgThreadsFinished() { ++bg_thread_finished_; }
312
313 bool BgThreadsFinished() const {
314 return bg_thread_finished_ == num_bg_threads_;
315 }
316
317 bool ShouldVerifyAtBeginning() const {
318 return expected_mmap_buffer_.get() != nullptr;
319 }
320
321 bool PrintingVerificationResults() {
322 bool tmp = false;
323 return !printing_verification_results_.compare_exchange_strong(
324 tmp, true, std::memory_order_relaxed);
325 }
326
327 void FinishPrintingVerificationResults() {
328 printing_verification_results_.store(false, std::memory_order_relaxed);
329 }
330
331 private:
332 port::Mutex mu_;
333 port::CondVar cv_;
334 const uint32_t seed_;
335 const int64_t max_key_;
336 const uint32_t log2_keys_per_lock_;
337 const int num_threads_;
338 long num_initialized_;
339 long num_populated_;
340 long vote_reopen_;
341 long num_done_;
342 bool start_;
343 bool start_verify_;
344 int num_bg_threads_;
345 bool should_stop_bg_thread_;
346 int bg_thread_finished_;
347 StressTest* stress_test_;
348 std::atomic<bool> verification_failure_;
349 std::atomic<bool> should_stop_test_;
350
351 // Keys that should not be overwritten
352 std::unordered_set<size_t> no_overwrite_ids_;
353
354 std::atomic<uint32_t>* values_;
355 std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
356 // Has to make it owned by a smart ptr as port::Mutex is not copyable
357 // and storing it in the container may require copying depending on the impl.
358 std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
359 std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
360 std::atomic<bool> printing_verification_results_;
361 };
362
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;  // 0..n-1
  Random rand;   // Has different seeds for different threads
  SharedState* shared;  // not owned; shared by all worker threads
  Stats stats;          // per-thread operation statistics
  // A snapshot taken by this thread together with a read performed at it,
  // kept so the same read can be re-checked against the snapshot later.
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // optional state of all keys in the db
    std::vector<bool>* key_vec;
  };
  // Pending snapshots keyed by the operation index at which to verify them.
  std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

  // Seed offset (1000 + index) keeps per-thread RNG streams distinct.
  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
389 } // namespace ROCKSDB_NAMESPACE
390 #endif // GFLAGS