]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | // | |
10 | ||
11 | #ifdef GFLAGS | |
12 | #include "db_stress_tool/db_stress_common.h" | |
20effc67 | 13 | |
f67539c2 TL |
14 | #include <cmath> |
15 | ||
20effc67 TL |
16 | #include "util/file_checksum_helper.h" |
17 | #include "util/xxhash.h" | |
18 | ||
f67539c2 | 19 | ROCKSDB_NAMESPACE::DbStressEnvWrapper* db_stress_env = nullptr; |
20effc67 TL |
20 | #ifndef NDEBUG |
21 | // If non-null, injects read error at a rate specified by the | |
22 | // read_fault_one_in flag | |
23 | std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard; | |
24 | #endif // NDEBUG | |
f67539c2 TL |
25 | enum ROCKSDB_NAMESPACE::CompressionType compression_type_e = |
26 | ROCKSDB_NAMESPACE::kSnappyCompression; | |
27 | enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e = | |
28 | ROCKSDB_NAMESPACE::kSnappyCompression; | |
29 | enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e = | |
30 | ROCKSDB_NAMESPACE::kCRC32c; | |
31 | enum RepFactory FLAGS_rep_factory = kSkipList; | |
32 | std::vector<double> sum_probs(100001); | |
33 | int64_t zipf_sum_size = 100000; | |
34 | ||
35 | namespace ROCKSDB_NAMESPACE { | |
36 | ||
37 | // Zipfian distribution is generated based on a pre-calculated array. | |
38 | // It should be used before start the stress test. | |
39 | // First, the probability distribution function (PDF) of this Zipfian follows | |
40 | // power low. P(x) = 1/(x^alpha). | |
41 | // So we calculate the PDF when x is from 0 to zipf_sum_size in first for loop | |
42 | // and add the PDF value togetger as c. So we get the total probability in c. | |
43 | // Next, we calculate inverse CDF of Zipfian and store the value of each in | |
44 | // an array (sum_probs). The rank is from 0 to zipf_sum_size. For example, for | |
45 | // integer k, its Zipfian CDF value is sum_probs[k]. | |
46 | // Third, when we need to get an integer whose probability follows Zipfian | |
47 | // distribution, we use a rand_seed [0,1] which follows uniform distribution | |
48 | // as a seed and search it in the sum_probs via binary search. When we find | |
49 | // the closest sum_probs[i] of rand_seed, i is the integer that in | |
50 | // [0, zipf_sum_size] following Zipfian distribution with parameter alpha. | |
51 | // Finally, we can scale i to [0, max_key] scale. | |
52 | // In order to avoid that hot keys are close to each other and skew towards 0, | |
53 | // we use Rando64 to shuffle it. | |
54 | void InitializeHotKeyGenerator(double alpha) { | |
55 | double c = 0; | |
56 | for (int64_t i = 1; i <= zipf_sum_size; i++) { | |
57 | c = c + (1.0 / std::pow(static_cast<double>(i), alpha)); | |
58 | } | |
59 | c = 1.0 / c; | |
60 | ||
61 | sum_probs[0] = 0; | |
62 | for (int64_t i = 1; i <= zipf_sum_size; i++) { | |
63 | sum_probs[i] = | |
64 | sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha); | |
65 | } | |
66 | } | |
67 | ||
68 | // Generate one key that follows the Zipfian distribution. The skewness | |
69 | // is decided by the parameter alpha. Input is the rand_seed [0,1] and | |
70 | // the max of the key to be generated. If we directly return tmp_zipf_seed, | |
71 | // the closer to 0, the higher probability will be. To randomly distribute | |
72 | // the hot keys in [0, max_key], we use Random64 to shuffle it. | |
73 | int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) { | |
74 | int64_t low = 1, mid, high = zipf_sum_size, zipf = 0; | |
75 | while (low <= high) { | |
76 | mid = (low + high) / 2; | |
77 | if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) { | |
78 | zipf = mid; | |
79 | break; | |
80 | } else if (sum_probs[mid] >= rand_seed) { | |
81 | high = mid - 1; | |
82 | } else { | |
83 | low = mid + 1; | |
84 | } | |
85 | } | |
86 | int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size; | |
87 | Random64 rand_local(tmp_zipf_seed); | |
88 | return rand_local.Next() % max_key; | |
89 | } | |
90 | ||
91 | void PoolSizeChangeThread(void* v) { | |
92 | assert(FLAGS_compaction_thread_pool_adjust_interval > 0); | |
93 | ThreadState* thread = reinterpret_cast<ThreadState*>(v); | |
94 | SharedState* shared = thread->shared; | |
95 | ||
96 | while (true) { | |
97 | { | |
98 | MutexLock l(shared->GetMutex()); | |
99 | if (shared->ShouldStopBgThread()) { | |
100 | shared->IncBgThreadsFinished(); | |
101 | if (shared->BgThreadsFinished()) { | |
102 | shared->GetCondVar()->SignalAll(); | |
103 | } | |
104 | return; | |
105 | } | |
106 | } | |
107 | ||
108 | auto thread_pool_size_base = FLAGS_max_background_compactions; | |
109 | auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations; | |
110 | int new_thread_pool_size = | |
111 | thread_pool_size_base - thread_pool_size_var + | |
112 | thread->rand.Next() % (thread_pool_size_var * 2 + 1); | |
113 | if (new_thread_pool_size < 1) { | |
114 | new_thread_pool_size = 1; | |
115 | } | |
116 | db_stress_env->SetBackgroundThreads(new_thread_pool_size, | |
117 | ROCKSDB_NAMESPACE::Env::Priority::LOW); | |
118 | // Sleep up to 3 seconds | |
119 | db_stress_env->SleepForMicroseconds( | |
120 | thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * | |
121 | 1000 + | |
122 | 1); | |
123 | } | |
124 | } | |
125 | ||
126 | void DbVerificationThread(void* v) { | |
127 | assert(FLAGS_continuous_verification_interval > 0); | |
128 | auto* thread = reinterpret_cast<ThreadState*>(v); | |
129 | SharedState* shared = thread->shared; | |
130 | StressTest* stress_test = shared->GetStressTest(); | |
131 | assert(stress_test != nullptr); | |
132 | while (true) { | |
133 | { | |
134 | MutexLock l(shared->GetMutex()); | |
135 | if (shared->ShouldStopBgThread()) { | |
136 | shared->IncBgThreadsFinished(); | |
137 | if (shared->BgThreadsFinished()) { | |
138 | shared->GetCondVar()->SignalAll(); | |
139 | } | |
140 | return; | |
141 | } | |
142 | } | |
143 | if (!shared->HasVerificationFailedYet()) { | |
144 | stress_test->ContinuouslyVerifyDb(thread); | |
145 | } | |
146 | db_stress_env->SleepForMicroseconds( | |
147 | thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 + | |
148 | 1); | |
149 | } | |
150 | } | |
151 | ||
152 | void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { | |
153 | if (!FLAGS_verbose) { | |
154 | return; | |
155 | } | |
156 | std::string tmp; | |
157 | tmp.reserve(sz * 2 + 16); | |
158 | char buf[4]; | |
159 | for (size_t i = 0; i < sz; i++) { | |
160 | snprintf(buf, 4, "%X", value[i]); | |
161 | tmp.append(buf); | |
162 | } | |
20effc67 TL |
163 | auto key_str = Key(key); |
164 | Slice key_slice = key_str; | |
165 | fprintf(stdout, "[CF %d] %s (%" PRIi64 ") == > (%" ROCKSDB_PRIszt ") %s\n", | |
166 | cf, key_slice.ToString(true).c_str(), key, sz, tmp.c_str()); | |
f67539c2 TL |
167 | } |
168 | ||
169 | // Note that if hot_key_alpha != 0, it generates the key based on Zipfian | |
170 | // distribution. Keys are randomly scattered to [0, FLAGS_max_key]. It does | |
171 | // not ensure the order of the keys being generated and the keys does not have | |
172 | // the active range which is related to FLAGS_active_width. | |
173 | int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) { | |
174 | const double completed_ratio = | |
175 | static_cast<double>(iteration) / FLAGS_ops_per_thread; | |
176 | const int64_t base_key = static_cast<int64_t>( | |
177 | completed_ratio * (FLAGS_max_key - FLAGS_active_width)); | |
178 | int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width; | |
179 | int64_t cur_key = rand_seed; | |
180 | if (FLAGS_hot_key_alpha != 0) { | |
181 | // If set the Zipfian distribution Alpha to non 0, use Zipfian | |
182 | double float_rand = | |
183 | (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) / | |
184 | FLAGS_max_key; | |
185 | cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key); | |
186 | } | |
187 | return cur_key; | |
188 | } | |
189 | ||
190 | // Note that if hot_key_alpha != 0, it generates the key based on Zipfian | |
191 | // distribution. Keys being generated are in random order. | |
192 | // If user want to generate keys based on uniform distribution, user needs to | |
193 | // set hot_key_alpha == 0. It will generate the random keys in increasing | |
194 | // order in the key array (ensure key[i] >= key[i+1]) and constrained in a | |
195 | // range related to FLAGS_active_width. | |
196 | std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys, | |
197 | uint64_t iteration) { | |
198 | const double completed_ratio = | |
199 | static_cast<double>(iteration) / FLAGS_ops_per_thread; | |
200 | const int64_t base_key = static_cast<int64_t>( | |
201 | completed_ratio * (FLAGS_max_key - FLAGS_active_width)); | |
202 | std::vector<int64_t> keys; | |
203 | keys.reserve(num_keys); | |
204 | int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width; | |
205 | keys.push_back(next_key); | |
206 | for (int i = 1; i < num_keys; ++i) { | |
207 | // Generate the key follows zipfian distribution | |
208 | if (FLAGS_hot_key_alpha != 0) { | |
209 | double float_rand = | |
210 | (static_cast<double>(thread->rand.Next() % FLAGS_max_key)) / | |
211 | FLAGS_max_key; | |
212 | next_key = GetOneHotKeyID(float_rand, FLAGS_max_key); | |
213 | } else { | |
214 | // This may result in some duplicate keys | |
215 | next_key = next_key + thread->rand.Next() % | |
216 | (FLAGS_active_width - (next_key - base_key)); | |
217 | } | |
218 | keys.push_back(next_key); | |
219 | } | |
220 | return keys; | |
221 | } | |
222 | ||
223 | size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) { | |
224 | size_t value_sz = | |
225 | ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; | |
226 | assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t)); | |
227 | (void)max_sz; | |
228 | *((uint32_t*)v) = rand; | |
229 | for (size_t i = sizeof(uint32_t); i < value_sz; i++) { | |
230 | v[i] = (char)(rand ^ i); | |
231 | } | |
232 | v[value_sz] = '\0'; | |
233 | return value_sz; // the size of the value set. | |
234 | } | |
20effc67 TL |
235 | |
236 | namespace { | |
237 | ||
238 | class MyXXH64Checksum : public FileChecksumGenerator { | |
239 | public: | |
240 | explicit MyXXH64Checksum(bool big) : big_(big) { | |
241 | state_ = XXH64_createState(); | |
242 | XXH64_reset(state_, 0); | |
243 | } | |
244 | ||
245 | virtual ~MyXXH64Checksum() override { XXH64_freeState(state_); } | |
246 | ||
247 | void Update(const char* data, size_t n) override { | |
248 | XXH64_update(state_, data, n); | |
249 | } | |
250 | ||
251 | void Finalize() override { | |
252 | assert(str_.empty()); | |
253 | uint64_t digest = XXH64_digest(state_); | |
254 | // Store as little endian raw bytes | |
255 | PutFixed64(&str_, digest); | |
256 | if (big_) { | |
257 | // Throw in some more data for stress testing (448 bits total) | |
258 | PutFixed64(&str_, GetSliceHash64(str_)); | |
259 | PutFixed64(&str_, GetSliceHash64(str_)); | |
260 | PutFixed64(&str_, GetSliceHash64(str_)); | |
261 | PutFixed64(&str_, GetSliceHash64(str_)); | |
262 | PutFixed64(&str_, GetSliceHash64(str_)); | |
263 | PutFixed64(&str_, GetSliceHash64(str_)); | |
264 | } | |
265 | } | |
266 | ||
267 | std::string GetChecksum() const override { | |
268 | assert(!str_.empty()); | |
269 | return str_; | |
270 | } | |
271 | ||
272 | const char* Name() const override { | |
273 | return big_ ? "MyBigChecksum" : "MyXXH64Checksum"; | |
274 | } | |
275 | ||
276 | private: | |
277 | bool big_; | |
278 | XXH64_state_t* state_; | |
279 | std::string str_; | |
280 | }; | |
281 | ||
282 | class DbStressChecksumGenFactory : public FileChecksumGenFactory { | |
283 | std::string default_func_name_; | |
284 | ||
285 | std::unique_ptr<FileChecksumGenerator> CreateFromFuncName( | |
286 | const std::string& func_name) { | |
287 | std::unique_ptr<FileChecksumGenerator> rv; | |
288 | if (func_name == "FileChecksumCrc32c") { | |
289 | rv.reset(new FileChecksumGenCrc32c(FileChecksumGenContext())); | |
290 | } else if (func_name == "MyXXH64Checksum") { | |
291 | rv.reset(new MyXXH64Checksum(false /* big */)); | |
292 | } else if (func_name == "MyBigChecksum") { | |
293 | rv.reset(new MyXXH64Checksum(true /* big */)); | |
294 | } else { | |
295 | // Should be a recognized function when we get here | |
296 | assert(false); | |
297 | } | |
298 | return rv; | |
299 | } | |
300 | ||
301 | public: | |
302 | explicit DbStressChecksumGenFactory(const std::string& default_func_name) | |
303 | : default_func_name_(default_func_name) {} | |
304 | ||
305 | std::unique_ptr<FileChecksumGenerator> CreateFileChecksumGenerator( | |
306 | const FileChecksumGenContext& context) override { | |
307 | if (context.requested_checksum_func_name.empty()) { | |
308 | return CreateFromFuncName(default_func_name_); | |
309 | } else { | |
310 | return CreateFromFuncName(context.requested_checksum_func_name); | |
311 | } | |
312 | } | |
313 | ||
314 | const char* Name() const override { return "FileChecksumGenCrc32cFactory"; } | |
315 | }; | |
316 | ||
317 | } // namespace | |
318 | ||
319 | std::shared_ptr<FileChecksumGenFactory> GetFileChecksumImpl( | |
320 | const std::string& name) { | |
321 | // Translate from friendly names to internal names | |
322 | std::string internal_name; | |
323 | if (name == "crc32c") { | |
324 | internal_name = "FileChecksumCrc32c"; | |
325 | } else if (name == "xxh64") { | |
326 | internal_name = "MyXXH64Checksum"; | |
327 | } else if (name == "big") { | |
328 | internal_name = "MyBigChecksum"; | |
329 | } else { | |
330 | assert(name.empty() || name == "none"); | |
331 | return nullptr; | |
332 | } | |
333 | return std::make_shared<DbStressChecksumGenFactory>(internal_name); | |
334 | } | |
335 | ||
f67539c2 TL |
336 | } // namespace ROCKSDB_NAMESPACE |
337 | #endif // GFLAGS |