]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/thread_list_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / util / thread_list_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <mutex>
7 #include <condition_variable>
8
9 #include "monitoring/thread_status_updater.h"
10 #include "rocksdb/db.h"
11 #include "util/testharness.h"
12
13 #ifdef ROCKSDB_USING_THREAD_STATUS
14
15 namespace rocksdb {
16
17 class SimulatedBackgroundTask {
18 public:
19 SimulatedBackgroundTask(
20 const void* db_key, const std::string& db_name,
21 const void* cf_key, const std::string& cf_name,
22 const ThreadStatus::OperationType operation_type =
23 ThreadStatus::OP_UNKNOWN,
24 const ThreadStatus::StateType state_type =
25 ThreadStatus::STATE_UNKNOWN)
26 : db_key_(db_key), db_name_(db_name),
27 cf_key_(cf_key), cf_name_(cf_name),
28 operation_type_(operation_type), state_type_(state_type),
29 should_run_(true), running_count_(0) {
30 Env::Default()->GetThreadStatusUpdater()->NewColumnFamilyInfo(
31 db_key_, db_name_, cf_key_, cf_name_);
32 }
33
34 ~SimulatedBackgroundTask() {
35 Env::Default()->GetThreadStatusUpdater()->EraseDatabaseInfo(db_key_);
36 }
37
38 void Run() {
39 std::unique_lock<std::mutex> l(mutex_);
40 running_count_++;
41 Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(cf_key_);
42 Env::Default()->GetThreadStatusUpdater()->SetThreadOperation(
43 operation_type_);
44 Env::Default()->GetThreadStatusUpdater()->SetThreadState(state_type_);
45 while (should_run_) {
46 bg_cv_.wait(l);
47 }
48 Env::Default()->GetThreadStatusUpdater()->ClearThreadState();
49 Env::Default()->GetThreadStatusUpdater()->ClearThreadOperation();
50 Env::Default()->GetThreadStatusUpdater()->SetColumnFamilyInfoKey(nullptr);
51 running_count_--;
52 bg_cv_.notify_all();
53 }
54
55 void FinishAllTasks() {
56 std::unique_lock<std::mutex> l(mutex_);
57 should_run_ = false;
58 bg_cv_.notify_all();
59 }
60
61 void WaitUntilScheduled(int job_count, Env* env) {
62 while (running_count_ < job_count) {
63 env->SleepForMicroseconds(1000);
64 }
65 }
66
67 void WaitUntilDone() {
68 std::unique_lock<std::mutex> l(mutex_);
69 while (running_count_ > 0) {
70 bg_cv_.wait(l);
71 }
72 }
73
74 static void DoSimulatedTask(void* arg) {
75 reinterpret_cast<SimulatedBackgroundTask*>(arg)->Run();
76 }
77
78 private:
79 const void* db_key_;
80 const std::string db_name_;
81 const void* cf_key_;
82 const std::string cf_name_;
83 const ThreadStatus::OperationType operation_type_;
84 const ThreadStatus::StateType state_type_;
85 std::mutex mutex_;
86 std::condition_variable bg_cv_;
87 bool should_run_;
88 std::atomic<int> running_count_;
89 };
90
91 class ThreadListTest : public testing::Test {
92 public:
93 ThreadListTest() {
94 }
95 };
96
97 TEST_F(ThreadListTest, GlobalTables) {
98 // verify the global tables for operations and states are properly indexed.
99 for (int type = 0; type != ThreadStatus::NUM_OP_TYPES; ++type) {
100 ASSERT_EQ(global_operation_table[type].type, type);
101 ASSERT_EQ(global_operation_table[type].name,
102 ThreadStatus::GetOperationName(
103 ThreadStatus::OperationType(type)));
104 }
105
106 for (int type = 0; type != ThreadStatus::NUM_STATE_TYPES; ++type) {
107 ASSERT_EQ(global_state_table[type].type, type);
108 ASSERT_EQ(global_state_table[type].name,
109 ThreadStatus::GetStateName(
110 ThreadStatus::StateType(type)));
111 }
112
113 for (int stage = 0; stage != ThreadStatus::NUM_OP_STAGES; ++stage) {
114 ASSERT_EQ(global_op_stage_table[stage].stage, stage);
115 ASSERT_EQ(global_op_stage_table[stage].name,
116 ThreadStatus::GetOperationStageName(
117 ThreadStatus::OperationStage(stage)));
118 }
119 }
120
121 TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) {
122 Env* env = Env::Default();
123 const int kHighPriorityThreads = 3;
124 const int kLowPriorityThreads = 5;
125 const int kSimulatedHighPriThreads = kHighPriorityThreads - 1;
126 const int kSimulatedLowPriThreads = kLowPriorityThreads / 3;
127 env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH);
128 env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW);
129
130 SimulatedBackgroundTask running_task(
131 reinterpret_cast<void*>(1234), "running",
132 reinterpret_cast<void*>(5678), "pikachu");
133
134 for (int test = 0; test < kSimulatedHighPriThreads; ++test) {
135 env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
136 &running_task, Env::Priority::HIGH);
137 }
138 for (int test = 0; test < kSimulatedLowPriThreads; ++test) {
139 env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
140 &running_task, Env::Priority::LOW);
141 }
142 running_task.WaitUntilScheduled(
143 kSimulatedHighPriThreads + kSimulatedLowPriThreads, env);
144
145 std::vector<ThreadStatus> thread_list;
146
147 // Verify the number of running threads in each pool.
148 env->GetThreadList(&thread_list);
149 int running_count[ThreadStatus::NUM_THREAD_TYPES] = {0};
150 for (auto thread_status : thread_list) {
151 if (thread_status.cf_name == "pikachu" &&
152 thread_status.db_name == "running") {
153 running_count[thread_status.thread_type]++;
154 }
155 }
156 ASSERT_EQ(
157 running_count[ThreadStatus::HIGH_PRIORITY],
158 kSimulatedHighPriThreads);
159 ASSERT_EQ(
160 running_count[ThreadStatus::LOW_PRIORITY],
161 kSimulatedLowPriThreads);
162 ASSERT_EQ(
163 running_count[ThreadStatus::USER], 0);
164
165 running_task.FinishAllTasks();
166 running_task.WaitUntilDone();
167
168 // Verify none of the threads are running
169 env->GetThreadList(&thread_list);
170
171 for (int i = 0; i < ThreadStatus::NUM_THREAD_TYPES; ++i) {
172 running_count[i] = 0;
173 }
174 for (auto thread_status : thread_list) {
175 if (thread_status.cf_name == "pikachu" &&
176 thread_status.db_name == "running") {
177 running_count[thread_status.thread_type]++;
178 }
179 }
180
181 ASSERT_EQ(
182 running_count[ThreadStatus::HIGH_PRIORITY], 0);
183 ASSERT_EQ(
184 running_count[ThreadStatus::LOW_PRIORITY], 0);
185 ASSERT_EQ(
186 running_count[ThreadStatus::USER], 0);
187 }
188
189 namespace {
190 void UpdateStatusCounts(
191 const std::vector<ThreadStatus>& thread_list,
192 int operation_counts[], int state_counts[]) {
193 for (auto thread_status : thread_list) {
194 operation_counts[thread_status.operation_type]++;
195 state_counts[thread_status.state_type]++;
196 }
197 }
198
199 void VerifyAndResetCounts(
200 const int correct_counts[], int collected_counts[], int size) {
201 for (int i = 0; i < size; ++i) {
202 ASSERT_EQ(collected_counts[i], correct_counts[i]);
203 collected_counts[i] = 0;
204 }
205 }
206
// Moves `amount` from the from_event slot of operation_counts to the
// to_event slot.
void UpdateCount(
    int operation_counts[], int from_event, int to_event, int amount) {
  int& source_slot = operation_counts[from_event];
  int& target_slot = operation_counts[to_event];
  source_slot -= amount;
  target_slot += amount;
}
212 } // namespace
213
214 TEST_F(ThreadListTest, SimpleEventTest) {
215 Env* env = Env::Default();
216
217 // simulated tasks
218 const int kFlushWriteTasks = 3;
219 SimulatedBackgroundTask flush_write_task(
220 reinterpret_cast<void*>(1234), "running",
221 reinterpret_cast<void*>(5678), "pikachu",
222 ThreadStatus::OP_FLUSH);
223
224 const int kCompactionWriteTasks = 4;
225 SimulatedBackgroundTask compaction_write_task(
226 reinterpret_cast<void*>(1234), "running",
227 reinterpret_cast<void*>(5678), "pikachu",
228 ThreadStatus::OP_COMPACTION);
229
230 const int kCompactionReadTasks = 5;
231 SimulatedBackgroundTask compaction_read_task(
232 reinterpret_cast<void*>(1234), "running",
233 reinterpret_cast<void*>(5678), "pikachu",
234 ThreadStatus::OP_COMPACTION);
235
236 const int kCompactionWaitTasks = 6;
237 SimulatedBackgroundTask compaction_wait_task(
238 reinterpret_cast<void*>(1234), "running",
239 reinterpret_cast<void*>(5678), "pikachu",
240 ThreadStatus::OP_COMPACTION);
241
242 // setup right answers
243 int correct_operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
244 correct_operation_counts[ThreadStatus::OP_FLUSH] =
245 kFlushWriteTasks;
246 correct_operation_counts[ThreadStatus::OP_COMPACTION] =
247 kCompactionWriteTasks + kCompactionReadTasks + kCompactionWaitTasks;
248
249 env->SetBackgroundThreads(
250 correct_operation_counts[ThreadStatus::OP_FLUSH], Env::HIGH);
251 env->SetBackgroundThreads(
252 correct_operation_counts[ThreadStatus::OP_COMPACTION], Env::LOW);
253
254 // schedule the simulated tasks
255 for (int t = 0; t < kFlushWriteTasks; ++t) {
256 env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
257 &flush_write_task, Env::Priority::HIGH);
258 }
259 flush_write_task.WaitUntilScheduled(kFlushWriteTasks, env);
260
261 for (int t = 0; t < kCompactionWriteTasks; ++t) {
262 env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
263 &compaction_write_task, Env::Priority::LOW);
264 }
265 compaction_write_task.WaitUntilScheduled(kCompactionWriteTasks, env);
266
267 for (int t = 0; t < kCompactionReadTasks; ++t) {
268 env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
269 &compaction_read_task, Env::Priority::LOW);
270 }
271 compaction_read_task.WaitUntilScheduled(kCompactionReadTasks, env);
272
273 for (int t = 0; t < kCompactionWaitTasks; ++t) {
274 env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask,
275 &compaction_wait_task, Env::Priority::LOW);
276 }
277 compaction_wait_task.WaitUntilScheduled(kCompactionWaitTasks, env);
278
279 // verify the thread-status
280 int operation_counts[ThreadStatus::NUM_OP_TYPES] = {0};
281 int state_counts[ThreadStatus::NUM_STATE_TYPES] = {0};
282
283 std::vector<ThreadStatus> thread_list;
284 env->GetThreadList(&thread_list);
285 UpdateStatusCounts(thread_list, operation_counts, state_counts);
286 VerifyAndResetCounts(correct_operation_counts, operation_counts,
287 ThreadStatus::NUM_OP_TYPES);
288
289 // terminate compaction-wait tasks and see if the thread-status
290 // reflects this update
291 compaction_wait_task.FinishAllTasks();
292 compaction_wait_task.WaitUntilDone();
293 UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
294 ThreadStatus::OP_UNKNOWN, kCompactionWaitTasks);
295
296 env->GetThreadList(&thread_list);
297 UpdateStatusCounts(thread_list, operation_counts, state_counts);
298 VerifyAndResetCounts(correct_operation_counts, operation_counts,
299 ThreadStatus::NUM_OP_TYPES);
300
301 // terminate flush-write tasks and see if the thread-status
302 // reflects this update
303 flush_write_task.FinishAllTasks();
304 flush_write_task.WaitUntilDone();
305 UpdateCount(correct_operation_counts, ThreadStatus::OP_FLUSH,
306 ThreadStatus::OP_UNKNOWN, kFlushWriteTasks);
307
308 env->GetThreadList(&thread_list);
309 UpdateStatusCounts(thread_list, operation_counts, state_counts);
310 VerifyAndResetCounts(correct_operation_counts, operation_counts,
311 ThreadStatus::NUM_OP_TYPES);
312
313 // terminate compaction-write tasks and see if the thread-status
314 // reflects this update
315 compaction_write_task.FinishAllTasks();
316 compaction_write_task.WaitUntilDone();
317 UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
318 ThreadStatus::OP_UNKNOWN, kCompactionWriteTasks);
319
320 env->GetThreadList(&thread_list);
321 UpdateStatusCounts(thread_list, operation_counts, state_counts);
322 VerifyAndResetCounts(correct_operation_counts, operation_counts,
323 ThreadStatus::NUM_OP_TYPES);
324
325 // terminate compaction-write tasks and see if the thread-status
326 // reflects this update
327 compaction_read_task.FinishAllTasks();
328 compaction_read_task.WaitUntilDone();
329 UpdateCount(correct_operation_counts, ThreadStatus::OP_COMPACTION,
330 ThreadStatus::OP_UNKNOWN, kCompactionReadTasks);
331
332 env->GetThreadList(&thread_list);
333 UpdateStatusCounts(thread_list, operation_counts, state_counts);
334 VerifyAndResetCounts(correct_operation_counts, operation_counts,
335 ThreadStatus::NUM_OP_TYPES);
336 }
337
338 } // namespace rocksdb
339
340 int main(int argc, char** argv) {
341 ::testing::InitGoogleTest(&argc, argv);
342 return RUN_ALL_TESTS();
343 }
344
345 #else
346
347 int main(int argc, char** argv) {
348 ::testing::InitGoogleTest(&argc, argv);
349 return 0;
350 }
351
352 #endif // ROCKSDB_USING_THREAD_STATUS