]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | // | |
10 | ||
11 | #ifdef GFLAGS | |
12 | #include "db_stress_tool/db_stress_common.h" | |
20effc67 | 13 | #include "utilities/fault_injection_fs.h" |
f67539c2 TL |
14 | |
15 | namespace ROCKSDB_NAMESPACE { | |
16 | void ThreadBody(void* v) { | |
17 | ThreadState* thread = reinterpret_cast<ThreadState*>(v); | |
18 | SharedState* shared = thread->shared; | |
19 | ||
20effc67 | 20 | if (!FLAGS_skip_verifydb && shared->ShouldVerifyAtBeginning()) { |
f67539c2 TL |
21 | thread->shared->GetStressTest()->VerifyDb(thread); |
22 | } | |
23 | { | |
24 | MutexLock l(shared->GetMutex()); | |
25 | shared->IncInitialized(); | |
26 | if (shared->AllInitialized()) { | |
27 | shared->GetCondVar()->SignalAll(); | |
28 | } | |
29 | while (!shared->Started()) { | |
30 | shared->GetCondVar()->Wait(); | |
31 | } | |
32 | } | |
33 | thread->shared->GetStressTest()->OperateDb(thread); | |
34 | ||
35 | { | |
36 | MutexLock l(shared->GetMutex()); | |
37 | shared->IncOperated(); | |
38 | if (shared->AllOperated()) { | |
39 | shared->GetCondVar()->SignalAll(); | |
40 | } | |
41 | while (!shared->VerifyStarted()) { | |
42 | shared->GetCondVar()->Wait(); | |
43 | } | |
44 | } | |
45 | ||
20effc67 TL |
46 | if (!FLAGS_skip_verifydb) { |
47 | thread->shared->GetStressTest()->VerifyDb(thread); | |
48 | } | |
f67539c2 TL |
49 | |
50 | { | |
51 | MutexLock l(shared->GetMutex()); | |
52 | shared->IncDone(); | |
53 | if (shared->AllDone()) { | |
54 | shared->GetCondVar()->SignalAll(); | |
55 | } | |
56 | } | |
57 | } | |
58 | ||
59 | bool RunStressTest(StressTest* stress) { | |
1e59de90 TL |
60 | SystemClock* clock = db_stress_env->GetSystemClock().get(); |
61 | ||
f67539c2 | 62 | SharedState shared(db_stress_env, stress); |
1e59de90 TL |
63 | |
64 | if (shared.ShouldVerifyAtBeginning() && FLAGS_preserve_unverified_changes) { | |
65 | Status s = InitUnverifiedSubdir(FLAGS_db); | |
66 | if (s.ok() && !FLAGS_expected_values_dir.empty()) { | |
67 | s = InitUnverifiedSubdir(FLAGS_expected_values_dir); | |
68 | } | |
69 | if (!s.ok()) { | |
70 | fprintf(stderr, "Failed to setup unverified state dir: %s\n", | |
71 | s.ToString().c_str()); | |
72 | exit(1); | |
73 | } | |
74 | } | |
75 | ||
76 | stress->InitDb(&shared); | |
20effc67 TL |
77 | stress->FinishInitDb(&shared); |
78 | ||
20effc67 TL |
79 | if (FLAGS_sync_fault_injection) { |
80 | fault_fs_guard->SetFilesystemDirectWritable(false); | |
f67539c2 | 81 | } |
1e59de90 TL |
82 | if (FLAGS_write_fault_one_in) { |
83 | fault_fs_guard->EnableWriteErrorInjection(); | |
84 | } | |
f67539c2 | 85 | |
1e59de90 TL |
86 | uint32_t n = FLAGS_threads; |
87 | uint64_t now = clock->NowMicros(); | |
f67539c2 | 88 | fprintf(stdout, "%s Initializing worker threads\n", |
1e59de90 TL |
89 | clock->TimeToString(now / 1000000).c_str()); |
90 | ||
91 | shared.SetThreads(n); | |
92 | ||
93 | if (FLAGS_compaction_thread_pool_adjust_interval > 0) { | |
94 | shared.IncBgThreads(); | |
95 | } | |
96 | ||
97 | if (FLAGS_continuous_verification_interval > 0) { | |
98 | shared.IncBgThreads(); | |
99 | } | |
100 | ||
f67539c2 TL |
101 | std::vector<ThreadState*> threads(n); |
102 | for (uint32_t i = 0; i < n; i++) { | |
103 | threads[i] = new ThreadState(i, &shared); | |
104 | db_stress_env->StartThread(ThreadBody, threads[i]); | |
105 | } | |
1e59de90 | 106 | |
f67539c2 TL |
107 | ThreadState bg_thread(0, &shared); |
108 | if (FLAGS_compaction_thread_pool_adjust_interval > 0) { | |
109 | db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread); | |
110 | } | |
1e59de90 | 111 | |
f67539c2 TL |
112 | ThreadState continuous_verification_thread(0, &shared); |
113 | if (FLAGS_continuous_verification_interval > 0) { | |
114 | db_stress_env->StartThread(DbVerificationThread, | |
115 | &continuous_verification_thread); | |
116 | } | |
117 | ||
118 | // Each thread goes through the following states: | |
119 | // initializing -> wait for others to init -> read/populate/depopulate | |
120 | // wait for others to operate -> verify -> done | |
121 | ||
122 | { | |
123 | MutexLock l(shared.GetMutex()); | |
124 | while (!shared.AllInitialized()) { | |
125 | shared.GetCondVar()->Wait(); | |
126 | } | |
127 | if (shared.ShouldVerifyAtBeginning()) { | |
128 | if (shared.HasVerificationFailedYet()) { | |
129 | fprintf(stderr, "Crash-recovery verification failed :(\n"); | |
130 | } else { | |
131 | fprintf(stdout, "Crash-recovery verification passed :)\n"); | |
1e59de90 TL |
132 | Status s = DestroyUnverifiedSubdir(FLAGS_db); |
133 | if (s.ok() && !FLAGS_expected_values_dir.empty()) { | |
134 | s = DestroyUnverifiedSubdir(FLAGS_expected_values_dir); | |
135 | } | |
136 | if (!s.ok()) { | |
137 | fprintf(stderr, "Failed to cleanup unverified state dir: %s\n", | |
138 | s.ToString().c_str()); | |
139 | exit(1); | |
140 | } | |
f67539c2 TL |
141 | } |
142 | } | |
143 | ||
1e59de90 TL |
144 | // This is after the verification step to avoid making all those `Get()`s |
145 | // and `MultiGet()`s contend on the DB-wide trace mutex. | |
146 | if (!FLAGS_expected_values_dir.empty()) { | |
147 | stress->TrackExpectedState(&shared); | |
148 | } | |
149 | ||
150 | now = clock->NowMicros(); | |
f67539c2 | 151 | fprintf(stdout, "%s Starting database operations\n", |
1e59de90 | 152 | clock->TimeToString(now / 1000000).c_str()); |
f67539c2 TL |
153 | |
154 | shared.SetStart(); | |
155 | shared.GetCondVar()->SignalAll(); | |
156 | while (!shared.AllOperated()) { | |
157 | shared.GetCondVar()->Wait(); | |
158 | } | |
159 | ||
1e59de90 | 160 | now = clock->NowMicros(); |
f67539c2 TL |
161 | if (FLAGS_test_batches_snapshots) { |
162 | fprintf(stdout, "%s Limited verification already done during gets\n", | |
1e59de90 | 163 | clock->TimeToString((uint64_t)now / 1000000).c_str()); |
20effc67 TL |
164 | } else if (FLAGS_skip_verifydb) { |
165 | fprintf(stdout, "%s Verification skipped\n", | |
1e59de90 | 166 | clock->TimeToString((uint64_t)now / 1000000).c_str()); |
f67539c2 TL |
167 | } else { |
168 | fprintf(stdout, "%s Starting verification\n", | |
1e59de90 | 169 | clock->TimeToString((uint64_t)now / 1000000).c_str()); |
f67539c2 TL |
170 | } |
171 | ||
172 | shared.SetStartVerify(); | |
173 | shared.GetCondVar()->SignalAll(); | |
174 | while (!shared.AllDone()) { | |
175 | shared.GetCondVar()->Wait(); | |
176 | } | |
177 | } | |
178 | ||
179 | for (unsigned int i = 1; i < n; i++) { | |
180 | threads[0]->stats.Merge(threads[i]->stats); | |
181 | } | |
182 | threads[0]->stats.Report("Stress Test"); | |
183 | ||
184 | for (unsigned int i = 0; i < n; i++) { | |
185 | delete threads[i]; | |
186 | threads[i] = nullptr; | |
187 | } | |
1e59de90 | 188 | now = clock->NowMicros(); |
20effc67 TL |
189 | if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots && |
190 | !shared.HasVerificationFailedYet()) { | |
f67539c2 | 191 | fprintf(stdout, "%s Verification successful\n", |
1e59de90 | 192 | clock->TimeToString(now / 1000000).c_str()); |
f67539c2 TL |
193 | } |
194 | stress->PrintStatistics(); | |
195 | ||
196 | if (FLAGS_compaction_thread_pool_adjust_interval > 0 || | |
197 | FLAGS_continuous_verification_interval > 0) { | |
198 | MutexLock l(shared.GetMutex()); | |
199 | shared.SetShouldStopBgThread(); | |
200 | while (!shared.BgThreadsFinished()) { | |
201 | shared.GetCondVar()->Wait(); | |
202 | } | |
203 | } | |
204 | ||
f67539c2 TL |
205 | if (shared.HasVerificationFailedYet()) { |
206 | fprintf(stderr, "Verification failed :(\n"); | |
207 | return false; | |
208 | } | |
209 | return true; | |
210 | } | |
211 | } // namespace ROCKSDB_NAMESPACE | |
212 | #endif // GFLAGS |