1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #include <condition_variable>
9 #include "monitoring/thread_status_updater.h"
10 #include "rocksdb/db.h"
11 #include "util/testharness.h"
13 #ifdef ROCKSDB_USING_THREAD_STATUS
// Test helper that simulates a background job publishing its status through
// the default Env's ThreadStatusUpdater.
// NOTE(review): this copy of the class is incomplete -- access specifiers,
// closing braces, the header of the method that DoSimulatedTask() invokes as
// Run(), several statements and some member declarations are not visible
// here. Confirm against the upstream file before relying on these comments.
17 class SimulatedBackgroundTask
{
// Stores the database / column-family keys and names plus the operation and
// state to report, starts with should_run_ == true and running_count_ == 0,
// and registers the column family via NewColumnFamilyInfo().
// operation_type defaults to ThreadStatus::OP_UNKNOWN and state_type to
// ThreadStatus::STATE_UNKNOWN.
19 SimulatedBackgroundTask(
20 const void* db_key
, const std::string
& db_name
,
21 const void* cf_key
, const std::string
& cf_name
,
22 const ThreadStatus::OperationType operation_type
=
23 ThreadStatus::OP_UNKNOWN
,
24 const ThreadStatus::StateType state_type
=
25 ThreadStatus::STATE_UNKNOWN
)
26 : db_key_(db_key
), db_name_(db_name
),
27 cf_key_(cf_key
), cf_name_(cf_name
),
28 operation_type_(operation_type
), state_type_(state_type
),
29 should_run_(true), running_count_(0) {
30 Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
31 db_key_
, db_name_
, cf_key_
, cf_name_
);
// Calls EraseDatabaseInfo() for db_key_ on the default Env's updater.
34 ~SimulatedBackgroundTask() {
35 Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_
);
// NOTE(review): the header of the enclosing method is not visible; this is
// presumably the body of Run(), the method called by DoSimulatedTask().
// Under mutex_, it sets the column-family key, the operation and the state
// on the ThreadStatusUpdater ...
39 std::unique_lock
<std::mutex
> l(mutex_
);
41 Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_
);
42 Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
// NOTE(review): the argument of SetThreadOperation() is not visible here.
44 Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_
);
// ... and afterwards clears the state, the operation and the column-family
// key again. NOTE(review): the statements between the "set" and "clear"
// phases are not visible here.
48 Env::Default()->GetThreadStatusUpdater()->ClearThreadState();
49 Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation();
50 Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr);
// Acquires mutex_. NOTE(review): the remaining statements of this method
// are not visible here.
55 void FinishAllTasks() {
56 std::unique_lock
<std::mutex
> l(mutex_
);
// Polls until running_count_ reaches job_count, sleeping 1000 microseconds
// via env between checks. Does not take mutex_.
61 void WaitUntilScheduled(int job_count
, Env
* env
) {
62 while (running_count_
< job_count
) {
63 env
->SleepForMicroseconds(1000);
// Acquires mutex_ and loops while running_count_ is greater than zero.
// NOTE(review): the loop body is not visible here.
67 void WaitUntilDone() {
68 std::unique_lock
<std::mutex
> l(mutex_
);
69 while (running_count_
> 0) {
// Static entry point with a void* argument, as passed to Env::Schedule() by
// the tests in this file: casts arg back to SimulatedBackgroundTask* and
// calls Run() on it.
74 static void DoSimulatedTask(void* arg
) {
75 reinterpret_cast<SimulatedBackgroundTask
*>(arg
)->Run();
// Data members. NOTE(review): the declarations of db_key_, cf_key_, mutex_
// and should_run_ (all used above) are not visible here.
80 const std::string db_name_
;
82 const std::string cf_name_
;
83 const ThreadStatus::OperationType operation_type_
;
84 const ThreadStatus::StateType state_type_
;
// Condition variable; NOTE(review): its wait/notify call sites are not
// visible here.
86 std::condition_variable bg_cv_
;
// Counter read by WaitUntilScheduled() and WaitUntilDone(); atomic, and
// WaitUntilScheduled() reads it without holding mutex_.
88 std::atomic
<int> running_count_
;
// gtest fixture (derives from testing::Test) used by the TEST_F cases in
// this file.
// NOTE(review): the fixture body is not visible in this copy.
91 class ThreadListTest
: public testing::Test
{
97 TEST_F(ThreadListTest
, GlobalTables
) {
98 // verify the global tables for operations and states are properly indexed.
99 for (int type
= 0; type
!= ThreadStatus::NUM_OP_TYPES
; ++type
) {
100 ASSERT_EQ(global_operation_table
[type
].type
, type
);
101 ASSERT_EQ(global_operation_table
[type
].name
,
102 ThreadStatus::GetOperationName(
103 ThreadStatus::OperationType(type
)));
106 for (int type
= 0; type
!= ThreadStatus::NUM_STATE_TYPES
; ++type
) {
107 ASSERT_EQ(global_state_table
[type
].type
, type
);
108 ASSERT_EQ(global_state_table
[type
].name
,
109 ThreadStatus::GetStateName(
110 ThreadStatus::StateType(type
)));
113 for (int stage
= 0; stage
!= ThreadStatus::NUM_OP_STAGES
; ++stage
) {
114 ASSERT_EQ(global_op_stage_table
[stage
].stage
, stage
);
115 ASSERT_EQ(global_op_stage_table
[stage
].name
,
116 ThreadStatus::GetOperationStageName(
117 ThreadStatus::OperationStage(stage
)));
121 TEST_F(ThreadListTest
, SimpleColumnFamilyInfoTest
) {
122 Env
* env
= Env::Default();
123 const int kHighPriorityThreads
= 3;
124 const int kLowPriorityThreads
= 5;
125 const int kSimulatedHighPriThreads
= kHighPriorityThreads
- 1;
126 const int kSimulatedLowPriThreads
= kLowPriorityThreads
/ 3;
127 env
->SetBackgroundThreads(kHighPriorityThreads
, Env::HIGH
);
128 env
->SetBackgroundThreads(kLowPriorityThreads
, Env::LOW
);
130 SimulatedBackgroundTask
running_task(
131 reinterpret_cast<void*>(1234), "running",
132 reinterpret_cast<void*>(5678), "pikachu");
134 for (int test
= 0; test
< kSimulatedHighPriThreads
; ++test
) {
135 env
->Schedule(&SimulatedBackgroundTask::DoSimulatedTask
,
136 &running_task
, Env::Priority::HIGH
);
138 for (int test
= 0; test
< kSimulatedLowPriThreads
; ++test
) {
139 env
->Schedule(&SimulatedBackgroundTask::DoSimulatedTask
,
140 &running_task
, Env::Priority::LOW
);
142 running_task
.WaitUntilScheduled(
143 kSimulatedHighPriThreads
+ kSimulatedLowPriThreads
, env
);
145 std::vector
<ThreadStatus
> thread_list
;
147 // Verify the number of running threads in each pool.
148 env
->GetThreadList(&thread_list
);
149 int running_count
[ThreadStatus::NUM_THREAD_TYPES
] = {0};
150 for (auto thread_status
: thread_list
) {
151 if (thread_status
.cf_name
== "pikachu" &&
152 thread_status
.db_name
== "running") {
153 running_count
[thread_status
.thread_type
]++;
157 running_count
[ThreadStatus::HIGH_PRIORITY
],
158 kSimulatedHighPriThreads
);
160 running_count
[ThreadStatus::LOW_PRIORITY
],
161 kSimulatedLowPriThreads
);
163 running_count
[ThreadStatus::USER
], 0);
165 running_task
.FinishAllTasks();
166 running_task
.WaitUntilDone();
168 // Verify none of the threads are running
169 env
->GetThreadList(&thread_list
);
171 for (int i
= 0; i
< ThreadStatus::NUM_THREAD_TYPES
; ++i
) {
172 running_count
[i
] = 0;
174 for (auto thread_status
: thread_list
) {
175 if (thread_status
.cf_name
== "pikachu" &&
176 thread_status
.db_name
== "running") {
177 running_count
[thread_status
.thread_type
]++;
182 running_count
[ThreadStatus::HIGH_PRIORITY
], 0);
184 running_count
[ThreadStatus::LOW_PRIORITY
], 0);
186 running_count
[ThreadStatus::USER
], 0);
190 void UpdateStatusCounts(
191 const std::vector
<ThreadStatus
>& thread_list
,
192 int operation_counts
[], int state_counts
[]) {
193 for (auto thread_status
: thread_list
) {
194 operation_counts
[thread_status
.operation_type
]++;
195 state_counts
[thread_status
.state_type
]++;
199 void VerifyAndResetCounts(
200 const int correct_counts
[], int collected_counts
[], int size
) {
201 for (int i
= 0; i
< size
; ++i
) {
202 ASSERT_EQ(collected_counts
[i
], correct_counts
[i
]);
203 collected_counts
[i
] = 0;
// Moves `amount` from operation_counts[from_event] to
// operation_counts[to_event]. The indices are not bounds-checked; when both
// indices are equal the array is left unchanged.
void UpdateCount(
    int operation_counts[], int from_event, int to_event, int amount) {
  operation_counts[from_event] -= amount;
  operation_counts[to_event] += amount;
}
214 TEST_F(ThreadListTest
, SimpleEventTest
) {
215 Env
* env
= Env::Default();
218 const int kFlushWriteTasks
= 3;
219 SimulatedBackgroundTask
flush_write_task(
220 reinterpret_cast<void*>(1234), "running",
221 reinterpret_cast<void*>(5678), "pikachu",
222 ThreadStatus::OP_FLUSH
);
224 const int kCompactionWriteTasks
= 4;
225 SimulatedBackgroundTask
compaction_write_task(
226 reinterpret_cast<void*>(1234), "running",
227 reinterpret_cast<void*>(5678), "pikachu",
228 ThreadStatus::OP_COMPACTION
);
230 const int kCompactionReadTasks
= 5;
231 SimulatedBackgroundTask
compaction_read_task(
232 reinterpret_cast<void*>(1234), "running",
233 reinterpret_cast<void*>(5678), "pikachu",
234 ThreadStatus::OP_COMPACTION
);
236 const int kCompactionWaitTasks
= 6;
237 SimulatedBackgroundTask
compaction_wait_task(
238 reinterpret_cast<void*>(1234), "running",
239 reinterpret_cast<void*>(5678), "pikachu",
240 ThreadStatus::OP_COMPACTION
);
242 // setup right answers
243 int correct_operation_counts
[ThreadStatus::NUM_OP_TYPES
] = {0};
244 correct_operation_counts
[ThreadStatus::OP_FLUSH
] =
246 correct_operation_counts
[ThreadStatus::OP_COMPACTION
] =
247 kCompactionWriteTasks
+ kCompactionReadTasks
+ kCompactionWaitTasks
;
249 env
->SetBackgroundThreads(
250 correct_operation_counts
[ThreadStatus::OP_FLUSH
], Env::HIGH
);
251 env
->SetBackgroundThreads(
252 correct_operation_counts
[ThreadStatus::OP_COMPACTION
], Env::LOW
);
254 // schedule the simulated tasks
255 for (int t
= 0; t
< kFlushWriteTasks
; ++t
) {
256 env
->Schedule(&SimulatedBackgroundTask::DoSimulatedTask
,
257 &flush_write_task
, Env::Priority::HIGH
);
259 flush_write_task
.WaitUntilScheduled(kFlushWriteTasks
, env
);
261 for (int t
= 0; t
< kCompactionWriteTasks
; ++t
) {
262 env
->Schedule(&SimulatedBackgroundTask::DoSimulatedTask
,
263 &compaction_write_task
, Env::Priority::LOW
);
265 compaction_write_task
.WaitUntilScheduled(kCompactionWriteTasks
, env
);
267 for (int t
= 0; t
< kCompactionReadTasks
; ++t
) {
268 env
->Schedule(&SimulatedBackgroundTask::DoSimulatedTask
,
269 &compaction_read_task
, Env::Priority::LOW
);
271 compaction_read_task
.WaitUntilScheduled(kCompactionReadTasks
, env
);
273 for (int t
= 0; t
< kCompactionWaitTasks
; ++t
) {
274 env
->Schedule(&SimulatedBackgroundTask::DoSimulatedTask
,
275 &compaction_wait_task
, Env::Priority::LOW
);
277 compaction_wait_task
.WaitUntilScheduled(kCompactionWaitTasks
, env
);
279 // verify the thread-status
280 int operation_counts
[ThreadStatus::NUM_OP_TYPES
] = {0};
281 int state_counts
[ThreadStatus::NUM_STATE_TYPES
] = {0};
283 std::vector
<ThreadStatus
> thread_list
;
284 env
->GetThreadList(&thread_list
);
285 UpdateStatusCounts(thread_list
, operation_counts
, state_counts
);
286 VerifyAndResetCounts(correct_operation_counts
, operation_counts
,
287 ThreadStatus::NUM_OP_TYPES
);
289 // terminate compaction-wait tasks and see if the thread-status
290 // reflects this update
291 compaction_wait_task
.FinishAllTasks();
292 compaction_wait_task
.WaitUntilDone();
293 UpdateCount(correct_operation_counts
, ThreadStatus::OP_COMPACTION
,
294 ThreadStatus::OP_UNKNOWN
, kCompactionWaitTasks
);
296 env
->GetThreadList(&thread_list
);
297 UpdateStatusCounts(thread_list
, operation_counts
, state_counts
);
298 VerifyAndResetCounts(correct_operation_counts
, operation_counts
,
299 ThreadStatus::NUM_OP_TYPES
);
301 // terminate flush-write tasks and see if the thread-status
302 // reflects this update
303 flush_write_task
.FinishAllTasks();
304 flush_write_task
.WaitUntilDone();
305 UpdateCount(correct_operation_counts
, ThreadStatus::OP_FLUSH
,
306 ThreadStatus::OP_UNKNOWN
, kFlushWriteTasks
);
308 env
->GetThreadList(&thread_list
);
309 UpdateStatusCounts(thread_list
, operation_counts
, state_counts
);
310 VerifyAndResetCounts(correct_operation_counts
, operation_counts
,
311 ThreadStatus::NUM_OP_TYPES
);
313 // terminate compaction-write tasks and see if the thread-status
314 // reflects this update
315 compaction_write_task
.FinishAllTasks();
316 compaction_write_task
.WaitUntilDone();
317 UpdateCount(correct_operation_counts
, ThreadStatus::OP_COMPACTION
,
318 ThreadStatus::OP_UNKNOWN
, kCompactionWriteTasks
);
320 env
->GetThreadList(&thread_list
);
321 UpdateStatusCounts(thread_list
, operation_counts
, state_counts
);
322 VerifyAndResetCounts(correct_operation_counts
, operation_counts
,
323 ThreadStatus::NUM_OP_TYPES
);
325 // terminate compaction-write tasks and see if the thread-status
326 // reflects this update
327 compaction_read_task
.FinishAllTasks();
328 compaction_read_task
.WaitUntilDone();
329 UpdateCount(correct_operation_counts
, ThreadStatus::OP_COMPACTION
,
330 ThreadStatus::OP_UNKNOWN
, kCompactionReadTasks
);
332 env
->GetThreadList(&thread_list
);
333 UpdateStatusCounts(thread_list
, operation_counts
, state_counts
);
334 VerifyAndResetCounts(correct_operation_counts
, operation_counts
,
335 ThreadStatus::NUM_OP_TYPES
);
338 } // namespace rocksdb
340 int main(int argc
, char** argv
) {
341 ::testing::InitGoogleTest(&argc
, argv
);
342 return RUN_ALL_TESTS();
// Second definition of main(): initializes GoogleTest from the command-line
// arguments.
// NOTE(review): the return statement and closing brace of this function, and
// the preprocessor directive that must separate it from the main() above
// (presumably the alternative branch of the ROCKSDB_USING_THREAD_STATUS
// conditional), are not visible in this copy -- confirm against the upstream
// file.
347 int main(int argc
, char** argv
) {
348 ::testing::InitGoogleTest(&argc
, argv
);
352 #endif // ROCKSDB_USING_THREAD_STATUS