import errno
import json
import rados
import rbd
import time
import traceback

from datetime import datetime, timedelta
from threading import Condition, Lock, Thread
from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, Union

from .common import (GLOBAL_POOL_KEY, authorize_request, extract_pool_key,
                     get_rbd_pools, PoolKeyT)

QUERY_POOL_ID = "pool_id"
QUERY_POOL_ID_MAP = "pool_id_map"
QUERY_IDS = "query_ids"
QUERY_SUM_POOL_COUNTERS = "pool_counters"
QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
QUERY_LAST_REQUEST = "last_request"

OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'
OSD_PERF_QUERY_COUNTERS = ['write_ops',
                           'read_ops',
                           'write_bytes',
                           'read_bytes',
                           'write_latency',
                           'read_latency']
OSD_PERF_QUERY_COUNTERS_INDICES = {
    OSD_PERF_QUERY_COUNTERS[i]: i for i in range(len(OSD_PERF_QUERY_COUNTERS))}

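# indices of the latency counters (write_latency, read_latency) in
# OSD_PERF_QUERY_COUNTERS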
OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
OSD_PERF_QUERY_MAX_RESULTS = 256

POOL_REFRESH_INTERVAL = timedelta(minutes=5)
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
STATS_RATE_INTERVAL = timedelta(minutes=1)

REPORT_MAX_RESULTS = 64


# {(pool_id, namespace)...}
ResolveImageNamesT = Set[Tuple[int, str]]

# (time, [value,...])
PerfCounterT = Tuple[int, List[int]]
# current, previous
RawImageCounterT = Tuple[PerfCounterT, Optional[PerfCounterT]]
# image_id => perf_counter
RawImagesCounterT = Dict[str, RawImageCounterT]
# namespace => raw_images
RawNamespacesCountersT = Dict[str, RawImagesCounterT]
# pool_id => namespaces_counters
RawPoolCountersT = Dict[int, RawNamespacesCountersT]

SumImageCounterT = List[int]
# image_id => sum_image
SumImagesCounterT = Dict[str, SumImageCounterT]
# namespace => sum_images
SumNamespacesCountersT = Dict[str, SumImagesCounterT]
# pool_id => sum_namespaces
SumPoolCountersT = Dict[int, SumNamespacesCountersT]

ExtractDataFuncT = Callable[[int, Optional[RawImageCounterT], SumImageCounterT], float]


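# Tracks per-image performance counters for the rbd_support mgr module:
# registers OSD perf queries for a pool/namespace spec, periodically merges
# the returned per-image counters, resolves image ids to names, and
# generates sorted stat/counter reports on request.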
class PerfHandler:

    @classmethod
    def prepare_regex(cls, value: Any) -> str:
        return '^({})$'.format(value)

    @classmethod
    def prepare_osd_perf_query(cls,
                               pool_id: Optional[int],
                               namespace: Optional[str],
                               counter_type: str) -> Dict[str, Any]:
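        # build an OSD perf query keyed by pool id, namespace, and the
        # (optional pool id override, image id) captured from
        # rbd_data/journal_data object names; results are limited to the
        # top OSD_PERF_QUERY_MAX_RESULTS entries ordered by counter_type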
        pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        if pool_id:
            pool_id_regex = cls.prepare_regex(pool_id)
        if namespace:
            namespace_regex = cls.prepare_regex(namespace)

        return {
            'key_descriptor': [
                {'type': 'pool_id', 'regex': pool_id_regex},
                {'type': 'namespace', 'regex': namespace_regex},
                {'type': 'object_name',
                 'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
            ],
            'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
            'limit': {'order_by': counter_type,
                      'max_count': OSD_PERF_QUERY_MAX_RESULTS},
        }

    @classmethod
    def pool_spec_search_keys(cls, pool_key: str) -> List[str]:
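        # every prefix of the pool spec, longest first, ending with the
        # empty string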
        return [pool_key[0:len(pool_key) - x]
                for x in range(0, len(pool_key) + 1)]

    @classmethod
    def submatch_pool_key(cls, pool_key: PoolKeyT, search_key: str) -> bool:
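        # treat an empty pool name or namespace in the search key as a wildcard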
        return ((pool_key[1] == search_key[1] or not search_key[1])
                and (pool_key[0] == search_key[0] or not search_key[0]))

    def __init__(self, module: Any) -> None:
        self.user_queries: Dict[PoolKeyT, Dict[str, Any]] = {}
        self.image_cache: Dict[str, str] = {}

        self.lock = Lock()
        self.query_condition = Condition(self.lock)
        self.refresh_condition = Condition(self.lock)

        self.image_name_cache: Dict[Tuple[int, str], Dict[str, str]] = {}
        self.image_name_refresh_time = datetime.fromtimestamp(0)

        self.module = module
        self.log = module.log

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        self.thread.start()

    def shutdown(self) -> None:
        self.log.info("PerfHandler: shutting down")
        self.stop_thread = True
        if self.thread.is_alive():
            self.log.debug("PerfHandler: joining thread")
            self.thread.join()
        self.log.info("PerfHandler: shut down")

    def run(self) -> None:
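        # main loop: process the latest OSD perf counters once per
        # mgr_stats_period, or sooner when register_query signals
        # query_condition for a newly added query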
        try:
            self.log.info("PerfHandler: starting")
            while not self.stop_thread:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_raw_osd_perf_counters()
                    self.refresh_condition.notify()

                    stats_period = self.module.get_ceph_option("mgr_stats_period")
                    self.query_condition.wait(stats_period)

                self.log.debug("PerfHandler: tick")

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("PerfHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def merge_raw_osd_perf_counters(self,
                                    pool_key: PoolKeyT,
                                    query: Dict[str, Any],
                                    now_ts: int,
                                    resolve_image_names: ResolveImageNamesT) -> RawPoolCountersT:
        pool_id_map = query[QUERY_POOL_ID_MAP]

        # collect and combine the raw counters from all sort orders
        raw_pool_counters: Dict[int, Dict[str, Dict[str, Any]]] = query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
        for query_id in query[QUERY_IDS]:
            res = self.module.get_osd_perf_counters(query_id)
            for counter in res['counters']:
                # replace pool id from object name if it exists
                k = counter['k']
                pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
                namespace = k[1][0]
                image_id = k[2][1]

                # ignore metrics from non-matching pools/namespaces
                if pool_id not in pool_id_map:
                    continue
                if pool_key[1] is not None and pool_key[1] != namespace:
                    continue

                # flag the pool (and namespace) for refresh if we cannot find
                # image name in the cache
                resolve_image_key = (pool_id, namespace)
                if image_id not in self.image_name_cache.get(resolve_image_key, {}):
                    resolve_image_names.add(resolve_image_key)

                # copy the 'sum' counter values for each image (ignore count)
                # if we haven't already processed it for this round
                raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
                raw_images = raw_namespaces.setdefault(namespace, {})
                raw_image = raw_images.get(image_id)
                # save the last two perf counters for each image
                new_current = (now_ts, [int(x[0]) for x in counter['c']])
                if raw_image:
                    old_current, _ = raw_image
                    if old_current[0] < now_ts:
                        raw_images[image_id] = (new_current, old_current)
                else:
                    raw_images[image_id] = (new_current, None)

        self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
        return raw_pool_counters

    def sum_osd_perf_counters(self,
                              query: Dict[str, dict],
                              raw_pool_counters: RawPoolCountersT,
                              now_ts: int) -> SumPoolCountersT:
        # update the cumulative counters for each image
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        for pool_id, raw_namespaces in raw_pool_counters.items():
            sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
            for namespace, raw_images in raw_namespaces.items():
                sum_namespace = sum_namespaces.setdefault(namespace, {})
                for image_id, raw_image in raw_images.items():
                    # zero-out non-updated raw counters
                    if not raw_image[0]:
                        continue
                    old_current, _ = raw_image
                    if old_current[0] < now_ts:
                        new_current = (now_ts, [0] * len(old_current[1]))
                        raw_images[image_id] = (new_current, old_current)
                        continue

                    counters = old_current[1]

                    # copy raw counters if this is a newly discovered image or
                    # increment existing counters
                    sum_image = sum_namespace.setdefault(image_id, None)
                    if sum_image:
                        for i in range(len(counters)):
                            sum_image[i] += counters[i]
                    else:
                        sum_namespace[image_id] = [x for x in counters]

        self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
        return sum_pool_counters

    def refresh_image_names(self, resolve_image_names: ResolveImageNamesT) -> None:
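        # refresh the image id -> name mapping for each flagged
        # (pool_id, namespace) pair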
        for pool_id, namespace in resolve_image_names:
            image_key = (pool_id, namespace)
            images = self.image_name_cache.setdefault(image_key, {})
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                for image_meta in rbd.RBD().list2(ioctx):
                    images[image_meta['id']] = image_meta['name']
            self.log.debug("resolve_image_names: {}={}".format(image_key, images))

    def scrub_missing_images(self) -> None:
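        # drop accumulated counters for image ids that could not be
        # resolved to an image name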
        for pool_key, query in self.user_queries.items():
            raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
            sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
            for pool_id, sum_namespaces in sum_pool_counters.items():
                raw_namespaces = raw_pool_counters.get(pool_id, {})
                for namespace, sum_images in sum_namespaces.items():
                    raw_images = raw_namespaces.get(namespace, {})

                    image_key = (pool_id, namespace)
                    image_names = self.image_name_cache.get(image_key, {})
                    for image_id in list(sum_images.keys()):
                        # scrub image counters if we failed to resolve image name
                        if image_id not in image_names:
                            self.log.debug("scrub_missing_images: dropping {}/{}".format(
                                image_key, image_id))
                            del sum_images[image_id]
                            if image_id in raw_images:
                                del raw_images[image_id]

    def process_raw_osd_perf_counters(self) -> None:
        now = datetime.now()
        now_ts = int(now.strftime("%s"))

        # clear the image name cache if we need to refresh all active pools
        if self.image_name_cache and \
                self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
            self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
            self.image_name_cache = {}

        resolve_image_names: Set[Tuple[int, str]] = set()
        for pool_key, query in self.user_queries.items():
            if not query[QUERY_IDS]:
                continue

            raw_pool_counters = self.merge_raw_osd_perf_counters(
                pool_key, query, now_ts, resolve_image_names)
            self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)

        if resolve_image_names:
            self.image_name_refresh_time = now
            self.refresh_image_names(resolve_image_names)
            self.scrub_missing_images()
        elif not self.image_name_cache:
            self.scrub_missing_images()

    def resolve_pool_id(self, pool_name: str) -> int:
        pool_id = self.module.rados.pool_lookup(pool_name)
        if not pool_id:
            raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name),
                                       errno.ENOENT)
        return pool_id

    def scrub_expired_queries(self) -> None:
        # perf counters need to be periodically refreshed to continue
        # to be registered
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        for pool_key in list(self.user_queries.keys()):
            user_query = self.user_queries[pool_key]
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
                del self.user_queries[pool_key]

    def register_osd_perf_queries(self,
                                  pool_id: Optional[int],
                                  namespace: Optional[str]) -> List[int]:
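        # register one OSD perf query per counter type so the most active
        # images by each sort order are captured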
        query_ids = []
        try:
            for counter in OSD_PERF_QUERY_COUNTERS:
                query = self.prepare_osd_perf_query(pool_id, namespace, counter)
                self.log.debug("register_osd_perf_queries: {}".format(query))

                query_id = self.module.add_osd_perf_query(query)
                if query_id is None:
                    raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
                query_ids.append(query_id)

        except Exception:
            for query_id in query_ids:
                self.module.remove_osd_perf_query(query_id)
            raise

        return query_ids

    def unregister_osd_perf_queries(self, pool_key: PoolKeyT, query_ids: List[int]) -> None:
        self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
            pool_key, query_ids))
        for query_id in query_ids:
            self.module.remove_osd_perf_query(query_id)
        query_ids[:] = []

    def register_query(self, pool_key: PoolKeyT) -> Dict[str, Any]:
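        # create the query state for a new pool spec or refresh the
        # expiration time of an existing one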
        if pool_key not in self.user_queries:
            pool_name, namespace = pool_key
            pool_id = None
            if pool_name:
                pool_id = self.resolve_pool_id(cast(str, pool_name))

            user_query = {
                QUERY_POOL_ID: pool_id,
                QUERY_POOL_ID_MAP: {pool_id: pool_name},
                QUERY_IDS: self.register_osd_perf_queries(pool_id, namespace),
                QUERY_LAST_REQUEST: datetime.now()
            }

            self.user_queries[pool_key] = user_query

            # force an immediate stat pull if this is a new query
            self.query_condition.notify()
            self.refresh_condition.wait(5)

        else:
            user_query = self.user_queries[pool_key]

            # ensure query doesn't expire
            user_query[QUERY_LAST_REQUEST] = datetime.now()

            if pool_key == GLOBAL_POOL_KEY:
                # refresh the global pool id -> name map upon each
                # processing period
                user_query[QUERY_POOL_ID_MAP] = {
                    pool_id: pool_name for pool_id, pool_name
                    in get_rbd_pools(self.module).items()}

        self.log.debug("register_query: pool_key={}, query_ids={}".format(
            pool_key, user_query[QUERY_IDS]))

        return user_query

    def extract_stat(self,
                     index: int,
                     raw_image: Optional[RawImageCounterT],
                     sum_image: Any) -> float:
        # require two raw counters within a fixed time window
        if not raw_image or not raw_image[0] or not raw_image[1]:
            return 0

        current_counter, previous_counter = cast(Tuple[PerfCounterT, PerfCounterT], raw_image)
        current_time = current_counter[0]
        previous_time = previous_counter[0]
        if current_time <= previous_time or \
                current_time - previous_time > STATS_RATE_INTERVAL.total_seconds():
            return 0

        current_value = current_counter[1][index]
        instant_rate = float(current_value) / (current_time - previous_time)

        # convert latencies from sum to average per op
        ops_index = None
        if OSD_PERF_QUERY_COUNTERS[index] == 'write_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
        elif OSD_PERF_QUERY_COUNTERS[index] == 'read_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']

        if ops_index is not None:
            ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
            instant_rate /= ops

        return instant_rate

    def extract_counter(self,
                        index: int,
                        raw_image: Optional[RawImageCounterT],
                        sum_image: List[int]) -> int:
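        # cumulative counter value (0 if nothing has been accumulated yet)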
        if sum_image:
            return sum_image[index]
        return 0

    def generate_report(self,
                        query: Dict[str, Union[Dict[str, str],
                                               Dict[int, Dict[str, dict]]]],
                        sort_by: str,
                        extract_data: ExtractDataFuncT) -> Tuple[Dict[int, str],
                                                                 List[Dict[str, List[float]]]]:
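        # returns ({pool index: "pool[/namespace]"} descriptors,
        # [{"pool index/image name": [counter values]}]), limited to the
        # REPORT_MAX_RESULTS most active images ordered by sort_by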
        pool_id_map = cast(Dict[int, str], query[QUERY_POOL_ID_MAP])
        sum_pool_counters = cast(SumPoolCountersT,
                                 query.setdefault(QUERY_SUM_POOL_COUNTERS,
                                                  cast(SumPoolCountersT, {})))
        # pool_id => {namespace => {image_id => [counter..]}}
        raw_pool_counters = cast(RawPoolCountersT,
                                 query.setdefault(QUERY_RAW_POOL_COUNTERS,
                                                  cast(RawPoolCountersT, {})))

        sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)

        # pre-sort and limit the response
        results = []
        for pool_id, sum_namespaces in sum_pool_counters.items():
            if pool_id not in pool_id_map:
                continue
            raw_namespaces: RawNamespacesCountersT = raw_pool_counters.get(pool_id, {})
            for namespace, sum_images in sum_namespaces.items():
                raw_images = raw_namespaces.get(namespace, {})
                for image_id, sum_image in sum_images.items():
                    raw_image = raw_images.get(image_id)

                    # always sort by recent IO activity
                    results.append(((pool_id, namespace, image_id),
                                    self.extract_stat(sort_by_index, raw_image,
                                                      sum_image)))
        results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]

        # build the report in sorted order
        pool_descriptors: Dict[str, int] = {}
        counters = []
        for key, _ in results:
            pool_id = key[0]
            pool_name = pool_id_map[pool_id]

            namespace = key[1]
            image_id = key[2]
            image_names = self.image_name_cache.get((pool_id, namespace), {})
            image_name = image_names[image_id]

            raw_namespaces = raw_pool_counters.get(pool_id, {})
            raw_images = raw_namespaces.get(namespace, {})
            raw_image = raw_images.get(image_id)

            sum_namespaces = sum_pool_counters[pool_id]
            sum_images = sum_namespaces[namespace]
            sum_image = sum_images.get(image_id, [])

            pool_descriptor = pool_name
            if namespace:
                pool_descriptor += "/{}".format(namespace)
            pool_index = pool_descriptors.setdefault(pool_descriptor,
                                                     len(pool_descriptors))
            image_descriptor = "{}/{}".format(pool_index, image_name)
            data = [extract_data(i, raw_image, sum_image)
                    for i in range(len(OSD_PERF_QUERY_COUNTERS))]

            # skip if no data to report
            if data == [0 for i in range(len(OSD_PERF_QUERY_COUNTERS))]:
                continue

            counters.append({image_descriptor: data})

        return {idx: descriptor for descriptor, idx
                in pool_descriptors.items()}, \
            counters

    def get_perf_data(self,
                      report: str,
                      pool_spec: Optional[str],
                      sort_by: str,
                      extract_data: ExtractDataFuncT) -> Tuple[int, str, str]:
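        # common entry point for the stat/counter reports: authorize the
        # request, (re)register the query, and return the report as JSON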
        self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
            report, pool_spec, sort_by))
        self.scrub_expired_queries()

        pool_key = extract_pool_key(pool_spec)
        authorize_request(self.module, pool_key[0], pool_key[1])
        user_query = self.register_query(pool_key)

        now = datetime.now()
        pool_descriptors, counters = self.generate_report(
            user_query, sort_by, extract_data)

        report = {
            'timestamp': time.mktime(now.timetuple()),
            '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
            'pool_descriptors': pool_descriptors,
            '{}s'.format(report): counters
        }

        return 0, json.dumps(report), ""

    def get_perf_stats(self,
                       pool_spec: Optional[str],
                       sort_by: str) -> Tuple[int, str, str]:
        return self.get_perf_data(
            "stat", pool_spec, sort_by, self.extract_stat)

    def get_perf_counters(self,
                          pool_spec: Optional[str],
                          sort_by: str) -> Tuple[int, str, str]:
        return self.get_perf_data(
            "counter", pool_spec, sort_by, self.extract_counter)