1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 #include "monitoring/thread_status_updater.h"
8 #include "port/likely.h"
9 #include "rocksdb/env.h"
10 #include "util/mutexlock.h"
14 #ifdef ROCKSDB_USING_THREAD_STATUS
16 __thread ThreadStatusData
* ThreadStatusUpdater::thread_status_data_
= nullptr;
18 void ThreadStatusUpdater::RegisterThread(
19 ThreadStatus::ThreadType ttype
, uint64_t thread_id
) {
20 if (UNLIKELY(thread_status_data_
== nullptr)) {
21 thread_status_data_
= new ThreadStatusData();
22 thread_status_data_
->thread_type
= ttype
;
23 thread_status_data_
->thread_id
= thread_id
;
24 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
25 thread_data_set_
.insert(thread_status_data_
);
28 ClearThreadOperationProperties();
31 void ThreadStatusUpdater::UnregisterThread() {
32 if (thread_status_data_
!= nullptr) {
33 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
34 thread_data_set_
.erase(thread_status_data_
);
35 delete thread_status_data_
;
36 thread_status_data_
= nullptr;
40 void ThreadStatusUpdater::ResetThreadStatus() {
42 ClearThreadOperation();
43 SetColumnFamilyInfoKey(nullptr);
46 void ThreadStatusUpdater::SetColumnFamilyInfoKey(
49 if (data
== nullptr) {
52 // set the tracking flag based on whether cf_key is non-null or not.
53 // If enable_thread_tracking is set to false, the input cf_key
55 data
->enable_tracking
= (cf_key
!= nullptr);
56 data
->cf_key
.store(const_cast<void*>(cf_key
), std::memory_order_relaxed
);
59 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
60 auto* data
= GetLocalThreadStatus();
61 if (data
== nullptr) {
64 return data
->cf_key
.load(std::memory_order_relaxed
);
67 void ThreadStatusUpdater::SetThreadOperation(
68 const ThreadStatus::OperationType type
) {
69 auto* data
= GetLocalThreadStatus();
70 if (data
== nullptr) {
73 // NOTE: Our practice here is to set all the thread operation properties
74 // and stage before we set thread operation, and thread operation
75 // will be set in std::memory_order_release. This is to ensure
76 // whenever a thread operation is not OP_UNKNOWN, we will always
77 // have a consistent information on its properties.
78 data
->operation_type
.store(type
, std::memory_order_release
);
79 if (type
== ThreadStatus::OP_UNKNOWN
) {
80 data
->operation_stage
.store(ThreadStatus::STAGE_UNKNOWN
,
81 std::memory_order_relaxed
);
82 ClearThreadOperationProperties();
86 void ThreadStatusUpdater::SetThreadOperationProperty(
87 int i
, uint64_t value
) {
88 auto* data
= GetLocalThreadStatus();
89 if (data
== nullptr) {
92 data
->op_properties
[i
].store(value
, std::memory_order_relaxed
);
95 void ThreadStatusUpdater::IncreaseThreadOperationProperty(
96 int i
, uint64_t delta
) {
97 auto* data
= GetLocalThreadStatus();
98 if (data
== nullptr) {
101 data
->op_properties
[i
].fetch_add(delta
, std::memory_order_relaxed
);
104 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time
) {
105 auto* data
= GetLocalThreadStatus();
106 if (data
== nullptr) {
109 data
->op_start_time
.store(start_time
, std::memory_order_relaxed
);
112 void ThreadStatusUpdater::ClearThreadOperation() {
113 auto* data
= GetLocalThreadStatus();
114 if (data
== nullptr) {
117 data
->operation_stage
.store(ThreadStatus::STAGE_UNKNOWN
,
118 std::memory_order_relaxed
);
119 data
->operation_type
.store(
120 ThreadStatus::OP_UNKNOWN
, std::memory_order_relaxed
);
121 ClearThreadOperationProperties();
124 void ThreadStatusUpdater::ClearThreadOperationProperties() {
125 auto* data
= GetLocalThreadStatus();
126 if (data
== nullptr) {
129 for (int i
= 0; i
< ThreadStatus::kNumOperationProperties
; ++i
) {
130 data
->op_properties
[i
].store(0, std::memory_order_relaxed
);
134 ThreadStatus::OperationStage
ThreadStatusUpdater::SetThreadOperationStage(
135 ThreadStatus::OperationStage stage
) {
136 auto* data
= GetLocalThreadStatus();
137 if (data
== nullptr) {
138 return ThreadStatus::STAGE_UNKNOWN
;
140 return data
->operation_stage
.exchange(
141 stage
, std::memory_order_relaxed
);
144 void ThreadStatusUpdater::SetThreadState(
145 const ThreadStatus::StateType type
) {
146 auto* data
= GetLocalThreadStatus();
147 if (data
== nullptr) {
150 data
->state_type
.store(type
, std::memory_order_relaxed
);
153 void ThreadStatusUpdater::ClearThreadState() {
154 auto* data
= GetLocalThreadStatus();
155 if (data
== nullptr) {
158 data
->state_type
.store(
159 ThreadStatus::STATE_UNKNOWN
, std::memory_order_relaxed
);
162 Status
ThreadStatusUpdater::GetThreadList(
163 std::vector
<ThreadStatus
>* thread_list
) {
164 thread_list
->clear();
165 std::vector
<std::shared_ptr
<ThreadStatusData
>> valid_list
;
166 uint64_t now_micros
= Env::Default()->NowMicros();
168 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
169 for (auto* thread_data
: thread_data_set_
) {
171 auto thread_id
= thread_data
->thread_id
.load(
172 std::memory_order_relaxed
);
173 auto thread_type
= thread_data
->thread_type
.load(
174 std::memory_order_relaxed
);
175 // Since any change to cf_info_map requires thread_list_mutex,
176 // which is currently held by GetThreadList(), here we can safely
177 // use "memory_order_relaxed" to load the cf_key.
178 auto cf_key
= thread_data
->cf_key
.load(
179 std::memory_order_relaxed
);
180 auto iter
= cf_info_map_
.find(cf_key
);
181 auto* cf_info
= iter
!= cf_info_map_
.end() ?
182 iter
->second
.get() : nullptr;
183 const std::string
* db_name
= nullptr;
184 const std::string
* cf_name
= nullptr;
185 ThreadStatus::OperationType op_type
= ThreadStatus::OP_UNKNOWN
;
186 ThreadStatus::OperationStage op_stage
= ThreadStatus::STAGE_UNKNOWN
;
187 ThreadStatus::StateType state_type
= ThreadStatus::STATE_UNKNOWN
;
188 uint64_t op_elapsed_micros
= 0;
189 uint64_t op_props
[ThreadStatus::kNumOperationProperties
] = {0};
190 if (cf_info
!= nullptr) {
191 db_name
= &cf_info
->db_name
;
192 cf_name
= &cf_info
->cf_name
;
193 op_type
= thread_data
->operation_type
.load(
194 std::memory_order_acquire
);
195 // display lower-level info only when higher-level info is available.
196 if (op_type
!= ThreadStatus::OP_UNKNOWN
) {
197 op_elapsed_micros
= now_micros
- thread_data
->op_start_time
.load(
198 std::memory_order_relaxed
);
199 op_stage
= thread_data
->operation_stage
.load(
200 std::memory_order_relaxed
);
201 state_type
= thread_data
->state_type
.load(
202 std::memory_order_relaxed
);
203 for (int i
= 0; i
< ThreadStatus::kNumOperationProperties
; ++i
) {
204 op_props
[i
] = thread_data
->op_properties
[i
].load(
205 std::memory_order_relaxed
);
209 thread_list
->emplace_back(
210 thread_id
, thread_type
,
211 db_name
? *db_name
: "",
212 cf_name
? *cf_name
: "",
213 op_type
, op_elapsed_micros
, op_stage
, op_props
,
220 ThreadStatusData
* ThreadStatusUpdater::GetLocalThreadStatus() {
221 if (thread_status_data_
== nullptr) {
224 if (!thread_status_data_
->enable_tracking
) {
225 assert(thread_status_data_
->cf_key
.load(
226 std::memory_order_relaxed
) == nullptr);
229 return thread_status_data_
;
232 void ThreadStatusUpdater::NewColumnFamilyInfo(
233 const void* db_key
, const std::string
& db_name
,
234 const void* cf_key
, const std::string
& cf_name
) {
235 // Acquiring same lock as GetThreadList() to guarantee
236 // a consistent view of global column family table (cf_info_map).
237 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
239 cf_info_map_
[cf_key
].reset(
240 new ConstantColumnFamilyInfo(db_key
, db_name
, cf_name
));
241 db_key_map_
[db_key
].insert(cf_key
);
244 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key
) {
245 // Acquiring same lock as GetThreadList() to guarantee
246 // a consistent view of global column family table (cf_info_map).
247 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
248 auto cf_pair
= cf_info_map_
.find(cf_key
);
249 if (cf_pair
== cf_info_map_
.end()) {
253 auto* cf_info
= cf_pair
->second
.get();
256 // Remove its entry from db_key_map_ by the following steps:
257 // 1. Obtain the entry in db_key_map_ whose set contains cf_key
258 // 2. Remove it from the set.
259 auto db_pair
= db_key_map_
.find(cf_info
->db_key
);
260 assert(db_pair
!= db_key_map_
.end());
261 size_t result
__attribute__((unused
)) = db_pair
->second
.erase(cf_key
);
264 cf_pair
->second
.reset();
265 result
= cf_info_map_
.erase(cf_key
);
269 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key
) {
270 // Acquiring same lock as GetThreadList() to guarantee
271 // a consistent view of global column family table (cf_info_map).
272 std::lock_guard
<std::mutex
> lck(thread_list_mutex_
);
273 auto db_pair
= db_key_map_
.find(db_key
);
274 if (UNLIKELY(db_pair
== db_key_map_
.end())) {
275 // In some occasional cases such as DB::Open fails, we won't
276 // register ColumnFamilyInfo for a db.
280 size_t result
__attribute__((unused
)) = 0;
281 for (auto cf_key
: db_pair
->second
) {
282 auto cf_pair
= cf_info_map_
.find(cf_key
);
283 if (cf_pair
== cf_info_map_
.end()) {
286 cf_pair
->second
.reset();
287 result
= cf_info_map_
.erase(cf_key
);
290 db_key_map_
.erase(db_key
);
295 void ThreadStatusUpdater::RegisterThread(
296 ThreadStatus::ThreadType ttype
, uint64_t thread_id
) {
299 void ThreadStatusUpdater::UnregisterThread() {
302 void ThreadStatusUpdater::ResetThreadStatus() {
305 void ThreadStatusUpdater::SetColumnFamilyInfoKey(
306 const void* cf_key
) {
309 void ThreadStatusUpdater::SetThreadOperation(
310 const ThreadStatus::OperationType type
) {
313 void ThreadStatusUpdater::ClearThreadOperation() {
316 void ThreadStatusUpdater::SetThreadState(
317 const ThreadStatus::StateType type
) {
320 void ThreadStatusUpdater::ClearThreadState() {
323 Status
ThreadStatusUpdater::GetThreadList(
324 std::vector
<ThreadStatus
>* thread_list
) {
325 return Status::NotSupported(
326 "GetThreadList is not supported in the current running environment.");
329 void ThreadStatusUpdater::NewColumnFamilyInfo(
330 const void* db_key
, const std::string
& db_name
,
331 const void* cf_key
, const std::string
& cf_name
) {
334 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key
) {
337 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key
) {
340 void ThreadStatusUpdater::SetThreadOperationProperty(
341 int i
, uint64_t value
) {
344 void ThreadStatusUpdater::IncreaseThreadOperationProperty(
345 int i
, uint64_t delta
) {
348 #endif // ROCKSDB_USING_THREAD_STATUS
349 } // namespace rocksdb