]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #include <mutex> | |
7 | #include <condition_variable> | |
8 | ||
9 | #include "monitoring/thread_status_updater.h" | |
10 | #include "rocksdb/db.h" | |
11 | #include "util/testharness.h" | |
12 | ||
13 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
14 | ||
15 | namespace rocksdb { | |
16 | ||
17 | class SimulatedBackgroundTask { | |
18 | public: | |
19 | SimulatedBackgroundTask( | |
20 | const void* db_key, const std::string& db_name, | |
21 | const void* cf_key, const std::string& cf_name, | |
22 | const ThreadStatus::OperationType operation_type = | |
23 | ThreadStatus::OP_UNKNOWN, | |
24 | const ThreadStatus::StateType state_type = | |
25 | ThreadStatus::STATE_UNKNOWN) | |
26 | : db_key_(db_key), db_name_(db_name), | |
27 | cf_key_(cf_key), cf_name_(cf_name), | |
28 | operation_type_(operation_type), state_type_(state_type), | |
29 | should_run_(true), running_count_(0) { | |
30 | Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo( | |
31 | db_key_, db_name_, cf_key_, cf_name_); | |
32 | } | |
33 | ||
34 | ~SimulatedBackgroundTask() { | |
35 | Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_); | |
36 | } | |
37 | ||
38 | void Run() { | |
39 | std::unique_lock<std::mutex> l(mutex_); | |
40 | running_count_++; | |
41 | Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_); | |
42 | Env::Default()->GetThreadStatusUpdater()->SetThreadOperation( | |
43 | operation_type_); | |
44 | Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_); | |
45 | while (should_run_) { | |
46 | bg_cv_.wait(l); | |
47 | } | |
48 | Env::Default()->GetThreadStatusUpdater()->ClearThreadState(); | |
49 | Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation(); | |
11fdf7f2 | 50 | Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr); |
7c673cae FG |
51 | running_count_--; |
52 | bg_cv_.notify_all(); | |
53 | } | |
54 | ||
55 | void FinishAllTasks() { | |
56 | std::unique_lock<std::mutex> l(mutex_); | |
57 | should_run_ = false; | |
58 | bg_cv_.notify_all(); | |
59 | } | |
60 | ||
61 | void WaitUntilScheduled(int job_count, Env* env) { | |
62 | while (running_count_ < job_count) { | |
63 | env->SleepForMicroseconds(1000); | |
64 | } | |
65 | } | |
66 | ||
67 | void WaitUntilDone() { | |
68 | std::unique_lock<std::mutex> l(mutex_); | |
69 | while (running_count_ > 0) { | |
70 | bg_cv_.wait(l); | |
71 | } | |
72 | } | |
73 | ||
74 | static void DoSimulatedTask(void* arg) { | |
75 | reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run(); | |
76 | } | |
77 | ||
78 | private: | |
79 | const void* db_key_; | |
80 | const std::string db_name_; | |
81 | const void* cf_key_; | |
82 | const std::string cf_name_; | |
83 | const ThreadStatus::OperationType operation_type_; | |
84 | const ThreadStatus::StateType state_type_; | |
85 | std::mutex mutex_; | |
86 | std::condition_variable bg_cv_; | |
87 | bool should_run_; | |
88 | std::atomic<int> running_count_; | |
89 | }; | |
90 | ||
91 | class ThreadListTest : public testing::Test { | |
92 | public: | |
93 | ThreadListTest() { | |
94 | } | |
95 | }; | |
96 | ||
97 | TEST_F(ThreadListTest, GlobalTables) { | |
98 | // verify the global tables for operations and states are properly indexed. | |
99 | for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) { | |
100 | ASSERT_EQ(global_operation_table[type].type, type); | |
101 | ASSERT_EQ(global_operation_table[type].name, | |
102 | ThreadStatus::GetOperationName( | |
103 | ThreadStatus::OperationType(type))); | |
104 | } | |
105 | ||
106 | for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) { | |
107 | ASSERT_EQ(global_state_table[type].type, type); | |
108 | ASSERT_EQ(global_state_table[type].name, | |
109 | ThreadStatus::GetStateName( | |
110 | ThreadStatus::StateType(type))); | |
111 | } | |
112 | ||
113 | for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) { | |
114 | ASSERT_EQ(global_op_stage_table[stage].stage, stage); | |
115 | ASSERT_EQ(global_op_stage_table[stage].name, | |
116 | ThreadStatus::GetOperationStageName( | |
117 | ThreadStatus::OperationStage(stage))); | |
118 | } | |
119 | } | |
120 | ||
121 | TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { | |
122 | Env* env = Env::Default(); | |
123 | const int kHighPriorityThreads = 3; | |
124 | const int kLowPriorityThreads = 5; | |
125 | const int kSimulatedHighPriThreads = kHighPriorityThreads - 1; | |
126 | const int kSimulatedLowPriThreads = kLowPriorityThreads / 3; | |
127 | env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH); | |
128 | env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW); | |
129 | ||
130 | SimulatedBackgroundTask running_task( | |
131 | reinterpret_cast<void*>(1234), "running", | |
132 | reinterpret_cast<void*>(5678), "pikachu"); | |
133 | ||
134 | for (int test = 0; test < kSimulatedHighPriThreads; ++test) { | |
135 | env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, | |
136 | &running_task, Env::Priority::HIGH); | |
137 | } | |
138 | for (int test = 0; test < kSimulatedLowPriThreads; ++test) { | |
139 | env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, | |
140 | &running_task, Env::Priority::LOW); | |
141 | } | |
142 | running_task.WaitUntilScheduled( | |
143 | kSimulatedHighPriThreads + kSimulatedLowPriThreads, env); | |
144 | ||
145 | std::vector<ThreadStatus> thread_list; | |
146 | ||
147 | // Verify the number of running threads in each pool. | |
148 | env->GetThreadList(&thread_list); | |
149 | int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0}; | |
150 | for (auto thread_status : thread_list) { | |
151 | if (thread_status.cf_name == "pikachu" && | |
152 | thread_status.db_name == "running") { | |
153 | running_count[thread_status.thread_type]++; | |
154 | } | |
155 | } | |
156 | ASSERT_EQ( | |
157 | running_count[ThreadStatus::HIGH_PRIORITY], | |
158 | kSimulatedHighPriThreads); | |
159 | ASSERT_EQ( | |
160 | running_count[ThreadStatus::LOW_PRIORITY], | |
161 | kSimulatedLowPriThreads); | |
162 | ASSERT_EQ( | |
163 | running_count[ThreadStatus::USER], 0); | |
164 | ||
165 | running_task.FinishAllTasks(); | |
166 | running_task.WaitUntilDone(); | |
167 | ||
168 | // Verify none of the threads are running | |
169 | env->GetThreadList(&thread_list); | |
170 | ||
171 | for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) { | |
172 | running_count[i] = 0; | |
173 | } | |
174 | for (auto thread_status : thread_list) { | |
175 | if (thread_status.cf_name == "pikachu" && | |
176 | thread_status.db_name == "running") { | |
177 | running_count[thread_status.thread_type]++; | |
178 | } | |
179 | } | |
180 | ||
181 | ASSERT_EQ( | |
182 | running_count[ThreadStatus::HIGH_PRIORITY], 0); | |
183 | ASSERT_EQ( | |
184 | running_count[ThreadStatus::LOW_PRIORITY], 0); | |
185 | ASSERT_EQ( | |
186 | running_count[ThreadStatus::USER], 0); | |
187 | } | |
188 | ||
189 | namespace { | |
190 | void UpdateStatusCounts( | |
191 | const std::vector<ThreadStatus>& thread_list, | |
192 | int operation_counts[], int state_counts[]) { | |
193 | for (auto thread_status : thread_list) { | |
194 | operation_counts[thread_status.operation_type]++; | |
195 | state_counts[thread_status.state_type]++; | |
196 | } | |
197 | } | |
198 | ||
199 | void VerifyAndResetCounts( | |
200 | const int correct_counts[], int collected_counts[], int size) { | |
201 | for (int i = 0; i < size; ++i) { | |
202 | ASSERT_EQ(collected_counts[i], correct_counts[i]); | |
203 | collected_counts[i] = 0; | |
204 | } | |
205 | } | |
206 | ||
// Moves `amount` from the from_event slot of operation_counts to its
// to_event slot. When both indices are equal the array is left unchanged.
void UpdateCount(
    int operation_counts[], int from_event, int to_event, int amount) {
  int& source_slot = operation_counts[from_event];
  int& target_slot = operation_counts[to_event];
  source_slot = source_slot - amount;
  target_slot = target_slot + amount;
}
212 | } // namespace | |
213 | ||
214 | TEST_F(ThreadListTest, SimpleEventTest) { | |
215 | Env* env = Env::Default(); | |
216 | ||
217 | // simulated tasks | |
218 | const int kFlushWriteTasks = 3; | |
219 | SimulatedBackgroundTask flush_write_task( | |
220 | reinterpret_cast<void*>(1234), "running", | |
221 | reinterpret_cast<void*>(5678), "pikachu", | |
222 | ThreadStatus::OP_FLUSH); | |
223 | ||
224 | const int kCompactionWriteTasks = 4; | |
225 | SimulatedBackgroundTask compaction_write_task( | |
226 | reinterpret_cast<void*>(1234), "running", | |
227 | reinterpret_cast<void*>(5678), "pikachu", | |
228 | ThreadStatus::OP_COMPACTION); | |
229 | ||
230 | const int kCompactionReadTasks = 5; | |
231 | SimulatedBackgroundTask compaction_read_task( | |
232 | reinterpret_cast<void*>(1234), "running", | |
233 | reinterpret_cast<void*>(5678), "pikachu", | |
234 | ThreadStatus::OP_COMPACTION); | |
235 | ||
236 | const int kCompactionWaitTasks = 6; | |
237 | SimulatedBackgroundTask compaction_wait_task( | |
238 | reinterpret_cast<void*>(1234), "running", | |
239 | reinterpret_cast<void*>(5678), "pikachu", | |
240 | ThreadStatus::OP_COMPACTION); | |
241 | ||
242 | // setup right answers | |
243 | int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0}; | |
244 | correct_operation_counts[ThreadStatus::OP_FLUSH] = | |
245 | kFlushWriteTasks; | |
246 | correct_operation_counts[ThreadStatus::OP_COMPACTION] = | |
247 | kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks; | |
248 | ||
249 | env->SetBackgroundThreads( | |
250 | correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH); | |
251 | env->SetBackgroundThreads( | |
252 | correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW); | |
253 | ||
254 | // schedule the simulated tasks | |
255 | for (int t = 0; t < kFlushWriteTasks; ++t) { | |
256 | env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, | |
257 | &flush_write_task, Env::Priority::HIGH); | |
258 | } | |
259 | flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env); | |
260 | ||
261 | for (int t = 0; t < kCompactionWriteTasks; ++t) { | |
262 | env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, | |
263 | &compaction_write_task, Env::Priority::LOW); | |
264 | } | |
265 | compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env); | |
266 | ||
267 | for (int t = 0; t < kCompactionReadTasks; ++t) { | |
268 | env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, | |
269 | &compaction_read_task, Env::Priority::LOW); | |
270 | } | |
271 | compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env); | |
272 | ||
273 | for (int t = 0; t < kCompactionWaitTasks; ++t) { | |
274 | env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, | |
275 | &compaction_wait_task, Env::Priority::LOW); | |
276 | } | |
277 | compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env); | |
278 | ||
279 | // verify the thread-status | |
280 | int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0}; | |
281 | int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0}; | |
282 | ||
283 | std::vector<ThreadStatus> thread_list; | |
284 | env->GetThreadList(&thread_list); | |
285 | UpdateStatusCounts(thread_list, operation_counts, state_counts); | |
286 | VerifyAndResetCounts(correct_operation_counts, operation_counts, | |
287 | ThreadStatus::NUM_OP_TYPES); | |
288 | ||
289 | // terminate compaction-wait tasks and see if the thread-status | |
290 | // reflects this update | |
291 | compaction_wait_task.FinishAllTasks(); | |
292 | compaction_wait_task.WaitUntilDone(); | |
293 | UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, | |
294 | ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks); | |
295 | ||
296 | env->GetThreadList(&thread_list); | |
297 | UpdateStatusCounts(thread_list, operation_counts, state_counts); | |
298 | VerifyAndResetCounts(correct_operation_counts, operation_counts, | |
299 | ThreadStatus::NUM_OP_TYPES); | |
300 | ||
301 | // terminate flush-write tasks and see if the thread-status | |
302 | // reflects this update | |
303 | flush_write_task.FinishAllTasks(); | |
304 | flush_write_task.WaitUntilDone(); | |
305 | UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH, | |
306 | ThreadStatus::OP_UNKNOWN, kFlushWriteTasks); | |
307 | ||
308 | env->GetThreadList(&thread_list); | |
309 | UpdateStatusCounts(thread_list, operation_counts, state_counts); | |
310 | VerifyAndResetCounts(correct_operation_counts, operation_counts, | |
311 | ThreadStatus::NUM_OP_TYPES); | |
312 | ||
313 | // terminate compaction-write tasks and see if the thread-status | |
314 | // reflects this update | |
315 | compaction_write_task.FinishAllTasks(); | |
316 | compaction_write_task.WaitUntilDone(); | |
317 | UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, | |
318 | ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks); | |
319 | ||
320 | env->GetThreadList(&thread_list); | |
321 | UpdateStatusCounts(thread_list, operation_counts, state_counts); | |
322 | VerifyAndResetCounts(correct_operation_counts, operation_counts, | |
323 | ThreadStatus::NUM_OP_TYPES); | |
324 | ||
325 | // terminate compaction-write tasks and see if the thread-status | |
326 | // reflects this update | |
327 | compaction_read_task.FinishAllTasks(); | |
328 | compaction_read_task.WaitUntilDone(); | |
329 | UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION, | |
330 | ThreadStatus::OP_UNKNOWN, kCompactionReadTasks); | |
331 | ||
332 | env->GetThreadList(&thread_list); | |
333 | UpdateStatusCounts(thread_list, operation_counts, state_counts); | |
334 | VerifyAndResetCounts(correct_operation_counts, operation_counts, | |
335 | ThreadStatus::NUM_OP_TYPES); | |
336 | } | |
337 | ||
338 | } // namespace rocksdb | |
339 | ||
340 | int main(int argc, char** argv) { | |
341 | ::testing::InitGoogleTest(&argc, argv); | |
342 | return RUN_ALL_TESTS(); | |
343 | } | |
344 | ||
345 | #else | |
346 | ||
347 | int main(int argc, char** argv) { | |
348 | ::testing::InitGoogleTest(&argc, argv); | |
349 | return 0; | |
350 | } | |
351 | ||
352 | #endif // ROCKSDB_USING_THREAD_STATUS |