1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "monitoring/thread_status_updater.h"
10 #include "port/likely.h"
11 #include "rocksdb/env.h"
12 #include "rocksdb/system_clock.h"
13 #include "util/mutexlock.h"
15 namespace ROCKSDB_NAMESPACE
{
17 #ifdef ROCKSDB_USING_THREAD_STATUS
19 thread_local ThreadStatusData
* ThreadStatusUpdater::thread_status_data_
=
22 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype
,
24 if (UNLIKELY(thread_status_data_
== nullptr)) {
25 thread_status_data_
= new ThreadStatusData();
26 thread_status_data_
->thread_type
= ttype
;
27 thread_status_data_
->thread_id
= thread_id
;
28 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
29 thread_data_set_
.insert(thread_status_data_
);
32 ClearThreadOperationProperties();
35 void ThreadStatusUpdater::UnregisterThread() {
36 if (thread_status_data_
!= nullptr) {
37 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
38 thread_data_set_
.erase(thread_status_data_
);
39 delete thread_status_data_
;
40 thread_status_data_
= nullptr;
44 void ThreadStatusUpdater::ResetThreadStatus() {
46 ClearThreadOperation();
47 SetColumnFamilyInfoKey(nullptr);
50 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key
) {
52 if (data
== nullptr) {
55 // set the tracking flag based on whether cf_key is non-null or not.
56 // If enable_thread_tracking is set to false, the input cf_key
58 data
->enable_tracking
= (cf_key
!= nullptr);
59 data
->cf_key
.store(const_cast<void*>(cf_key
), std::memory_order_relaxed
);
62 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
63 auto* data
= GetLocalThreadStatus();
64 if (data
== nullptr) {
67 return data
->cf_key
.load(std::memory_order_relaxed
);
70 void ThreadStatusUpdater::SetThreadOperation(
71 const ThreadStatus::OperationType type
) {
72 auto* data
= GetLocalThreadStatus();
73 if (data
== nullptr) {
76 // NOTE: Our practice here is to set all the thread operation properties
77 // and stage before we set thread operation, and thread operation
78 // will be set in std::memory_order_release. This is to ensure
79 // whenever a thread operation is not OP_UNKNOWN, we will always
80 // have a consistent information on its properties.
81 data
->operation_type
.store(type
, std::memory_order_release
);
82 if (type
== ThreadStatus::OP_UNKNOWN
) {
83 data
->operation_stage
.store(ThreadStatus::STAGE_UNKNOWN
,
84 std::memory_order_relaxed
);
85 ClearThreadOperationProperties();
89 void ThreadStatusUpdater::SetThreadOperationProperty(int i
, uint64_t value
) {
90 auto* data
= GetLocalThreadStatus();
91 if (data
== nullptr) {
94 data
->op_properties
[i
].store(value
, std::memory_order_relaxed
);
97 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i
,
99 auto* data
= GetLocalThreadStatus();
100 if (data
== nullptr) {
103 data
->op_properties
[i
].fetch_add(delta
, std::memory_order_relaxed
);
106 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time
) {
107 auto* data
= GetLocalThreadStatus();
108 if (data
== nullptr) {
111 data
->op_start_time
.store(start_time
, std::memory_order_relaxed
);
114 void ThreadStatusUpdater::ClearThreadOperation() {
115 auto* data
= GetLocalThreadStatus();
116 if (data
== nullptr) {
119 data
->operation_stage
.store(ThreadStatus::STAGE_UNKNOWN
,
120 std::memory_order_relaxed
);
121 data
->operation_type
.store(ThreadStatus::OP_UNKNOWN
,
122 std::memory_order_relaxed
);
123 ClearThreadOperationProperties();
126 void ThreadStatusUpdater::ClearThreadOperationProperties() {
127 auto* data
= GetLocalThreadStatus();
128 if (data
== nullptr) {
131 for (int i
= 0; i
< ThreadStatus::kNumOperationProperties
; ++i
) {
132 data
->op_properties
[i
].store(0, std::memory_order_relaxed
);
136 ThreadStatus::OperationStage
ThreadStatusUpdater::SetThreadOperationStage(
137 ThreadStatus::OperationStage stage
) {
138 auto* data
= GetLocalThreadStatus();
139 if (data
== nullptr) {
140 return ThreadStatus::STAGE_UNKNOWN
;
142 return data
->operation_stage
.exchange(stage
, std::memory_order_relaxed
);
145 void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type
) {
146 auto* data
= GetLocalThreadStatus();
147 if (data
== nullptr) {
150 data
->state_type
.store(type
, std::memory_order_relaxed
);
153 void ThreadStatusUpdater::ClearThreadState() {
154 auto* data
= GetLocalThreadStatus();
155 if (data
== nullptr) {
158 data
->state_type
.store(ThreadStatus::STATE_UNKNOWN
,
159 std::memory_order_relaxed
);
162 Status
ThreadStatusUpdater::GetThreadList(
163 std::vector
<ThreadStatus
>* thread_list
) {
164 thread_list
->clear();
165 std::vector
<std::shared_ptr
<ThreadStatusData
>> valid_list
;
166 uint64_t now_micros
= SystemClock::Default()->NowMicros();
168 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
169 for (auto* thread_data
: thread_data_set_
) {
171 auto thread_id
= thread_data
->thread_id
.load(std::memory_order_relaxed
);
172 auto thread_type
= thread_data
->thread_type
.load(std::memory_order_relaxed
);
173 // Since any change to cf_info_map requires thread_list_mutex,
174 // which is currently held by GetThreadList(), here we can safely
175 // use "memory_order_relaxed" to load the cf_key.
176 auto cf_key
= thread_data
->cf_key
.load(std::memory_order_relaxed
);
178 ThreadStatus::OperationType op_type
= ThreadStatus::OP_UNKNOWN
;
179 ThreadStatus::OperationStage op_stage
= ThreadStatus::STAGE_UNKNOWN
;
180 ThreadStatus::StateType state_type
= ThreadStatus::STATE_UNKNOWN
;
181 uint64_t op_elapsed_micros
= 0;
182 uint64_t op_props
[ThreadStatus::kNumOperationProperties
] = {0};
184 auto iter
= cf_info_map_
.find(cf_key
);
185 if (iter
!= cf_info_map_
.end()) {
186 op_type
= thread_data
->operation_type
.load(std::memory_order_acquire
);
187 // display lower-level info only when higher-level info is available.
188 if (op_type
!= ThreadStatus::OP_UNKNOWN
) {
189 op_elapsed_micros
= now_micros
- thread_data
->op_start_time
.load(
190 std::memory_order_relaxed
);
191 op_stage
= thread_data
->operation_stage
.load(std::memory_order_relaxed
);
192 state_type
= thread_data
->state_type
.load(std::memory_order_relaxed
);
193 for (int i
= 0; i
< ThreadStatus::kNumOperationProperties
; ++i
) {
195 thread_data
->op_properties
[i
].load(std::memory_order_relaxed
);
200 thread_list
->emplace_back(
201 thread_id
, thread_type
,
202 iter
!= cf_info_map_
.end() ? iter
->second
.db_name
: "",
203 iter
!= cf_info_map_
.end() ? iter
->second
.cf_name
: "", op_type
,
204 op_elapsed_micros
, op_stage
, op_props
, state_type
);
210 ThreadStatusData
* ThreadStatusUpdater::GetLocalThreadStatus() {
211 if (thread_status_data_
== nullptr) {
214 if (!thread_status_data_
->enable_tracking
) {
215 assert(thread_status_data_
->cf_key
.load(std::memory_order_relaxed
) ==
219 return thread_status_data_
;
222 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key
,
223 const std::string
& db_name
,
225 const std::string
& cf_name
) {
226 // Acquiring same lock as GetThreadList() to guarantee
227 // a consistent view of global column family table (cf_info_map).
228 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
230 cf_info_map_
.emplace(std::piecewise_construct
, std::make_tuple(cf_key
),
231 std::make_tuple(db_key
, db_name
, cf_name
));
232 db_key_map_
[db_key
].insert(cf_key
);
235 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key
) {
236 // Acquiring same lock as GetThreadList() to guarantee
237 // a consistent view of global column family table (cf_info_map).
238 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
240 auto cf_pair
= cf_info_map_
.find(cf_key
);
241 if (cf_pair
!= cf_info_map_
.end()) {
242 // Remove its entry from db_key_map_ by the following steps:
243 // 1. Obtain the entry in db_key_map_ whose set contains cf_key
244 // 2. Remove it from the set.
245 ConstantColumnFamilyInfo
& cf_info
= cf_pair
->second
;
246 auto db_pair
= db_key_map_
.find(cf_info
.db_key
);
247 assert(db_pair
!= db_key_map_
.end());
248 size_t result
__attribute__((__unused__
));
249 result
= db_pair
->second
.erase(cf_key
);
251 cf_info_map_
.erase(cf_pair
);
255 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key
) {
256 // Acquiring same lock as GetThreadList() to guarantee
257 // a consistent view of global column family table (cf_info_map).
258 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
259 auto db_pair
= db_key_map_
.find(db_key
);
260 if (UNLIKELY(db_pair
== db_key_map_
.end())) {
261 // In some occasional cases such as DB::Open fails, we won't
262 // register ColumnFamilyInfo for a db.
266 for (auto cf_key
: db_pair
->second
) {
267 auto cf_pair
= cf_info_map_
.find(cf_key
);
268 if (cf_pair
!= cf_info_map_
.end()) {
269 cf_info_map_
.erase(cf_pair
);
272 db_key_map_
.erase(db_key
);
277 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType
/*ttype*/,
278 uint64_t /*thread_id*/) {}
280 void ThreadStatusUpdater::UnregisterThread() {}
282 void ThreadStatusUpdater::ResetThreadStatus() {}
284 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {}
286 void ThreadStatusUpdater::SetThreadOperation(
287 const ThreadStatus::OperationType
/*type*/) {}
289 void ThreadStatusUpdater::ClearThreadOperation() {}
291 void ThreadStatusUpdater::SetThreadState(
292 const ThreadStatus::StateType
/*type*/) {}
294 void ThreadStatusUpdater::ClearThreadState() {}
296 Status
ThreadStatusUpdater::GetThreadList(
297 std::vector
<ThreadStatus
>* /*thread_list*/) {
298 return Status::NotSupported(
299 "GetThreadList is not supported in the current running environment.");
302 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
303 const std::string
& /*db_name*/,
304 const void* /*cf_key*/,
305 const std::string
& /*cf_name*/) {}
307 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {}
309 void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {}
311 void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
312 uint64_t /*value*/) {}
314 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
315 uint64_t /*delta*/) {}
317 #endif // ROCKSDB_USING_THREAD_STATUS
318 } // namespace ROCKSDB_NAMESPACE