]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db_stress_tool/db_stress_common.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db_stress_tool / db_stress_common.cc
CommitLineData
f67539c2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9//
10
11#ifdef GFLAGS
12#include "db_stress_tool/db_stress_common.h"
20effc67 13
f67539c2
TL
#include <cmath>
#include <cstring>
15
20effc67
TL
16#include "util/file_checksum_helper.h"
17#include "util/xxhash.h"
18
f67539c2 19ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr;
20effc67
TL
20#ifndef NDEBUG
21// If non-null, injects read error at a rate specified by the
22// read_fault_one_in flag
23std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
24#endif // NDEBUG
f67539c2
TL
25enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
26 ROCKSDB_NAMESPACE::kSnappyCompression;
27enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
28 ROCKSDB_NAMESPACE::kSnappyCompression;
29enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e =
30 ROCKSDB_NAMESPACE::kCRC32c;
31enum RepFactory FLAGS_rep_factory = kSkipList;
32std::vector<double> sum_probs(100001);
33int64_t zipf_sum_size = 100000;
34
35namespace ROCKSDB_NAMESPACE {
36
37// Zipfian distribution is generated based on a pre-calculated array.
38// It should be used before start the stress test.
39// First, the probability distribution function (PDF) of this Zipfian follows
40// power low. P(x) = 1/(x^alpha).
41// So we calculate the PDF when x is from 0 to zipf_sum_size in first for loop
42// and add the PDF value togetger as c. So we get the total probability in c.
43// Next, we calculate inverse CDF of Zipfian and store the value of each in
44// an array (sum_probs). The rank is from 0 to zipf_sum_size. For example, for
45// integer k, its Zipfian CDF value is sum_probs[k].
46// Third, when we need to get an integer whose probability follows Zipfian
47// distribution, we use a rand_seed [0,1] which follows uniform distribution
48// as a seed and search it in the sum_probs via binary search. When we find
49// the closest sum_probs[i] of rand_seed, i is the integer that in
50// [0, zipf_sum_size] following Zipfian distribution with parameter alpha.
51// Finally, we can scale i to [0, max_key] scale.
52// In order to avoid that hot keys are close to each other and skew towards 0,
53// we use Rando64 to shuffle it.
54void InitializeHotKeyGenerator(double alpha) {
55 double c = 0;
56 for (int64_t i = 1; i <= zipf_sum_size; i++) {
57 c = c + (1.0 / std::pow(static_cast<double>(i), alpha));
58 }
59 c = 1.0 / c;
60
61 sum_probs[0] = 0;
62 for (int64_t i = 1; i <= zipf_sum_size; i++) {
63 sum_probs[i] =
64 sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha);
65 }
66}
67
68// Generate one key that follows the Zipfian distribution. The skewness
69// is decided by the parameter alpha. Input is the rand_seed [0,1] and
70// the max of the key to be generated. If we directly return tmp_zipf_seed,
71// the closer to 0, the higher probability will be. To randomly distribute
72// the hot keys in [0, max_key], we use Random64 to shuffle it.
73int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) {
74 int64_t low = 1, mid, high = zipf_sum_size, zipf = 0;
75 while (low <= high) {
76 mid = (low + high) / 2;
77 if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) {
78 zipf = mid;
79 break;
80 } else if (sum_probs[mid] >= rand_seed) {
81 high = mid - 1;
82 } else {
83 low = mid + 1;
84 }
85 }
86 int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size;
87 Random64 rand_local(tmp_zipf_seed);
88 return rand_local.Next() % max_key;
89}
90
91void PoolSizeChangeThread(void* v) {
92 assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
93 ThreadState* thread = reinterpret_cast<ThreadState*>(v);
94 SharedState* shared = thread->shared;
95
96 while (true) {
97 {
98 MutexLock l(shared->GetMutex());
99 if (shared->ShouldStopBgThread()) {
100 shared->IncBgThreadsFinished();
101 if (shared->BgThreadsFinished()) {
102 shared->GetCondVar()->SignalAll();
103 }
104 return;
105 }
106 }
107
108 auto thread_pool_size_base = FLAGS_max_background_compactions;
109 auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations;
110 int new_thread_pool_size =
111 thread_pool_size_base - thread_pool_size_var +
112 thread->rand.Next() % (thread_pool_size_var * 2 + 1);
113 if (new_thread_pool_size < 1) {
114 new_thread_pool_size = 1;
115 }
116 db_stress_env->SetBackgroundThreads(new_thread_pool_size,
117 ROCKSDB_NAMESPACE::Env::Priority::LOW);
118 // Sleep up to 3 seconds
119 db_stress_env->SleepForMicroseconds(
120 thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
121 1000 +
122 1);
123 }
124}
125
126void DbVerificationThread(void* v) {
127 assert(FLAGS_continuous_verification_interval > 0);
128 auto* thread = reinterpret_cast<ThreadState*>(v);
129 SharedState* shared = thread->shared;
130 StressTest* stress_test = shared->GetStressTest();
131 assert(stress_test != nullptr);
132 while (true) {
133 {
134 MutexLock l(shared->GetMutex());
135 if (shared->ShouldStopBgThread()) {
136 shared->IncBgThreadsFinished();
137 if (shared->BgThreadsFinished()) {
138 shared->GetCondVar()->SignalAll();
139 }
140 return;
141 }
142 }
143 if (!shared->HasVerificationFailedYet()) {
144 stress_test->ContinuouslyVerifyDb(thread);
145 }
146 db_stress_env->SleepForMicroseconds(
147 thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
148 1);
149 }
150}
151
152void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
153 if (!FLAGS_verbose) {
154 return;
155 }
156 std::string tmp;
157 tmp.reserve(sz * 2 + 16);
158 char buf[4];
159 for (size_t i = 0; i < sz; i++) {
160 snprintf(buf, 4, "%X", value[i]);
161 tmp.append(buf);
162 }
20effc67
TL
163 auto key_str = Key(key);
164 Slice key_slice = key_str;
165 fprintf(stdout, "[CF %d] %s (%" PRIi64 ") == > (%" ROCKSDB_PRIszt ") %s\n",
166 cf, key_slice.ToString(true).c_str(), key, sz, tmp.c_str());
f67539c2
TL
167}
168
169// Note that if hot_key_alpha != 0, it generates the key based on Zipfian
170// distribution. Keys are randomly scattered to [0, FLAGS_max_key]. It does
171// not ensure the order of the keys being generated and the keys does not have
172// the active range which is related to FLAGS_active_width.
173int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
174 const double completed_ratio =
175 static_cast<double>(iteration) / FLAGS_ops_per_thread;
176 const int64_t base_key = static_cast<int64_t>(
177 completed_ratio * (FLAGS_max_key - FLAGS_active_width));
178 int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width;
179 int64_t cur_key = rand_seed;
180 if (FLAGS_hot_key_alpha != 0) {
181 // If set the Zipfian distribution Alpha to non 0, use Zipfian
182 double float_rand =
183 (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
184 FLAGS_max_key;
185 cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
186 }
187 return cur_key;
188}
189
190// Note that if hot_key_alpha != 0, it generates the key based on Zipfian
191// distribution. Keys being generated are in random order.
192// If user want to generate keys based on uniform distribution, user needs to
193// set hot_key_alpha == 0. It will generate the random keys in increasing
194// order in the key array (ensure key[i] >= key[i+1]) and constrained in a
195// range related to FLAGS_active_width.
196std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
197 uint64_t iteration) {
198 const double completed_ratio =
199 static_cast<double>(iteration) / FLAGS_ops_per_thread;
200 const int64_t base_key = static_cast<int64_t>(
201 completed_ratio * (FLAGS_max_key - FLAGS_active_width));
202 std::vector<int64_t> keys;
203 keys.reserve(num_keys);
204 int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
205 keys.push_back(next_key);
206 for (int i = 1; i < num_keys; ++i) {
207 // Generate the key follows zipfian distribution
208 if (FLAGS_hot_key_alpha != 0) {
209 double float_rand =
210 (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
211 FLAGS_max_key;
212 next_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
213 } else {
214 // This may result in some duplicate keys
215 next_key = next_key + thread->rand.Next() %
216 (FLAGS_active_width - (next_key - base_key));
217 }
218 keys.push_back(next_key);
219 }
220 return keys;
221}
222
223size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) {
224 size_t value_sz =
225 ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult;
226 assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t));
227 (void)max_sz;
228 *((uint32_t*)v) = rand;
229 for (size_t i = sizeof(uint32_t); i < value_sz; i++) {
230 v[i] = (char)(rand ^ i);
231 }
232 v[value_sz] = '\0';
233 return value_sz; // the size of the value set.
234}
20effc67
TL
235
236namespace {
237
238class MyXXH64Checksum : public FileChecksumGenerator {
239 public:
240 explicit MyXXH64Checksum(bool big) : big_(big) {
241 state_ = XXH64_createState();
242 XXH64_reset(state_, 0);
243 }
244
245 virtual ~MyXXH64Checksum() override { XXH64_freeState(state_); }
246
247 void Update(const char* data, size_t n) override {
248 XXH64_update(state_, data, n);
249 }
250
251 void Finalize() override {
252 assert(str_.empty());
253 uint64_t digest = XXH64_digest(state_);
254 // Store as little endian raw bytes
255 PutFixed64(&str_, digest);
256 if (big_) {
257 // Throw in some more data for stress testing (448 bits total)
258 PutFixed64(&str_, GetSliceHash64(str_));
259 PutFixed64(&str_, GetSliceHash64(str_));
260 PutFixed64(&str_, GetSliceHash64(str_));
261 PutFixed64(&str_, GetSliceHash64(str_));
262 PutFixed64(&str_, GetSliceHash64(str_));
263 PutFixed64(&str_, GetSliceHash64(str_));
264 }
265 }
266
267 std::string GetChecksum() const override {
268 assert(!str_.empty());
269 return str_;
270 }
271
272 const char* Name() const override {
273 return big_ ? "MyBigChecksum" : "MyXXH64Checksum";
274 }
275
276 private:
277 bool big_;
278 XXH64_state_t* state_;
279 std::string str_;
280};
281
282class DbStressChecksumGenFactory : public FileChecksumGenFactory {
283 std::string default_func_name_;
284
285 std::unique_ptr<FileChecksumGenerator> CreateFromFuncName(
286 const std::string& func_name) {
287 std::unique_ptr<FileChecksumGenerator> rv;
288 if (func_name == "FileChecksumCrc32c") {
289 rv.reset(new FileChecksumGenCrc32c(FileChecksumGenContext()));
290 } else if (func_name == "MyXXH64Checksum") {
291 rv.reset(new MyXXH64Checksum(false /* big */));
292 } else if (func_name == "MyBigChecksum") {
293 rv.reset(new MyXXH64Checksum(true /* big */));
294 } else {
295 // Should be a recognized function when we get here
296 assert(false);
297 }
298 return rv;
299 }
300
301 public:
302 explicit DbStressChecksumGenFactory(const std::string& default_func_name)
303 : default_func_name_(default_func_name) {}
304
305 std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator(
306 const FileChecksumGenContext& context) override {
307 if (context.requested_checksum_func_name.empty()) {
308 return CreateFromFuncName(default_func_name_);
309 } else {
310 return CreateFromFuncName(context.requested_checksum_func_name);
311 }
312 }
313
314 const char* Name() const override { return "FileChecksumGenCrc32cFactory"; }
315};
316
317} // namespace
318
319std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl(
320 const std::string& name) {
321 // Translate from friendly names to internal names
322 std::string internal_name;
323 if (name == "crc32c") {
324 internal_name = "FileChecksumCrc32c";
325 } else if (name == "xxh64") {
326 internal_name = "MyXXH64Checksum";
327 } else if (name == "big") {
328 internal_name = "MyBigChecksum";
329 } else {
330 assert(name.empty() || name == "none");
331 return nullptr;
332 }
333 return std::make_shared<DbStressChecksumGenFactory>(internal_name);
334}
335
f67539c2
TL
336} // namespace ROCKSDB_NAMESPACE
337#endif // GFLAGS