1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
12 #include "db_stress_tool/db_stress_common.h"
15 ROCKSDB_NAMESPACE::DbStressEnvWrapper
* db_stress_env
= nullptr;
16 enum ROCKSDB_NAMESPACE::CompressionType compression_type_e
=
17 ROCKSDB_NAMESPACE::kSnappyCompression
;
18 enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e
=
19 ROCKSDB_NAMESPACE::kSnappyCompression
;
20 enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e
=
21 ROCKSDB_NAMESPACE::kCRC32c
;
22 enum RepFactory FLAGS_rep_factory
= kSkipList
;
23 std::vector
<double> sum_probs(100001);
24 int64_t zipf_sum_size
= 100000;
26 namespace ROCKSDB_NAMESPACE
{
// The Zipfian distribution is generated from a pre-calculated array.
// InitializeHotKeyGenerator must be called before the stress test starts.
// First, the probability density function (PDF) of this Zipfian follows a
// power law: P(x) = 1/(x^alpha).
// So we evaluate the PDF for x from 1 to zipf_sum_size in the first loop
// and accumulate the values into c, giving the total probability mass.
// Next, we calculate the inverse CDF of the Zipfian and store the value for
// each rank from 0 to zipf_sum_size in an array (sum_probs). For example,
// for an integer k, its Zipfian CDF value is sum_probs[k].
// Third, when we need an integer whose probability follows the Zipfian
// distribution, we draw rand_seed uniformly from [0,1] and binary-search it
// in sum_probs. The index i of the closest sum_probs[i] to rand_seed is an
// integer in [0, zipf_sum_size] following the Zipfian distribution with
// parameter alpha.
// Finally, we scale i to the [0, max_key] range.
// To avoid hot keys being close to each other and skewed towards 0, we use
// Random64 to shuffle the result.
45 void InitializeHotKeyGenerator(double alpha
) {
47 for (int64_t i
= 1; i
<= zipf_sum_size
; i
++) {
48 c
= c
+ (1.0 / std::pow(static_cast<double>(i
), alpha
));
53 for (int64_t i
= 1; i
<= zipf_sum_size
; i
++) {
55 sum_probs
[i
- 1] + c
/ std::pow(static_cast<double>(i
), alpha
);
// Generate one key that follows the Zipfian distribution. The skewness is
// decided by the parameter alpha. Inputs are rand_seed (uniform in [0,1])
// and the maximum key to be generated. If we returned tmp_zipf_seed
// directly, values closer to 0 would have higher probability; to spread the
// hot keys randomly across [0, max_key], we shuffle with Random64.
64 int64_t GetOneHotKeyID(double rand_seed
, int64_t max_key
) {
65 int64_t low
= 1, mid
, high
= zipf_sum_size
, zipf
= 0;
67 mid
= (low
+ high
) / 2;
68 if (sum_probs
[mid
] >= rand_seed
&& sum_probs
[mid
- 1] < rand_seed
) {
71 } else if (sum_probs
[mid
] >= rand_seed
) {
77 int64_t tmp_zipf_seed
= zipf
* max_key
/ zipf_sum_size
;
78 Random64
rand_local(tmp_zipf_seed
);
79 return rand_local
.Next() % max_key
;
82 void PoolSizeChangeThread(void* v
) {
83 assert(FLAGS_compaction_thread_pool_adjust_interval
> 0);
84 ThreadState
* thread
= reinterpret_cast<ThreadState
*>(v
);
85 SharedState
* shared
= thread
->shared
;
89 MutexLock
l(shared
->GetMutex());
90 if (shared
->ShouldStopBgThread()) {
91 shared
->IncBgThreadsFinished();
92 if (shared
->BgThreadsFinished()) {
93 shared
->GetCondVar()->SignalAll();
99 auto thread_pool_size_base
= FLAGS_max_background_compactions
;
100 auto thread_pool_size_var
= FLAGS_compaction_thread_pool_variations
;
101 int new_thread_pool_size
=
102 thread_pool_size_base
- thread_pool_size_var
+
103 thread
->rand
.Next() % (thread_pool_size_var
* 2 + 1);
104 if (new_thread_pool_size
< 1) {
105 new_thread_pool_size
= 1;
107 db_stress_env
->SetBackgroundThreads(new_thread_pool_size
,
108 ROCKSDB_NAMESPACE::Env::Priority::LOW
);
109 // Sleep up to 3 seconds
110 db_stress_env
->SleepForMicroseconds(
111 thread
->rand
.Next() % FLAGS_compaction_thread_pool_adjust_interval
*
117 void DbVerificationThread(void* v
) {
118 assert(FLAGS_continuous_verification_interval
> 0);
119 auto* thread
= reinterpret_cast<ThreadState
*>(v
);
120 SharedState
* shared
= thread
->shared
;
121 StressTest
* stress_test
= shared
->GetStressTest();
122 assert(stress_test
!= nullptr);
125 MutexLock
l(shared
->GetMutex());
126 if (shared
->ShouldStopBgThread()) {
127 shared
->IncBgThreadsFinished();
128 if (shared
->BgThreadsFinished()) {
129 shared
->GetCondVar()->SignalAll();
134 if (!shared
->HasVerificationFailedYet()) {
135 stress_test
->ContinuouslyVerifyDb(thread
);
137 db_stress_env
->SleepForMicroseconds(
138 thread
->rand
.Next() % FLAGS_continuous_verification_interval
* 1000 +
143 void PrintKeyValue(int cf
, uint64_t key
, const char* value
, size_t sz
) {
144 if (!FLAGS_verbose
) {
148 tmp
.reserve(sz
* 2 + 16);
150 for (size_t i
= 0; i
< sz
; i
++) {
151 snprintf(buf
, 4, "%X", value
[i
]);
154 fprintf(stdout
, "[CF %d] %" PRIi64
" == > (%" ROCKSDB_PRIszt
") %s\n", cf
,
155 key
, sz
, tmp
.c_str());
// Note that if hot_key_alpha != 0, the key is generated from a Zipfian
// distribution and keys are randomly scattered over [0, FLAGS_max_key]. In
// that case the order of the generated keys is not guaranteed and the keys
// are not confined to the active range controlled by FLAGS_active_width.
162 int64_t GenerateOneKey(ThreadState
* thread
, uint64_t iteration
) {
163 const double completed_ratio
=
164 static_cast<double>(iteration
) / FLAGS_ops_per_thread
;
165 const int64_t base_key
= static_cast<int64_t>(
166 completed_ratio
* (FLAGS_max_key
- FLAGS_active_width
));
167 int64_t rand_seed
= base_key
+ thread
->rand
.Next() % FLAGS_active_width
;
168 int64_t cur_key
= rand_seed
;
169 if (FLAGS_hot_key_alpha
!= 0) {
170 // If set the Zipfian distribution Alpha to non 0, use Zipfian
172 (static_cast<double>(thread
->rand
.Next() % FLAGS_max_key
)) /
174 cur_key
= GetOneHotKeyID(float_rand
, FLAGS_max_key
);
// Note that if hot_key_alpha != 0, keys are generated from a Zipfian
// distribution and come out in random order.
// To generate keys from a uniform distribution instead, set
// hot_key_alpha == 0. The keys are then generated in non-decreasing order
// (key[i] <= key[i+1]) and constrained to a range related to
// FLAGS_active_width.
185 std::vector
<int64_t> GenerateNKeys(ThreadState
* thread
, int num_keys
,
186 uint64_t iteration
) {
187 const double completed_ratio
=
188 static_cast<double>(iteration
) / FLAGS_ops_per_thread
;
189 const int64_t base_key
= static_cast<int64_t>(
190 completed_ratio
* (FLAGS_max_key
- FLAGS_active_width
));
191 std::vector
<int64_t> keys
;
192 keys
.reserve(num_keys
);
193 int64_t next_key
= base_key
+ thread
->rand
.Next() % FLAGS_active_width
;
194 keys
.push_back(next_key
);
195 for (int i
= 1; i
< num_keys
; ++i
) {
196 // Generate the key follows zipfian distribution
197 if (FLAGS_hot_key_alpha
!= 0) {
199 (static_cast<double>(thread
->rand
.Next() % FLAGS_max_key
)) /
201 next_key
= GetOneHotKeyID(float_rand
, FLAGS_max_key
);
203 // This may result in some duplicate keys
204 next_key
= next_key
+ thread
->rand
.Next() %
205 (FLAGS_active_width
- (next_key
- base_key
));
207 keys
.push_back(next_key
);
212 size_t GenerateValue(uint32_t rand
, char* v
, size_t max_sz
) {
214 ((rand
% kRandomValueMaxFactor
) + 1) * FLAGS_value_size_mult
;
215 assert(value_sz
<= max_sz
&& value_sz
>= sizeof(uint32_t));
217 *((uint32_t*)v
) = rand
;
218 for (size_t i
= sizeof(uint32_t); i
< value_sz
; i
++) {
219 v
[i
] = (char)(rand
^ i
);
222 return value_sz
; // the size of the value set.
224 } // namespace ROCKSDB_NAMESPACE