8 from datetime
import datetime
, timedelta
9 from threading
import Condition
, Lock
, Thread
10 from typing
import cast
, Any
, Callable
, Dict
, List
, Optional
, Set
, Tuple
, Union
12 from .common
import (GLOBAL_POOL_KEY
, authorize_request
, extract_pool_key
,
13 get_rbd_pools
, PoolKeyT
)
# Keys of the per-pool-key query dicts stored in ``user_queries``.
# pool id the query was registered for
QUERY_POOL_ID = "pool_id"
# pool_id => pool_name map of the pools the query covers
QUERY_POOL_ID_MAP = "pool_id_map"
# ids of the OSD perf queries registered on behalf of the query
QUERY_IDS = "query_ids"
# cumulative per-image counters (see SumPoolCountersT)
QUERY_SUM_POOL_COUNTERS = "pool_counters"
# most recent raw per-image samples (see RawPoolCountersT)
QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
# datetime of the latest request; compared against QUERY_EXPIRE_INTERVAL
QUERY_LAST_REQUEST = "last_request"

# anchored regex with a single capture group that accepts any value
OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
23 OSD_PERF_QUERY_COUNTERS
= ['write_ops',
# counter name => position of that name within OSD_PERF_QUERY_COUNTERS
OSD_PERF_QUERY_COUNTERS_INDICES = {
    counter_name: position
    for position, counter_name in enumerate(OSD_PERF_QUERY_COUNTERS)}

# presumably the positions of the latency counters in
# OSD_PERF_QUERY_COUNTERS -- confirm against that list
OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
# 'max_count' limit placed on each registered OSD perf query
OSD_PERF_QUERY_MAX_RESULTS = 256

# age after which the whole image name cache is dropped and rebuilt
POOL_REFRESH_INTERVAL = timedelta(minutes=5)
# a user query not requested for this long is unregistered
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
# widest gap between two raw samples still usable to compute a rate
STATS_RATE_INTERVAL = timedelta(minutes=1)

# maximum number of images included in a generated report
REPORT_MAX_RESULTS = 64
# {(pool_id, namespace)...} -- pool/namespace pairs whose image names must
# be (re)loaded into the image name cache
ResolveImageNamesT = Set[Tuple[int, str]]

# (timestamp, [counter value, ...])
PerfCounterT = Tuple[int, List[int]]
# (current sample, previous sample or None when only one has been seen)
RawImageCounterT = Tuple[PerfCounterT, Optional[PerfCounterT]]
# image_id => raw_image
RawImagesCounterT = Dict[str, RawImageCounterT]
# namespace => raw_images
RawNamespacesCountersT = Dict[str, RawImagesCounterT]
# pool_id => raw_namespaces
RawPoolCountersT = Dict[int, RawNamespacesCountersT]

# cumulative counter values, one entry per counter
SumImageCounterT = List[int]
# image_id => sum_image
SumImagesCounterT = Dict[str, SumImageCounterT]
# namespace => sum_images
SumNamespacesCountersT = Dict[str, SumImagesCounterT]
# pool_id => sum_namespaces
SumPoolCountersT = Dict[int, SumNamespacesCountersT]

# (counter index, raw_image, sum_image) => value reported for that counter
ExtractDataFuncT = Callable[[int, Optional[RawImageCounterT], SumImageCounterT], float]
def prepare_regex(cls, value: Any) -> str:
    """Return an anchored regex whose single capture group is ``value``.

    ``value`` is interpolated using its default string formatting and is
    not regex-escaped.
    """
    return f'^({value})$'
74 def prepare_osd_perf_query(cls
,
75 pool_id
: Optional
[int],
76 namespace
: Optional
[str],
77 counter_type
: str) -> Dict
[str, Any
]:
78 pool_id_regex
= OSD_PERF_QUERY_REGEX_MATCH_ALL
79 namespace_regex
= OSD_PERF_QUERY_REGEX_MATCH_ALL
81 pool_id_regex
= cls
.prepare_regex(pool_id
)
83 namespace_regex
= cls
.prepare_regex(namespace
)
87 {'type': 'pool_id', 'regex': pool_id_regex
},
88 {'type': 'namespace', 'regex': namespace_regex
},
89 {'type': 'object_name',
90 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
92 'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS
,
93 'limit': {'order_by': counter_type
,
94 'max_count': OSD_PERF_QUERY_MAX_RESULTS
},
def pool_spec_search_keys(cls, pool_key: str) -> List[str]:
    """Return every leading slice of ``pool_key``, longest first.

    The first element is ``pool_key`` itself and the last is the empty
    slice, so a key of length ``n`` yields ``n + 1`` entries.
    """
    return [pool_key[:end] for end in range(len(pool_key), -1, -1)]
def submatch_pool_key(cls, pool_key: PoolKeyT, search_key: str) -> bool:
    """Tell whether ``pool_key`` satisfies ``search_key``.

    Components 1 and 0 are compared in that order; a falsy component in
    ``search_key`` acts as a wildcard for that position.
    """
    for position in (1, 0):
        actual, wanted = pool_key[position], search_key[position]
        if wanted and actual != wanted:
            return False
    return True
def __init__(self, module: Any) -> None:
    """Set up the handler state and create its worker thread.

    :param module: owning module object; its ``log`` attribute is reused
        as this handler's logger.
    """
    self.module = module
    self.log = module.log

    # pool_key => query state dict (see the QUERY_* keys)
    self.user_queries: Dict[PoolKeyT, Dict[str, Any]] = {}
    self.image_cache: Dict[str, str] = {}

    # (pool_id, namespace) => {image_id => image_name}
    self.image_name_cache: Dict[Tuple[int, str], Dict[str, str]] = {}
    # timestamp 0, i.e. the name cache has never been refreshed
    self.image_name_refresh_time = datetime.fromtimestamp(0)

    # a single lock backs both condition variables
    self.lock = Lock()
    self.query_condition = Condition(self.lock)
    self.refresh_condition = Condition(self.lock)

    # run() keeps looping until this flag is set
    self.stop_thread = False
    self.thread = Thread(target=self.run)
124 def setup(self
) -> None:
def shutdown(self) -> None:
    """Ask the worker thread to stop and wait for it to finish."""
    log = self.log
    log.info("PerfHandler: shutting down")

    # run() checks this flag at the top of every iteration
    self.stop_thread = True

    worker = self.thread
    if worker.is_alive():
        log.debug("PerfHandler: joining thread")
        worker.join()

    log.info("PerfHandler: shut down")
def run(self) -> None:
    """Worker thread body.

    Until ``stop_thread`` is set, each iteration expires idle queries,
    processes the raw OSD perf counters, notifies ``refresh_condition``
    and then waits on ``query_condition`` for up to the
    ``mgr_stats_period`` option value.
    """
    try:
        self.log.info("PerfHandler: starting")
        while True:
            if self.stop_thread:
                break

            # Condition.notify()/wait() require the shared lock to be held
            with self.lock:
                self.scrub_expired_queries()
                self.process_raw_osd_perf_counters()
                # wake a register_query() caller waiting for fresh stats
                self.refresh_condition.notify()

                # sleep until the next period, or until a newly registered
                # query notifies query_condition
                period = self.module.get_ceph_option("mgr_stats_period")
                self.query_condition.wait(period)

            self.log.debug("PerfHandler: tick")

    except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
        self.log.exception("PerfHandler: client blocklisted")
        self.module.client_blocklisted.set()
    except Exception as ex:
        self.log.fatal("Fatal runtime error: {}\n{}".format(
            ex, traceback.format_exc()))
156 def merge_raw_osd_perf_counters(self
,
158 query
: Dict
[str, Any
],
160 resolve_image_names
: ResolveImageNamesT
) -> RawPoolCountersT
:
161 pool_id_map
= query
[QUERY_POOL_ID_MAP
]
163 # collect and combine the raw counters from all sort orders
164 raw_pool_counters
: Dict
[int, Dict
[str, Dict
[str, Any
]]] = query
.setdefault(QUERY_RAW_POOL_COUNTERS
, {})
165 for query_id
in query
[QUERY_IDS
]:
166 res
= self
.module
.get_osd_perf_counters(query_id
)
167 for counter
in res
['counters']:
168 # replace pool id from object name if it exists
170 pool_id
= int(k
[2][0]) if k
[2][0] else int(k
[0][0])
174 # ignore metrics from non-matching pools/namespaces
175 if pool_id
not in pool_id_map
:
177 if pool_key
[1] is not None and pool_key
[1] != namespace
:
180 # flag the pool (and namespace) for refresh if we cannot find
181 # image name in the cache
182 resolve_image_key
= (pool_id
, namespace
)
183 if image_id
not in self
.image_name_cache
.get(resolve_image_key
, {}):
184 resolve_image_names
.add(resolve_image_key
)
186 # copy the 'sum' counter values for each image (ignore count)
187 # if we haven't already processed it for this round
188 raw_namespaces
= raw_pool_counters
.setdefault(pool_id
, {})
189 raw_images
= raw_namespaces
.setdefault(namespace
, {})
190 raw_image
= raw_images
.get(image_id
)
191 # save the last two perf counters for each image
192 new_current
= (now_ts
, [int(x
[0]) for x
in counter
['c']])
194 old_current
, _
= raw_image
195 if old_current
[0] < now_ts
:
196 raw_images
[image_id
] = (new_current
, old_current
)
198 raw_images
[image_id
] = (new_current
, None)
200 self
.log
.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters
))
201 return raw_pool_counters
203 def sum_osd_perf_counters(self
,
204 query
: Dict
[str, dict],
205 raw_pool_counters
: RawPoolCountersT
,
206 now_ts
: int) -> SumPoolCountersT
:
207 # update the cumulative counters for each image
208 sum_pool_counters
= query
.setdefault(QUERY_SUM_POOL_COUNTERS
, {})
209 for pool_id
, raw_namespaces
in raw_pool_counters
.items():
210 sum_namespaces
= sum_pool_counters
.setdefault(pool_id
, {})
211 for namespace
, raw_images
in raw_namespaces
.items():
212 sum_namespace
= sum_namespaces
.setdefault(namespace
, {})
213 for image_id
, raw_image
in raw_images
.items():
214 # zero-out non-updated raw counters
217 old_current
, _
= raw_image
218 if old_current
[0] < now_ts
:
219 new_current
= (now_ts
, [0] * len(old_current
[1]))
220 raw_images
[image_id
] = (new_current
, old_current
)
223 counters
= old_current
[1]
225 # copy raw counters if this is a newly discovered image or
226 # increment existing counters
227 sum_image
= sum_namespace
.setdefault(image_id
, None)
229 for i
in range(len(counters
)):
230 sum_image
[i
] += counters
[i
]
232 sum_namespace
[image_id
] = [x
for x
in counters
]
234 self
.log
.debug("sum_osd_perf_counters: {}".format(sum_pool_counters
))
235 return sum_pool_counters
def refresh_image_names(self, resolve_image_names: ResolveImageNamesT) -> None:
    """Load image id => name mappings for the flagged pools/namespaces.

    For every ``(pool_id, namespace)`` pair the images are listed through
    ``rbd.RBD().list2()`` and merged into ``self.image_name_cache`` under
    that pair; existing entries for the pair are kept and overwritten
    only for ids that are listed again.
    """
    for pool_id, namespace in resolve_image_names:
        cache_key = (pool_id, namespace)
        id_to_name = self.image_name_cache.setdefault(cache_key, {})

        with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            ioctx.set_namespace(namespace)
            for meta in rbd.RBD().list2(ioctx):
                id_to_name[meta['id']] = meta['name']

        self.log.debug("resolve_image_names: {}={}".format(cache_key, id_to_name))
def scrub_missing_images(self) -> None:
    """Drop counters of images that are absent from the image name cache.

    For every user query, each image id found in the cumulative counters
    but not in ``self.image_name_cache`` for its ``(pool_id, namespace)``
    is removed from both the cumulative and the raw counters.
    """
    for query in self.user_queries.values():
        raw_pools = query.get(QUERY_RAW_POOL_COUNTERS, {})
        sum_pools = query.get(QUERY_SUM_POOL_COUNTERS, {})

        for pool_id, sum_namespaces in sum_pools.items():
            raw_namespaces = raw_pools.get(pool_id, {})

            for namespace, sum_images in sum_namespaces.items():
                raw_images = raw_namespaces.get(namespace, {})
                image_key = (pool_id, namespace)
                known_names = self.image_name_cache.get(image_key, {})

                # collect first: sum_images is mutated while scrubbing
                unresolved = [image_id for image_id in sum_images
                              if image_id not in known_names]
                for image_id in unresolved:
                    self.log.debug("scrub_missing_images: dropping {}/{}".format(
                        image_key, image_id))
                    del sum_images[image_id]
                    raw_images.pop(image_id, None)
def process_raw_osd_perf_counters(self) -> None:
    """Merge and sum the raw OSD perf counters of every user query.

    The image name cache is dropped first when it is non-empty and older
    than POOL_REFRESH_INTERVAL. Queries without registered query ids are
    skipped. Afterwards the names of any pools/namespaces flagged during
    merging are refreshed and counters of unresolved images are scrubbed;
    scrubbing also happens when nothing was flagged but the name cache is
    empty.
    """
    now = datetime.now()
    # datetime.timestamp() is portable; the "%s" strftime directive used
    # previously is a non-standard, platform-dependent extension
    now_ts = int(now.timestamp())

    # clear the image name cache if we need to refresh all active pools
    if self.image_name_cache and \
            self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
        self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
        self.image_name_cache = {}

    resolve_image_names: Set[Tuple[int, str]] = set()
    for pool_key, query in self.user_queries.items():
        # nothing registered with the OSDs for this query
        if not query[QUERY_IDS]:
            continue

        raw_pool_counters = self.merge_raw_osd_perf_counters(
            pool_key, query, now_ts, resolve_image_names)
        self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)

    if resolve_image_names:
        self.image_name_refresh_time = now
        self.refresh_image_names(resolve_image_names)
        self.scrub_missing_images()
    elif not self.image_name_cache:
        self.scrub_missing_images()
293 def resolve_pool_id(self
, pool_name
: str) -> int:
294 pool_id
= self
.module
.rados
.pool_lookup(pool_name
)
296 raise rados
.ObjectNotFound("Pool '{}' not found".format(pool_name
),
def scrub_expired_queries(self) -> None:
    """Unregister and forget user queries that were not requested lately.

    A query whose QUERY_LAST_REQUEST time is more than
    QUERY_EXPIRE_INTERVAL in the past has its OSD perf queries
    unregistered and is removed from ``self.user_queries``.
    """
    cutoff = datetime.now() - QUERY_EXPIRE_INTERVAL

    # iterate over a snapshot: expired entries are deleted along the way
    for pool_key, user_query in list(self.user_queries.items()):
        if user_query[QUERY_LAST_REQUEST] >= cutoff:
            continue
        self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
        del self.user_queries[pool_key]
310 def register_osd_perf_queries(self
,
311 pool_id
: Optional
[int],
312 namespace
: Optional
[str]) -> List
[int]:
315 for counter
in OSD_PERF_QUERY_COUNTERS
:
316 query
= self
.prepare_osd_perf_query(pool_id
, namespace
, counter
)
317 self
.log
.debug("register_osd_perf_queries: {}".format(query
))
319 query_id
= self
.module
.add_osd_perf_query(query
)
321 raise RuntimeError('Failed to add OSD perf query: {}'.format(query
))
322 query_ids
.append(query_id
)
325 for query_id
in query_ids
:
326 self
.module
.remove_osd_perf_query(query_id
)
331 def unregister_osd_perf_queries(self
, pool_key
: PoolKeyT
, query_ids
: List
[int]) -> None:
332 self
.log
.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
333 pool_key
, query_ids
))
334 for query_id
in query_ids
:
335 self
.module
.remove_osd_perf_query(query_id
)
338 def register_query(self
, pool_key
: PoolKeyT
) -> Dict
[str, Any
]:
339 if pool_key
not in self
.user_queries
:
340 pool_name
, namespace
= pool_key
343 pool_id
= self
.resolve_pool_id(cast(str, pool_name
))
346 QUERY_POOL_ID
: pool_id
,
347 QUERY_POOL_ID_MAP
: {pool_id
: pool_name
},
348 QUERY_IDS
: self
.register_osd_perf_queries(pool_id
, namespace
),
349 QUERY_LAST_REQUEST
: datetime
.now()
352 self
.user_queries
[pool_key
] = user_query
354 # force an immediate stat pull if this is a new query
355 self
.query_condition
.notify()
356 self
.refresh_condition
.wait(5)
359 user_query
= self
.user_queries
[pool_key
]
361 # ensure query doesn't expire
362 user_query
[QUERY_LAST_REQUEST
] = datetime
.now()
364 if pool_key
== GLOBAL_POOL_KEY
:
365 # refresh the global pool id -> name map upon each
367 user_query
[QUERY_POOL_ID_MAP
] = {
368 pool_id
: pool_name
for pool_id
, pool_name
369 in get_rbd_pools(self
.module
).items()}
371 self
.log
.debug("register_query: pool_key={}, query_ids={}".format(
372 pool_key
, user_query
[QUERY_IDS
]))
376 def extract_stat(self
,
378 raw_image
: Optional
[RawImageCounterT
],
379 sum_image
: Any
) -> float:
380 # require two raw counters between a fixed time window
381 if not raw_image
or not raw_image
[0] or not raw_image
[1]:
384 current_counter
, previous_counter
= cast(Tuple
[PerfCounterT
, PerfCounterT
], raw_image
)
385 current_time
= current_counter
[0]
386 previous_time
= previous_counter
[0]
387 if current_time
<= previous_time
or \
388 current_time
- previous_time
> STATS_RATE_INTERVAL
.total_seconds():
391 current_value
= current_counter
[1][index
]
392 instant_rate
= float(current_value
) / (current_time
- previous_time
)
394 # convert latencies from sum to average per op
396 if OSD_PERF_QUERY_COUNTERS
[index
] == 'write_latency':
397 ops_index
= OSD_PERF_QUERY_COUNTERS_INDICES
['write_ops']
398 elif OSD_PERF_QUERY_COUNTERS
[index
] == 'read_latency':
399 ops_index
= OSD_PERF_QUERY_COUNTERS_INDICES
['read_ops']
401 if ops_index
is not None:
402 ops
= max(1, self
.extract_stat(ops_index
, raw_image
, sum_image
))
407 def extract_counter(self
,
409 raw_image
: Optional
[RawImageCounterT
],
410 sum_image
: List
[int]) -> int:
412 return sum_image
[index
]
415 def generate_report(self
,
416 query
: Dict
[str, Union
[Dict
[str, str],
417 Dict
[int, Dict
[str, dict]]]],
419 extract_data
: ExtractDataFuncT
) -> Tuple
[Dict
[int, str],
420 List
[Dict
[str, List
[float]]]]:
421 pool_id_map
= cast(Dict
[int, str], query
[QUERY_POOL_ID_MAP
])
422 sum_pool_counters
= cast(SumPoolCountersT
,
423 query
.setdefault(QUERY_SUM_POOL_COUNTERS
,
424 cast(SumPoolCountersT
, {})))
425 # pool_id => {namespace => {image_id => [counter..] }
426 raw_pool_counters
= cast(RawPoolCountersT
,
427 query
.setdefault(QUERY_RAW_POOL_COUNTERS
,
428 cast(RawPoolCountersT
, {})))
430 sort_by_index
= OSD_PERF_QUERY_COUNTERS
.index(sort_by
)
432 # pre-sort and limit the response
434 for pool_id
, sum_namespaces
in sum_pool_counters
.items():
435 if pool_id
not in pool_id_map
:
437 raw_namespaces
: RawNamespacesCountersT
= raw_pool_counters
.get(pool_id
, {})
438 for namespace
, sum_images
in sum_namespaces
.items():
439 raw_images
= raw_namespaces
.get(namespace
, {})
440 for image_id
, sum_image
in sum_images
.items():
441 raw_image
= raw_images
.get(image_id
)
443 # always sort by recent IO activity
444 results
.append(((pool_id
, namespace
, image_id
),
445 self
.extract_stat(sort_by_index
, raw_image
,
447 results
= sorted(results
, key
=lambda x
: x
[1], reverse
=True)[:REPORT_MAX_RESULTS
]
449 # build the report in sorted order
450 pool_descriptors
: Dict
[str, int] = {}
452 for key
, _
in results
:
454 pool_name
= pool_id_map
[pool_id
]
458 image_names
= self
.image_name_cache
.get((pool_id
, namespace
), {})
459 image_name
= image_names
[image_id
]
461 raw_namespaces
= raw_pool_counters
.get(pool_id
, {})
462 raw_images
= raw_namespaces
.get(namespace
, {})
463 raw_image
= raw_images
.get(image_id
)
465 sum_namespaces
= sum_pool_counters
[pool_id
]
466 sum_images
= sum_namespaces
[namespace
]
467 sum_image
= sum_images
.get(image_id
, [])
469 pool_descriptor
= pool_name
471 pool_descriptor
+= "/{}".format(namespace
)
472 pool_index
= pool_descriptors
.setdefault(pool_descriptor
,
473 len(pool_descriptors
))
474 image_descriptor
= "{}/{}".format(pool_index
, image_name
)
475 data
= [extract_data(i
, raw_image
, sum_image
)
476 for i
in range(len(OSD_PERF_QUERY_COUNTERS
))]
478 # skip if no data to report
479 if data
== [0 for i
in range(len(OSD_PERF_QUERY_COUNTERS
))]:
482 counters
.append({image_descriptor
: data
})
484 return {idx
: descriptor
for descriptor
, idx
485 in pool_descriptors
.items()}, \
488 def get_perf_data(self
,
490 pool_spec
: Optional
[str],
492 extract_data
: ExtractDataFuncT
) -> Tuple
[int, str, str]:
493 self
.log
.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
494 report
, pool_spec
, sort_by
))
495 self
.scrub_expired_queries()
497 pool_key
= extract_pool_key(pool_spec
)
498 authorize_request(self
.module
, pool_key
[0], pool_key
[1])
499 user_query
= self
.register_query(pool_key
)
502 pool_descriptors
, counters
= self
.generate_report(
503 user_query
, sort_by
, extract_data
)
506 'timestamp': time
.mktime(now
.timetuple()),
507 '{}_descriptors'.format(report
): OSD_PERF_QUERY_COUNTERS
,
508 'pool_descriptors': pool_descriptors
,
509 '{}s'.format(report
): counters
512 return 0, json
.dumps(report
), ""
def get_perf_stats(self,
                   pool_spec: Optional[str],
                   sort_by: str) -> Tuple[int, str, str]:
    """Return the "stat" report for ``pool_spec`` ordered by ``sort_by``.

    Delegates to ``get_perf_data`` with ``extract_stat`` as the per-counter
    value extractor and returns its result unchanged.
    """
    extractor = self.extract_stat
    return self.get_perf_data("stat", pool_spec, sort_by, extractor)
def get_perf_counters(self,
                      pool_spec: Optional[str],
                      sort_by: str) -> Tuple[int, str, str]:
    """Return the "counter" report for ``pool_spec`` ordered by ``sort_by``.

    Delegates to ``get_perf_data`` with ``extract_counter`` as the
    per-counter value extractor and returns its result unchanged.
    """
    extractor = self.extract_counter
    return self.get_perf_data("counter", pool_spec, sort_by, extractor)