]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/monitoring/thread_status_updater.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / monitoring / thread_status_updater.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "monitoring/thread_status_updater.h"
7
8 #include <memory>
9
10 #include "port/likely.h"
11 #include "rocksdb/env.h"
12 #include "rocksdb/system_clock.h"
13 #include "util/mutexlock.h"
14
15 namespace ROCKSDB_NAMESPACE {
16
17 #ifdef ROCKSDB_USING_THREAD_STATUS
18
19 thread_local ThreadStatusData* ThreadStatusUpdater::thread_status_data_ =
20 nullptr;
21
22 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype,
23 uint64_t thread_id) {
24 if (UNLIKELY(thread_status_data_ == nullptr)) {
25 thread_status_data_ = new ThreadStatusData();
26 thread_status_data_->thread_type = ttype;
27 thread_status_data_->thread_id = thread_id;
28 std::lock_guard<std::mutex> lck(thread_list_mutex_);
29 thread_data_set_.insert(thread_status_data_);
30 }
31
32 ClearThreadOperationProperties();
33 }
34
35 void ThreadStatusUpdater::UnregisterThread() {
36 if (thread_status_data_ != nullptr) {
37 std::lock_guard<std::mutex> lck(thread_list_mutex_);
38 thread_data_set_.erase(thread_status_data_);
39 delete thread_status_data_;
40 thread_status_data_ = nullptr;
41 }
42 }
43
44 void ThreadStatusUpdater::ResetThreadStatus() {
45 ClearThreadState();
46 ClearThreadOperation();
47 SetColumnFamilyInfoKey(nullptr);
48 }
49
50 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
51 auto* data = Get();
52 if (data == nullptr) {
53 return;
54 }
55 // set the tracking flag based on whether cf_key is non-null or not.
56 // If enable_thread_tracking is set to false, the input cf_key
57 // would be nullptr.
58 data->enable_tracking = (cf_key != nullptr);
59 data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
60 }
61
62 const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
63 auto* data = GetLocalThreadStatus();
64 if (data == nullptr) {
65 return nullptr;
66 }
67 return data->cf_key.load(std::memory_order_relaxed);
68 }
69
70 void ThreadStatusUpdater::SetThreadOperation(
71 const ThreadStatus::OperationType type) {
72 auto* data = GetLocalThreadStatus();
73 if (data == nullptr) {
74 return;
75 }
76 // NOTE: Our practice here is to set all the thread operation properties
77 // and stage before we set thread operation, and thread operation
78 // will be set in std::memory_order_release. This is to ensure
79 // whenever a thread operation is not OP_UNKNOWN, we will always
80 // have a consistent information on its properties.
81 data->operation_type.store(type, std::memory_order_release);
82 if (type == ThreadStatus::OP_UNKNOWN) {
83 data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
84 std::memory_order_relaxed);
85 ClearThreadOperationProperties();
86 }
87 }
88
89 void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
90 auto* data = GetLocalThreadStatus();
91 if (data == nullptr) {
92 return;
93 }
94 data->op_properties[i].store(value, std::memory_order_relaxed);
95 }
96
97 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i,
98 uint64_t delta) {
99 auto* data = GetLocalThreadStatus();
100 if (data == nullptr) {
101 return;
102 }
103 data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
104 }
105
106 void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
107 auto* data = GetLocalThreadStatus();
108 if (data == nullptr) {
109 return;
110 }
111 data->op_start_time.store(start_time, std::memory_order_relaxed);
112 }
113
114 void ThreadStatusUpdater::ClearThreadOperation() {
115 auto* data = GetLocalThreadStatus();
116 if (data == nullptr) {
117 return;
118 }
119 data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
120 std::memory_order_relaxed);
121 data->operation_type.store(ThreadStatus::OP_UNKNOWN,
122 std::memory_order_relaxed);
123 ClearThreadOperationProperties();
124 }
125
126 void ThreadStatusUpdater::ClearThreadOperationProperties() {
127 auto* data = GetLocalThreadStatus();
128 if (data == nullptr) {
129 return;
130 }
131 for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
132 data->op_properties[i].store(0, std::memory_order_relaxed);
133 }
134 }
135
136 ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
137 ThreadStatus::OperationStage stage) {
138 auto* data = GetLocalThreadStatus();
139 if (data == nullptr) {
140 return ThreadStatus::STAGE_UNKNOWN;
141 }
142 return data->operation_stage.exchange(stage, std::memory_order_relaxed);
143 }
144
145 void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) {
146 auto* data = GetLocalThreadStatus();
147 if (data == nullptr) {
148 return;
149 }
150 data->state_type.store(type, std::memory_order_relaxed);
151 }
152
153 void ThreadStatusUpdater::ClearThreadState() {
154 auto* data = GetLocalThreadStatus();
155 if (data == nullptr) {
156 return;
157 }
158 data->state_type.store(ThreadStatus::STATE_UNKNOWN,
159 std::memory_order_relaxed);
160 }
161
162 Status ThreadStatusUpdater::GetThreadList(
163 std::vector<ThreadStatus>* thread_list) {
164 thread_list->clear();
165 std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
166 uint64_t now_micros = SystemClock::Default()->NowMicros();
167
168 std::lock_guard<std::mutex> lck(thread_list_mutex_);
169 for (auto* thread_data : thread_data_set_) {
170 assert(thread_data);
171 auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
172 auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
173 // Since any change to cf_info_map requires thread_list_mutex,
174 // which is currently held by GetThreadList(), here we can safely
175 // use "memory_order_relaxed" to load the cf_key.
176 auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);
177
178 ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
179 ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
180 ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
181 uint64_t op_elapsed_micros = 0;
182 uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};
183
184 auto iter = cf_info_map_.find(cf_key);
185 if (iter != cf_info_map_.end()) {
186 op_type = thread_data->operation_type.load(std::memory_order_acquire);
187 // display lower-level info only when higher-level info is available.
188 if (op_type != ThreadStatus::OP_UNKNOWN) {
189 op_elapsed_micros = now_micros - thread_data->op_start_time.load(
190 std::memory_order_relaxed);
191 op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
192 state_type = thread_data->state_type.load(std::memory_order_relaxed);
193 for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
194 op_props[i] =
195 thread_data->op_properties[i].load(std::memory_order_relaxed);
196 }
197 }
198 }
199
200 thread_list->emplace_back(
201 thread_id, thread_type,
202 iter != cf_info_map_.end() ? iter->second.db_name : "",
203 iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
204 op_elapsed_micros, op_stage, op_props, state_type);
205 }
206
207 return Status::OK();
208 }
209
210 ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
211 if (thread_status_data_ == nullptr) {
212 return nullptr;
213 }
214 if (!thread_status_data_->enable_tracking) {
215 assert(thread_status_data_->cf_key.load(std::memory_order_relaxed) ==
216 nullptr);
217 return nullptr;
218 }
219 return thread_status_data_;
220 }
221
222 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key,
223 const std::string& db_name,
224 const void* cf_key,
225 const std::string& cf_name) {
226 // Acquiring same lock as GetThreadList() to guarantee
227 // a consistent view of global column family table (cf_info_map).
228 std::lock_guard<std::mutex> lck(thread_list_mutex_);
229
230 cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key),
231 std::make_tuple(db_key, db_name, cf_name));
232 db_key_map_[db_key].insert(cf_key);
233 }
234
235 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
236 // Acquiring same lock as GetThreadList() to guarantee
237 // a consistent view of global column family table (cf_info_map).
238 std::lock_guard<std::mutex> lck(thread_list_mutex_);
239
240 auto cf_pair = cf_info_map_.find(cf_key);
241 if (cf_pair != cf_info_map_.end()) {
242 // Remove its entry from db_key_map_ by the following steps:
243 // 1. Obtain the entry in db_key_map_ whose set contains cf_key
244 // 2. Remove it from the set.
245 ConstantColumnFamilyInfo& cf_info = cf_pair->second;
246 auto db_pair = db_key_map_.find(cf_info.db_key);
247 assert(db_pair != db_key_map_.end());
248 size_t result __attribute__((__unused__));
249 result = db_pair->second.erase(cf_key);
250 assert(result);
251 cf_info_map_.erase(cf_pair);
252 }
253 }
254
255 void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
256 // Acquiring same lock as GetThreadList() to guarantee
257 // a consistent view of global column family table (cf_info_map).
258 std::lock_guard<std::mutex> lck(thread_list_mutex_);
259 auto db_pair = db_key_map_.find(db_key);
260 if (UNLIKELY(db_pair == db_key_map_.end())) {
261 // In some occasional cases such as DB::Open fails, we won't
262 // register ColumnFamilyInfo for a db.
263 return;
264 }
265
266 for (auto cf_key : db_pair->second) {
267 auto cf_pair = cf_info_map_.find(cf_key);
268 if (cf_pair != cf_info_map_.end()) {
269 cf_info_map_.erase(cf_pair);
270 }
271 }
272 db_key_map_.erase(db_key);
273 }
274
275 #else
276
277 void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
278 uint64_t /*thread_id*/) {}
279
280 void ThreadStatusUpdater::UnregisterThread() {}
281
282 void ThreadStatusUpdater::ResetThreadStatus() {}
283
284 void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {}
285
286 void ThreadStatusUpdater::SetThreadOperation(
287 const ThreadStatus::OperationType /*type*/) {}
288
289 void ThreadStatusUpdater::ClearThreadOperation() {}
290
291 void ThreadStatusUpdater::SetThreadState(
292 const ThreadStatus::StateType /*type*/) {}
293
294 void ThreadStatusUpdater::ClearThreadState() {}
295
296 Status ThreadStatusUpdater::GetThreadList(
297 std::vector<ThreadStatus>* /*thread_list*/) {
298 return Status::NotSupported(
299 "GetThreadList is not supported in the current running environment.");
300 }
301
302 void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
303 const std::string& /*db_name*/,
304 const void* /*cf_key*/,
305 const std::string& /*cf_name*/) {}
306
307 void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {}
308
309 void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {}
310
311 void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
312 uint64_t /*value*/) {}
313
314 void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
315 uint64_t /*delta*/) {}
316
317 #endif // ROCKSDB_USING_THREAD_STATUS
318 } // namespace ROCKSDB_NAMESPACE