1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
12 #include "db_stress_tool/db_stress_common.h"
13 #include "utilities/fault_injection_fs.h"
15 namespace ROCKSDB_NAMESPACE
{
16 void ThreadBody(void* v
) {
17 ThreadState
* thread
= reinterpret_cast<ThreadState
*>(v
);
18 SharedState
* shared
= thread
->shared
;
20 if (!FLAGS_skip_verifydb
&& shared
->ShouldVerifyAtBeginning()) {
21 thread
->shared
->GetStressTest()->VerifyDb(thread
);
24 MutexLock
l(shared
->GetMutex());
25 shared
->IncInitialized();
26 if (shared
->AllInitialized()) {
27 shared
->GetCondVar()->SignalAll();
29 while (!shared
->Started()) {
30 shared
->GetCondVar()->Wait();
33 thread
->shared
->GetStressTest()->OperateDb(thread
);
36 MutexLock
l(shared
->GetMutex());
37 shared
->IncOperated();
38 if (shared
->AllOperated()) {
39 shared
->GetCondVar()->SignalAll();
41 while (!shared
->VerifyStarted()) {
42 shared
->GetCondVar()->Wait();
46 if (!FLAGS_skip_verifydb
) {
47 thread
->shared
->GetStressTest()->VerifyDb(thread
);
51 MutexLock
l(shared
->GetMutex());
53 if (shared
->AllDone()) {
54 shared
->GetCondVar()->SignalAll();
59 bool RunStressTest(StressTest
* stress
) {
61 SharedState
shared(db_stress_env
, stress
);
62 stress
->FinishInitDb(&shared
);
65 if (FLAGS_sync_fault_injection
) {
66 fault_fs_guard
->SetFilesystemDirectWritable(false);
70 uint32_t n
= shared
.GetNumThreads();
72 uint64_t now
= db_stress_env
->NowMicros();
73 fprintf(stdout
, "%s Initializing worker threads\n",
74 db_stress_env
->TimeToString(now
/ 1000000).c_str());
75 std::vector
<ThreadState
*> threads(n
);
76 for (uint32_t i
= 0; i
< n
; i
++) {
77 threads
[i
] = new ThreadState(i
, &shared
);
78 db_stress_env
->StartThread(ThreadBody
, threads
[i
]);
80 ThreadState
bg_thread(0, &shared
);
81 if (FLAGS_compaction_thread_pool_adjust_interval
> 0) {
82 db_stress_env
->StartThread(PoolSizeChangeThread
, &bg_thread
);
84 ThreadState
continuous_verification_thread(0, &shared
);
85 if (FLAGS_continuous_verification_interval
> 0) {
86 db_stress_env
->StartThread(DbVerificationThread
,
87 &continuous_verification_thread
);
90 // Each thread goes through the following states:
91 // initializing -> wait for others to init -> read/populate/depopulate
92 // wait for others to operate -> verify -> done
95 MutexLock
l(shared
.GetMutex());
96 while (!shared
.AllInitialized()) {
97 shared
.GetCondVar()->Wait();
99 if (shared
.ShouldVerifyAtBeginning()) {
100 if (shared
.HasVerificationFailedYet()) {
101 fprintf(stderr
, "Crash-recovery verification failed :(\n");
103 fprintf(stdout
, "Crash-recovery verification passed :)\n");
107 now
= db_stress_env
->NowMicros();
108 fprintf(stdout
, "%s Starting database operations\n",
109 db_stress_env
->TimeToString(now
/ 1000000).c_str());
112 shared
.GetCondVar()->SignalAll();
113 while (!shared
.AllOperated()) {
114 shared
.GetCondVar()->Wait();
117 now
= db_stress_env
->NowMicros();
118 if (FLAGS_test_batches_snapshots
) {
119 fprintf(stdout
, "%s Limited verification already done during gets\n",
120 db_stress_env
->TimeToString((uint64_t)now
/ 1000000).c_str());
121 } else if (FLAGS_skip_verifydb
) {
122 fprintf(stdout
, "%s Verification skipped\n",
123 db_stress_env
->TimeToString((uint64_t)now
/ 1000000).c_str());
125 fprintf(stdout
, "%s Starting verification\n",
126 db_stress_env
->TimeToString((uint64_t)now
/ 1000000).c_str());
129 shared
.SetStartVerify();
130 shared
.GetCondVar()->SignalAll();
131 while (!shared
.AllDone()) {
132 shared
.GetCondVar()->Wait();
136 for (unsigned int i
= 1; i
< n
; i
++) {
137 threads
[0]->stats
.Merge(threads
[i
]->stats
);
139 threads
[0]->stats
.Report("Stress Test");
141 for (unsigned int i
= 0; i
< n
; i
++) {
143 threads
[i
] = nullptr;
145 now
= db_stress_env
->NowMicros();
146 if (!FLAGS_skip_verifydb
&& !FLAGS_test_batches_snapshots
&&
147 !shared
.HasVerificationFailedYet()) {
148 fprintf(stdout
, "%s Verification successful\n",
149 db_stress_env
->TimeToString(now
/ 1000000).c_str());
151 stress
->PrintStatistics();
153 if (FLAGS_compaction_thread_pool_adjust_interval
> 0 ||
154 FLAGS_continuous_verification_interval
> 0) {
155 MutexLock
l(shared
.GetMutex());
156 shared
.SetShouldStopBgThread();
157 while (!shared
.BgThreadsFinished()) {
158 shared
.GetCondVar()->Wait();
162 if (!stress
->VerifySecondaries()) {
166 if (shared
.HasVerificationFailedYet()) {
167 fprintf(stderr
, "Verification failed :(\n");
172 } // namespace ROCKSDB_NAMESPACE