]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/monitoring/thread_status_updater.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / monitoring / thread_status_updater.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5
6 #include "monitoring/thread_status_updater.h"
7 #include <memory>
8 #include "port/likely.h"
9 #include "rocksdb/env.h"
10 #include "util/mutexlock.h"
11
12 namespace rocksdb {
13
14 #ifdef ROCKSDB_USING_THREAD_STATUS
15
16 __thread ThreadStatusData* ThreadStatusUpdater::thread_status_data_ = nullptr;
17
18 void ThreadStatusUpdater::RegisterThread(
19 ThreadStatus::ThreadType ttype, uint64_t thread_id) {
20 if (UNLIKELY(thread_status_data_ == nullptr)) {
21 thread_status_data_ = new ThreadStatusData();
22 thread_status_data_->thread_type = ttype;
23 thread_status_data_->thread_id = thread_id;
24 std::lock_guard<std::mutex> lck(thread_list_mutex_);
25 thread_data_set_.insert(thread_status_data_);
26 }
27
28 ClearThreadOperationProperties();
29 }
30
31 void ThreadStatusUpdater::UnregisterThread() {
32 if (thread_status_data_ != nullptr) {
33 std::lock_guard<std::mutex> lck(thread_list_mutex_);
34 thread_data_set_.erase(thread_status_data_);
35 delete thread_status_data_;
36 thread_status_data_ = nullptr;
37 }
38 }
39
40 void ThreadStatusUpdater::ResetThreadStatus() {
41 ClearThreadState();
42 ClearThreadOperation();
43 SetColumnFamilyInfoKey(nullptr);
44 }
45
46 void ThreadStatusUpdater::SetColumnFamilyInfoKey(
47 const void* cf_key) {
48 auto* data = Get();
49 if (data == nullptr) {
50 return;
51 }
52 // set the tracking flag based on whether cf_key is non-null or not.
53 // If enable_thread_tracking is set to false, the input cf_key
54 // would be nullptr.
55 data->enable_tracking = (cf_key != nullptr);
56 data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
57 }
58
59 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
60 auto* data = GetLocalThreadStatus();
61 if (data == nullptr) {
62 return nullptr;
63 }
64 return data->cf_key.load(std::memory_order_relaxed);
65 }
66
67 void ThreadStatusUpdater::SetThreadOperation(
68 const ThreadStatus::OperationType type) {
69 auto* data = GetLocalThreadStatus();
70 if (data == nullptr) {
71 return;
72 }
73 // NOTE: Our practice here is to set all the thread operation properties
74 // and stage before we set thread operation, and thread operation
75 // will be set in std::memory_order_release. This is to ensure
76 // whenever a thread operation is not OP_UNKNOWN, we will always
77 // have a consistent information on its properties.
78 data->operation_type.store(type, std::memory_order_release);
79 if (type == ThreadStatus::OP_UNKNOWN) {
80 data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
81 std::memory_order_relaxed);
82 ClearThreadOperationProperties();
83 }
84 }
85
86 void ThreadStatusUpdater::SetThreadOperationProperty(
87 int i, uint64_t value) {
88 auto* data = GetLocalThreadStatus();
89 if (data == nullptr) {
90 return;
91 }
92 data->op_properties[i].store(value, std::memory_order_relaxed);
93 }
94
95 void ThreadStatusUpdater::IncreaseThreadOperationProperty(
96 int i, uint64_t delta) {
97 auto* data = GetLocalThreadStatus();
98 if (data == nullptr) {
99 return;
100 }
101 data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
102 }
103
104 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
105 auto* data = GetLocalThreadStatus();
106 if (data == nullptr) {
107 return;
108 }
109 data->op_start_time.store(start_time, std::memory_order_relaxed);
110 }
111
112 void ThreadStatusUpdater::ClearThreadOperation() {
113 auto* data = GetLocalThreadStatus();
114 if (data == nullptr) {
115 return;
116 }
117 data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
118 std::memory_order_relaxed);
119 data->operation_type.store(
120 ThreadStatus::OP_UNKNOWN, std::memory_order_relaxed);
121 ClearThreadOperationProperties();
122 }
123
124 void ThreadStatusUpdater::ClearThreadOperationProperties() {
125 auto* data = GetLocalThreadStatus();
126 if (data == nullptr) {
127 return;
128 }
129 for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
130 data->op_properties[i].store(0, std::memory_order_relaxed);
131 }
132 }
133
134 ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
135 ThreadStatus::OperationStage stage) {
136 auto* data = GetLocalThreadStatus();
137 if (data == nullptr) {
138 return ThreadStatus::STAGE_UNKNOWN;
139 }
140 return data->operation_stage.exchange(
141 stage, std::memory_order_relaxed);
142 }
143
144 void ThreadStatusUpdater::SetThreadState(
145 const ThreadStatus::StateType type) {
146 auto* data = GetLocalThreadStatus();
147 if (data == nullptr) {
148 return;
149 }
150 data->state_type.store(type, std::memory_order_relaxed);
151 }
152
153 void ThreadStatusUpdater::ClearThreadState() {
154 auto* data = GetLocalThreadStatus();
155 if (data == nullptr) {
156 return;
157 }
158 data->state_type.store(
159 ThreadStatus::STATE_UNKNOWN, std::memory_order_relaxed);
160 }
161
162 Status ThreadStatusUpdater::GetThreadList(
163 std::vector<ThreadStatus>* thread_list) {
164 thread_list->clear();
165 std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
166 uint64_t now_micros = Env::Default()->NowMicros();
167
168 std::lock_guard<std::mutex> lck(thread_list_mutex_);
169 for (auto* thread_data : thread_data_set_) {
170 assert(thread_data);
171 auto thread_id = thread_data->thread_id.load(
172 std::memory_order_relaxed);
173 auto thread_type = thread_data->thread_type.load(
174 std::memory_order_relaxed);
175 // Since any change to cf_info_map requires thread_list_mutex,
176 // which is currently held by GetThreadList(), here we can safely
177 // use "memory_order_relaxed" to load the cf_key.
178 auto cf_key = thread_data->cf_key.load(
179 std::memory_order_relaxed);
180 auto iter = cf_info_map_.find(cf_key);
181 auto* cf_info = iter != cf_info_map_.end() ?
182 iter->second.get() : nullptr;
183 const std::string* db_name = nullptr;
184 const std::string* cf_name = nullptr;
185 ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
186 ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
187 ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
188 uint64_t op_elapsed_micros = 0;
189 uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};
190 if (cf_info != nullptr) {
191 db_name = &cf_info->db_name;
192 cf_name = &cf_info->cf_name;
193 op_type = thread_data->operation_type.load(
194 std::memory_order_acquire);
195 // display lower-level info only when higher-level info is available.
196 if (op_type != ThreadStatus::OP_UNKNOWN) {
197 op_elapsed_micros = now_micros - thread_data->op_start_time.load(
198 std::memory_order_relaxed);
199 op_stage = thread_data->operation_stage.load(
200 std::memory_order_relaxed);
201 state_type = thread_data->state_type.load(
202 std::memory_order_relaxed);
203 for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
204 op_props[i] = thread_data->op_properties[i].load(
205 std::memory_order_relaxed);
206 }
207 }
208 }
209 thread_list->emplace_back(
210 thread_id, thread_type,
211 db_name ? *db_name : "",
212 cf_name ? *cf_name : "",
213 op_type, op_elapsed_micros, op_stage, op_props,
214 state_type);
215 }
216
217 return Status::OK();
218 }
219
220 ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
221 if (thread_status_data_ == nullptr) {
222 return nullptr;
223 }
224 if (!thread_status_data_->enable_tracking) {
225 assert(thread_status_data_->cf_key.load(
226 std::memory_order_relaxed) == nullptr);
227 return nullptr;
228 }
229 return thread_status_data_;
230 }
231
232 void ThreadStatusUpdater::NewColumnFamilyInfo(
233 const void* db_key, const std::string& db_name,
234 const void* cf_key, const std::string& cf_name) {
235 // Acquiring same lock as GetThreadList() to guarantee
236 // a consistent view of global column family table (cf_info_map).
237 std::lock_guard<std::mutex> lck(thread_list_mutex_);
238
239 cf_info_map_[cf_key].reset(
240 new ConstantColumnFamilyInfo(db_key, db_name, cf_name));
241 db_key_map_[db_key].insert(cf_key);
242 }
243
244 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
245 // Acquiring same lock as GetThreadList() to guarantee
246 // a consistent view of global column family table (cf_info_map).
247 std::lock_guard<std::mutex> lck(thread_list_mutex_);
248 auto cf_pair = cf_info_map_.find(cf_key);
249 if (cf_pair == cf_info_map_.end()) {
250 return;
251 }
252
253 auto* cf_info = cf_pair->second.get();
254 assert(cf_info);
255
256 // Remove its entry from db_key_map_ by the following steps:
257 // 1. Obtain the entry in db_key_map_ whose set contains cf_key
258 // 2. Remove it from the set.
259 auto db_pair = db_key_map_.find(cf_info->db_key);
260 assert(db_pair != db_key_map_.end());
261 size_t result __attribute__((unused)) = db_pair->second.erase(cf_key);
262 assert(result);
263
264 cf_pair->second.reset();
265 result = cf_info_map_.erase(cf_key);
266 assert(result);
267 }
268
269 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
270 // Acquiring same lock as GetThreadList() to guarantee
271 // a consistent view of global column family table (cf_info_map).
272 std::lock_guard<std::mutex> lck(thread_list_mutex_);
273 auto db_pair = db_key_map_.find(db_key);
274 if (UNLIKELY(db_pair == db_key_map_.end())) {
275 // In some occasional cases such as DB::Open fails, we won't
276 // register ColumnFamilyInfo for a db.
277 return;
278 }
279
280 size_t result __attribute__((unused)) = 0;
281 for (auto cf_key : db_pair->second) {
282 auto cf_pair = cf_info_map_.find(cf_key);
283 if (cf_pair == cf_info_map_.end()) {
284 continue;
285 }
286 cf_pair->second.reset();
287 result = cf_info_map_.erase(cf_key);
288 assert(result);
289 }
290 db_key_map_.erase(db_key);
291 }
292
293 #else
294
295 void ThreadStatusUpdater::RegisterThread(
296 ThreadStatus::ThreadType ttype, uint64_t thread_id) {
297 }
298
299 void ThreadStatusUpdater::UnregisterThread() {
300 }
301
302 void ThreadStatusUpdater::ResetThreadStatus() {
303 }
304
305 void ThreadStatusUpdater::SetColumnFamilyInfoKey(
306 const void* cf_key) {
307 }
308
309 void ThreadStatusUpdater::SetThreadOperation(
310 const ThreadStatus::OperationType type) {
311 }
312
313 void ThreadStatusUpdater::ClearThreadOperation() {
314 }
315
316 void ThreadStatusUpdater::SetThreadState(
317 const ThreadStatus::StateType type) {
318 }
319
320 void ThreadStatusUpdater::ClearThreadState() {
321 }
322
323 Status ThreadStatusUpdater::GetThreadList(
324 std::vector<ThreadStatus>* thread_list) {
325 return Status::NotSupported(
326 "GetThreadList is not supported in the current running environment.");
327 }
328
329 void ThreadStatusUpdater::NewColumnFamilyInfo(
330 const void* db_key, const std::string& db_name,
331 const void* cf_key, const std::string& cf_name) {
332 }
333
334 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
335 }
336
337 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
338 }
339
340 void ThreadStatusUpdater::SetThreadOperationProperty(
341 int i, uint64_t value) {
342 }
343
344 void ThreadStatusUpdater::IncreaseThreadOperationProperty(
345 int i, uint64_t delta) {
346 }
347
348 #endif // ROCKSDB_USING_THREAD_STATUS
349 } // namespace rocksdb