// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#ifdef GFLAGS
#pragma once

#include "db_stress_tool/db_stress_stat.h"
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
#include "test_util/sync_point.h"
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
#include "util/gflags_compat.h"

DECLARE_uint64(seed);
DECLARE_int64(max_key);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_int32(threads);
DECLARE_int32(column_families);
DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_path);
DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
DECLARE_int32(read_fault_one_in);

namespace ROCKSDB_NAMESPACE {
class StressTest;

// State shared by all concurrent executions of the same benchmark.
class SharedState {
 public:
  // Indicates that a key may have any value (or may not be present), because
  // an operation on it is incomplete.
  static const uint32_t UNKNOWN_SENTINEL;
  // Indicates that a key should definitely be deleted.
  static const uint32_t DELETION_SENTINEL;

  // Errors when reading filter blocks are ignored, so we use a thread
  // local variable updated via sync points to keep track of errors injected
  // while reading filter blocks in order to ignore the Get/MultiGet result
  // for those calls
#if defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
#if defined(OS_SOLARIS)
  static __thread bool ignore_read_error;
#else
  static thread_local bool ignore_read_error;
#endif  // OS_SOLARIS
#else
  static bool ignore_read_error;
#endif  // ROCKSDB_SUPPORT_THREAD_LOCAL

  SharedState(Env* env, StressTest* stress_test)
      : cv_(&mu_),
        seed_(static_cast<uint32_t>(FLAGS_seed)),
        max_key_(FLAGS_max_key),
        log2_keys_per_lock_(static_cast<uint32_t>(FLAGS_log2_keys_per_lock)),
        num_threads_(FLAGS_threads),
        num_initialized_(0),
        num_populated_(0),
        vote_reopen_(0),
        num_done_(0),
        start_(false),
        start_verify_(false),
        num_bg_threads_(0),
        should_stop_bg_thread_(false),
        bg_thread_finished_(0),
        stress_test_(stress_test),
        verification_failure_(false),
        should_stop_test_(false),
        no_overwrite_ids_(FLAGS_column_families),
        values_(nullptr),
        printing_verification_results_(false) {
    // Pick random keys in each column family that will not experience
    // overwrite

    fprintf(stdout, "Choosing random keys with no overwrite\n");
    Random64 rnd(seed_);
    // Start with the identity permutation. Each iteration of the loop below
    // continues from the permutation left by the previous iteration.
    int64_t* permutation = new int64_t[max_key_];
    for (int64_t i = 0; i < max_key_; i++) {
      permutation[i] = i;
    }
    // Now do a partial Knuth (Fisher-Yates) shuffle
    int64_t num_no_overwrite_keys =
        (max_key_ * FLAGS_nooverwritepercent) / 100;
    // Only need to figure out the first num_no_overwrite_keys of permutation
    no_overwrite_ids_.reserve(num_no_overwrite_keys);
    for (int64_t i = 0; i < num_no_overwrite_keys; i++) {
      int64_t rand_index = i + rnd.Next() % (max_key_ - i);
      // Swap permutation[i] and permutation[rand_index]
      int64_t temp = permutation[i];
      permutation[i] = permutation[rand_index];
      permutation[rand_index] = temp;
      // Fill no_overwrite_ids_ with the first num_no_overwrite_keys of
      // permutation
      no_overwrite_ids_.insert(permutation[i]);
    }
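    // Example (hypothetical numbers): with max_key_ = 8 and
    // FLAGS_nooverwritepercent = 25, num_no_overwrite_keys = 2, so two
    // distinct keys in [0, 8) land in no_overwrite_ids_; each draw picks
    // uniformly from the not-yet-chosen suffix of the permutation.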
    delete[] permutation;

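    // Layout note: the expected-values array holds one 4-byte atomic per
    // (column family, key) pair. When --expected_values_path is set, those
    // bytes are backed by a memory-mapped file, so the expected state
    // survives a process crash and is verified at the start of the next run.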
    size_t expected_values_size =
        sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
    bool values_init_needed = false;
    Status status;
    if (!FLAGS_expected_values_path.empty()) {
      if (!std::atomic<uint32_t>{}.is_lock_free()) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_path on platforms without lock-free "
            "std::atomic<uint32_t>");
      }
      if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
        status = Status::InvalidArgument(
            "Cannot use --expected_values_path when "
            "--clear_column_family_one_in is greater than zero.");
      }
      uint64_t size = 0;
      if (status.ok()) {
        status = env->GetFileSize(FLAGS_expected_values_path, &size);
      }
      std::unique_ptr<WritableFile> wfile;
      if (status.ok() && size == 0) {
        const EnvOptions soptions;
        status =
            env->NewWritableFile(FLAGS_expected_values_path, &wfile, soptions);
      }
      if (status.ok() && size == 0) {
        std::string buf(expected_values_size, '\0');
        status = wfile->Append(buf);
        values_init_needed = true;
      }
      if (status.ok()) {
        status = env->NewMemoryMappedFileBuffer(FLAGS_expected_values_path,
                                                &expected_mmap_buffer_);
      }
      if (status.ok()) {
        assert(expected_mmap_buffer_->GetLen() == expected_values_size);
        values_ = static_cast<std::atomic<uint32_t>*>(
            expected_mmap_buffer_->GetBase());
        assert(values_ != nullptr);
      } else {
        fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
                FLAGS_expected_values_path.c_str(), status.ToString().c_str());
        assert(values_ == nullptr);
      }
    }
    if (values_ == nullptr) {
      values_allocation_.reset(
          new std::atomic<uint32_t>[FLAGS_column_families * max_key_]);
      values_ = &values_allocation_[0];
      values_init_needed = true;
    }
    assert(values_ != nullptr);
    if (values_init_needed) {
      for (int i = 0; i < FLAGS_column_families; ++i) {
        // Use int64_t to match max_key_ and avoid overflow for large keys
        for (int64_t j = 0; j < max_key_; ++j) {
          Delete(i, j, false /* pending */);
        }
      }
    }

    if (FLAGS_test_batches_snapshots) {
      fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
      return;
    }

    long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
    if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
      num_locks++;
    }
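    // Example: with log2_keys_per_lock_ = 10, each lock covers 1024
    // consecutive keys (keys 0..1023 share lock 0, keys 1024..2047 share
    // lock 1, and so on); the check above adds one lock for any leftover
    // keys.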
    fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families);
    key_locks_.resize(FLAGS_column_families);

    for (int i = 0; i < FLAGS_column_families; ++i) {
      key_locks_[i].resize(num_locks);
      for (auto& ptr : key_locks_[i]) {
        ptr.reset(new port::Mutex);
      }
    }
    if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
    }
    if (FLAGS_continuous_verification_interval > 0) {
      ++num_bg_threads_;
      fprintf(stdout, "Starting continuous_verification_thread\n");
    }
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in) {
      SyncPoint::GetInstance()->SetCallBack("FaultInjectionIgnoreError",
                                            IgnoreReadErrorCallback);
      SyncPoint::GetInstance()->EnableProcessing();
    }
#endif  // NDEBUG
  }

  ~SharedState() {
#ifndef NDEBUG
    if (FLAGS_read_fault_one_in) {
      SyncPoint::GetInstance()->ClearAllCallBacks();
      SyncPoint::GetInstance()->DisableProcessing();
    }
#endif
  }

  port::Mutex* GetMutex() { return &mu_; }

  port::CondVar* GetCondVar() { return &cv_; }

  StressTest* GetStressTest() const { return stress_test_; }

  int64_t GetMaxKey() const { return max_key_; }

  uint32_t GetNumThreads() const { return num_threads_; }

  void IncInitialized() { num_initialized_++; }

  void IncOperated() { num_populated_++; }

  void IncDone() { num_done_++; }

  void IncVotedReopen() { vote_reopen_ = (vote_reopen_ + 1) % num_threads_; }

  bool AllInitialized() const { return num_initialized_ >= num_threads_; }

  bool AllOperated() const { return num_populated_ >= num_threads_; }

  bool AllDone() const { return num_done_ >= num_threads_; }

  bool AllVotedReopen() { return (vote_reopen_ == 0); }

  void SetStart() { start_ = true; }

  void SetStartVerify() { start_verify_ = true; }

  bool Started() const { return start_; }

  bool VerifyStarted() const { return start_verify_; }

  void SetVerificationFailure() { verification_failure_.store(true); }

  bool HasVerificationFailedYet() const { return verification_failure_.load(); }

  void SetShouldStopTest() { should_stop_test_.store(true); }

  bool ShouldStopTest() const { return should_stop_test_.load(); }

  port::Mutex* GetMutexForKey(int cf, int64_t key) {
    return key_locks_[cf][key >> log2_keys_per_lock_].get();
  }

  void LockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Lock();
    }
  }

  void UnlockColumnFamily(int cf) {
    for (auto& mutex : key_locks_[cf]) {
      mutex->Unlock();
    }
  }

  std::atomic<uint32_t>& Value(int cf, int64_t key) const {
    return values_[cf * max_key_ + key];
  }

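  // values_ is laid out row-major: max_key_ consecutive slots for column
  // family 0, then max_key_ slots for column family 1, and so on. That is
  // why ClearColumnFamily() below can fill the contiguous half-open range
  // [&Value(cf, 0), &Value(cf + 1, 0)).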
  void ClearColumnFamily(int cf) {
    std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
              DELETION_SENTINEL);
  }

  // @param pending True if the update may have started but is not yet
  // guaranteed finished. This is useful for crash-recovery testing when the
  // process may crash before updating the expected values array.
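  //
  // A typical crash-test write looks like this (a sketch; DbPut() is a
  // hypothetical stand-in for the actual DB write):
  //
  //   shared->Put(cf, key, value_base, true /* pending */);   // mark UNKNOWN
  //   DbPut(cf, key, value_base);                             // real write
  //   shared->Put(cf, key, value_base, false /* pending */);  // confirm
  //
  // If the process crashes between the first and last step, verification
  // sees UNKNOWN_SENTINEL and accepts any value for the key.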
  void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
    if (!pending) {
      // prevent expected-value update from reordering before Write
      std::atomic_thread_fence(std::memory_order_release);
    }
    Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
                         std::memory_order_relaxed);
    if (pending) {
      // prevent Write from reordering before expected-value update
      std::atomic_thread_fence(std::memory_order_release);
    }
  }

  uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  bool Delete(int cf, int64_t key, bool pending) {
    if (Value(cf, key) == DELETION_SENTINEL) {
      return false;
    }
    Put(cf, key, DELETION_SENTINEL, pending);
    return true;
  }

  // @param pending See comment above Put()
  // Returns true if the key was not yet deleted.
  bool SingleDelete(int cf, int64_t key, bool pending) {
    return Delete(cf, key, pending);
  }

  // @param pending See comment above Put()
  // Returns number of keys deleted by the call.
  int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
    int covered = 0;
    for (int64_t key = begin_key; key < end_key; ++key) {
      if (Delete(cf, key, pending)) {
        ++covered;
      }
    }
    return covered;
  }

  bool AllowsOverwrite(int64_t key) {
    return no_overwrite_ids_.find(key) == no_overwrite_ids_.end();
  }

  bool Exists(int cf, int64_t key) {
    // UNKNOWN_SENTINEL counts as exists. That ensures a key for which
    // overwrite is disallowed can't be accidentally added a second time, in
    // which case SingleDelete wouldn't be able to properly delete the key.
    // It does allow the case where a SingleDelete might be added which covers
    // nothing, but that's not a correctness issue.
    uint32_t expected_value = Value(cf, key).load();
    return expected_value != DELETION_SENTINEL;
  }

  uint32_t GetSeed() const { return seed_; }

  void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }

  bool ShouldStopBgThread() { return should_stop_bg_thread_; }

  void IncBgThreadsFinished() { ++bg_thread_finished_; }

  bool BgThreadsFinished() const {
    return bg_thread_finished_ == num_bg_threads_;
  }

  bool ShouldVerifyAtBeginning() const {
    return expected_mmap_buffer_.get() != nullptr;
  }

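  // Returns false and claims the "currently printing" slot if no other
  // thread is printing verification results; returns true if another thread
  // already holds the slot (the caller should then skip printing).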
  bool PrintingVerificationResults() {
    bool tmp = false;
    return !printing_verification_results_.compare_exchange_strong(
        tmp, true, std::memory_order_relaxed);
  }

  void FinishPrintingVerificationResults() {
    printing_verification_results_.store(false, std::memory_order_relaxed);
  }

 private:
  static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; }

  port::Mutex mu_;
  port::CondVar cv_;
  const uint32_t seed_;
  const int64_t max_key_;
  const uint32_t log2_keys_per_lock_;
  const int num_threads_;
  long num_initialized_;
  long num_populated_;
  long vote_reopen_;
  long num_done_;
  bool start_;
  bool start_verify_;
  int num_bg_threads_;
  bool should_stop_bg_thread_;
  int bg_thread_finished_;
  StressTest* stress_test_;
  std::atomic<bool> verification_failure_;
  std::atomic<bool> should_stop_test_;

  // Keys that should not be overwritten
  std::unordered_set<size_t> no_overwrite_ids_;

  std::atomic<uint32_t>* values_;
  std::unique_ptr<std::atomic<uint32_t>[]> values_allocation_;
  // Each mutex is owned by a unique_ptr because port::Mutex is not copyable
  // and storing it directly in the container could require copying, depending
  // on the implementation.
  std::vector<std::vector<std::unique_ptr<port::Mutex>>> key_locks_;
  std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
  std::atomic<bool> printing_verification_results_;
};

// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  uint32_t tid;  // 0..n-1
  Random rand;   // Has different seeds for different threads
  SharedState* shared;
  Stats stats;
  struct SnapshotState {
    const Snapshot* snapshot;
    // The cf from which we did a Get at this snapshot
    int cf_at;
    // The name of the cf at the time that we did a read
    std::string cf_at_name;
    // The key with which we did a Get at this snapshot
    std::string key;
    // The status of the Get
    Status status;
    // The value of the Get
    std::string value;
    // optional state of all keys in the db
    std::vector<bool>* key_vec;
  };
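  // Queue of (operation index, SnapshotState) pairs. The uint64_t is,
  // presumably, the operation count at which the snapshot is due to be
  // verified and released (an assumption; the queue is populated elsewhere
  // in db_stress).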
  std::queue<std::pair<uint64_t, SnapshotState>> snapshot_queue;

  ThreadState(uint32_t index, SharedState* _shared)
      : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
};
}  // namespace ROCKSDB_NAMESPACE
#endif  // GFLAGS