]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/perf.py
import quincy 17.2.0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / perf.py
CommitLineData
9f95a23c
TL
1import errno
2import json
3import rados
4import rbd
5import time
6import traceback
7
8from datetime import datetime, timedelta
9from threading import Condition, Lock, Thread
20effc67 10from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, Union
9f95a23c
TL
11
12from .common import (GLOBAL_POOL_KEY, authorize_request, extract_pool_key,
20effc67 13 get_rbd_pools, PoolKeyT)
9f95a23c
TL
14
# Keys used inside each per-pool-key user query dictionary.
QUERY_POOL_ID = "pool_id"
QUERY_POOL_ID_MAP = "pool_id_map"
QUERY_IDS = "query_ids"
QUERY_SUM_POOL_COUNTERS = "pool_counters"
QUERY_RAW_POOL_COUNTERS = "raw_pool_counters"
QUERY_LAST_REQUEST = "last_request"

# Regex used for a key descriptor when no specific pool/namespace is requested.
OSD_PERF_QUERY_REGEX_MATCH_ALL = '^(.*)$'

# Performance counters requested from the OSDs; the position of a name in
# this list is the position of its value in every counter vector.
OSD_PERF_QUERY_COUNTERS = [
    'write_ops',
    'read_ops',
    'write_bytes',
    'read_bytes',
    'write_latency',
    'read_latency',
]

# counter name => position within OSD_PERF_QUERY_COUNTERS
OSD_PERF_QUERY_COUNTERS_INDICES = {
    name: position for position, name in enumerate(OSD_PERF_QUERY_COUNTERS)}

# positions of 'write_latency' and 'read_latency' in OSD_PERF_QUERY_COUNTERS
OSD_PERF_QUERY_LATENCY_COUNTER_INDICES = [4, 5]
# 'max_count' limit sent with every OSD perf query
OSD_PERF_QUERY_MAX_RESULTS = 256

# maximum age of the image name cache before it is dropped and rebuilt
POOL_REFRESH_INTERVAL = timedelta(minutes=5)
# a user query not re-requested within this interval is unregistered
QUERY_EXPIRE_INTERVAL = timedelta(minutes=1)
# maximum gap between two samples for a rate to be computed from them
STATS_RATE_INTERVAL = timedelta(minutes=1)

# maximum number of images included in a generated report
REPORT_MAX_RESULTS = 64
40
41
20effc67
TL
# set of (pool_id, namespace) pairs whose image names need to be (re)resolved
ResolveImageNamesT = Set[Tuple[int, str]]

# one sample: (sample time, [counter value, ...])
PerfCounterT = Tuple[int, List[int]]
# the two most recent samples of an image: (current, previous or None)
RawImageCounterT = Tuple[PerfCounterT, Optional[PerfCounterT]]
# image_id => last two samples
RawImagesCounterT = Dict[str, RawImageCounterT]
# namespace => raw per-image samples
RawNamespacesCountersT = Dict[str, RawImagesCounterT]
# pool_id => raw per-namespace samples
RawPoolCountersT = Dict[int, RawNamespacesCountersT]

# cumulative counter values of one image, one entry per counter
SumImageCounterT = List[int]
# image_id => cumulative counters
SumImagesCounterT = Dict[str, SumImageCounterT]
# namespace => cumulative per-image counters
SumNamespacesCountersT = Dict[str, SumImagesCounterT]
# pool_id => cumulative per-namespace counters
SumPoolCountersT = Dict[int, SumNamespacesCountersT]

# (counter index, raw samples of the image, cumulative counters) => value
ExtractDataFuncT = Callable[
    [int, Optional[RawImageCounterT], SumImageCounterT], float]
65
66
class PerfHandler:
    """Collect OSD perf query results and report per-image RBD statistics.

    A background thread (started by ``__init__``) periodically drops expired
    user queries and folds the raw OSD perf counters of the remaining ones
    into two per-query structures: the last two raw samples of every image
    and the cumulative sum of its counters.  ``get_perf_stats`` and
    ``get_perf_counters`` turn those structures into JSON reports.

    The public entry points notify and wait on conditions bound to ``lock``
    without acquiring it, so callers must hold ``lock`` around them.
    """

    # Class-level defaults.  __init__ rebinds the mutable containers on the
    # instance so that separate handlers never share query or cache state.
    user_queries: Dict[PoolKeyT, Dict[str, Any]] = {}
    image_cache: Dict[str, str] = {}

    lock = Lock()
    query_condition = Condition(lock)
    refresh_condition = Condition(lock)
    thread = None

    image_name_cache: Dict[Tuple[int, str], Dict[str, str]] = {}
    image_name_refresh_time = datetime.fromtimestamp(0)

    @classmethod
    def prepare_regex(cls, value: Any) -> str:
        """Return an anchored regex with ``value`` as its only capture group.

        ``value`` is inserted verbatim; it is not escaped.
        """
        return '^({})$'.format(value)

    @classmethod
    def prepare_osd_perf_query(cls,
                               pool_id: Optional[int],
                               namespace: Optional[str],
                               counter_type: str) -> Dict[str, Any]:
        """Build the OSD perf query description for one sort order.

        :param pool_id: restrict the query to this pool; a falsy value
            matches every pool
        :param namespace: restrict the query to this namespace; a falsy
            value matches every namespace
        :param counter_type: counter name the results are ordered by
        :return: dictionary with the key descriptors, the requested
            performance counters and the result limit
        """
        pool_id_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        if pool_id:
            pool_id_regex = cls.prepare_regex(pool_id)

        namespace_regex = OSD_PERF_QUERY_REGEX_MATCH_ALL
        if namespace:
            namespace_regex = cls.prepare_regex(namespace)

        key_descriptor = [
            {'type': 'pool_id', 'regex': pool_id_regex},
            {'type': 'namespace', 'regex': namespace_regex},
            # captures an optional numeric pool id and the image id from
            # the data object name
            {'type': 'object_name',
             'regex': '^(?:rbd|journal)_data\\.(?:([0-9]+)\\.)?([^.]+)\\.'},
        ]
        return {
            'key_descriptor': key_descriptor,
            'performance_counter_descriptors': OSD_PERF_QUERY_COUNTERS,
            'limit': {'order_by': counter_type,
                      'max_count': OSD_PERF_QUERY_MAX_RESULTS},
        }

    @classmethod
    def pool_spec_search_keys(cls, pool_key: str) -> List[str]:
        """Return every prefix of ``pool_key``, longest first, empty last."""
        return [pool_key[:end] for end in range(len(pool_key), -1, -1)]

    @classmethod
    def submatch_pool_key(cls, pool_key: PoolKeyT, search_key: str) -> bool:
        """Tell whether ``pool_key`` is covered by ``search_key``.

        Elements 0 and 1 of both keys are compared; a falsy element of
        ``search_key`` acts as a wildcard.
        NOTE(review): ``search_key`` is annotated ``str`` but is indexed like
        a pool key pair -- confirm the intended type against the callers.
        """
        namespace_ok = pool_key[1] == search_key[1] or not search_key[1]
        pool_ok = pool_key[0] == search_key[0] or not search_key[0]
        return namespace_ok and pool_ok

    def __init__(self, module: Any) -> None:
        """Bind the handler to ``module`` and start the background thread.

        :param module: owning manager module; its ``log`` attribute is used
            for logging
        """
        self.module = module
        self.log = module.log

        # per-instance state: never mutate the class-level default containers
        self.user_queries = {}
        self.image_cache = {}
        self.image_name_cache = {}
        self.image_name_refresh_time = datetime.fromtimestamp(0)

        self.thread = Thread(target=self.run)
        self.thread.start()

    def run(self) -> None:
        """Background loop: scrub expired queries and process raw counters.

        After every pass the loop wakes one waiter of ``refresh_condition``
        and then waits on ``query_condition`` for at most the value of the
        ``mgr_stats_period`` option.  Any exception is logged as fatal and
        ends the loop.
        """
        try:
            self.log.info("PerfHandler: starting")
            while True:
                with self.lock:
                    self.scrub_expired_queries()
                    self.process_raw_osd_perf_counters()
                    self.refresh_condition.notify()

                    stats_period = self.module.get_ceph_option("mgr_stats_period")
                    self.query_condition.wait(stats_period)

                self.log.debug("PerfHandler: tick")

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def merge_raw_osd_perf_counters(self,
                                    pool_key: PoolKeyT,
                                    query: Dict[str, Any],
                                    now_ts: int,
                                    resolve_image_names: ResolveImageNamesT) -> RawPoolCountersT:
        """Merge the results of all OSD perf queries of ``query``.

        :param pool_key: key of the user query; when its second element is
            not ``None`` only counters of that namespace are kept
        :param query: user query; its raw counters are updated in place
        :param now_ts: timestamp identifying the current processing round
        :param resolve_image_names: output set; receives the
            (pool_id, namespace) pair of every image missing from the image
            name cache
        :return: the raw counters stored in ``query``
        """
        pool_id_map = query[QUERY_POOL_ID_MAP]
        namespace_filter = pool_key[1]

        # collect and combine the raw counters from all sort orders
        raw_pool_counters: Dict[int, Dict[str, Dict[str, Any]]] = \
            query.setdefault(QUERY_RAW_POOL_COUNTERS, {})
        for query_id in query[QUERY_IDS]:
            res = self.module.get_osd_perf_counters(query_id)
            for counter in res['counters']:
                k = counter['k']
                # a pool id captured from the object name takes precedence
                # over the pool id key
                pool_id = int(k[2][0]) if k[2][0] else int(k[0][0])
                namespace = k[1][0]
                image_id = k[2][1]

                # ignore metrics from non-matching pools/namespaces
                if pool_id not in pool_id_map:
                    continue
                if namespace_filter is not None and namespace_filter != namespace:
                    continue

                # flag the pool (and namespace) for refresh if the image
                # name is not in the cache
                resolve_image_key = (pool_id, namespace)
                if image_id not in self.image_name_cache.get(resolve_image_key, {}):
                    resolve_image_names.add(resolve_image_key)

                raw_namespaces = raw_pool_counters.setdefault(pool_id, {})
                raw_images = raw_namespaces.setdefault(namespace, {})
                raw_image = raw_images.get(image_id)

                # keep the first value of every counter entry (the sum) and
                # save the last two samples for each image
                new_current = (now_ts, [int(x[0]) for x in counter['c']])
                if not raw_image:
                    raw_images[image_id] = (new_current, None)
                    continue

                old_current, _ = raw_image
                # a sample already stamped with now_ts was stored by another
                # sort order during this round: leave it untouched
                if old_current[0] < now_ts:
                    raw_images[image_id] = (new_current, old_current)

        self.log.debug("merge_raw_osd_perf_counters: {}".format(raw_pool_counters))
        return raw_pool_counters

    def sum_osd_perf_counters(self,
                              query: Dict[str, dict],
                              raw_pool_counters: RawPoolCountersT,
                              now_ts: int) -> SumPoolCountersT:
        """Add the samples of the current round to the cumulative counters.

        :param query: user query; its cumulative counters are updated in
            place
        :param raw_pool_counters: raw samples as returned by
            ``merge_raw_osd_perf_counters``; images without a sample for
            this round receive an all-zero current sample
        :param now_ts: timestamp identifying the current processing round
        :return: the cumulative counters stored in ``query``
        """
        sum_pool_counters = query.setdefault(QUERY_SUM_POOL_COUNTERS, {})
        for pool_id, raw_namespaces in raw_pool_counters.items():
            sum_namespaces = sum_pool_counters.setdefault(pool_id, {})
            for namespace, raw_images in raw_namespaces.items():
                sum_namespace = sum_namespaces.setdefault(namespace, {})
                for image_id, raw_image in raw_images.items():
                    if not raw_image[0]:
                        continue

                    old_current, _ = raw_image
                    if old_current[0] < now_ts:
                        # not updated this round: zero-out the current
                        # sample and add nothing to the sum
                        new_current = (now_ts, [0] * len(old_current[1]))
                        raw_images[image_id] = (new_current, old_current)
                        continue

                    counters = old_current[1]

                    # increment existing counters, or copy the raw counters
                    # for a newly discovered image
                    sum_image = sum_namespace.get(image_id)
                    if sum_image:
                        for i, value in enumerate(counters):
                            sum_image[i] += value
                    else:
                        sum_namespace[image_id] = list(counters)

        self.log.debug("sum_osd_perf_counters: {}".format(sum_pool_counters))
        return sum_pool_counters

    def refresh_image_names(self, resolve_image_names: ResolveImageNamesT) -> None:
        """List the images of each flagged pool/namespace into the cache.

        :param resolve_image_names: (pool_id, namespace) pairs to refresh
        """
        for pool_id, namespace in resolve_image_names:
            image_key = (pool_id, namespace)
            images = self.image_name_cache.setdefault(image_key, {})
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                for image_meta in rbd.RBD().list2(ioctx):
                    images[image_meta['id']] = image_meta['name']
            self.log.debug("resolve_image_names: {}={}".format(image_key, images))

    def scrub_missing_images(self) -> None:
        """Drop the counters of every image absent from the image name cache."""
        for query in self.user_queries.values():
            raw_pool_counters = query.get(QUERY_RAW_POOL_COUNTERS, {})
            sum_pool_counters = query.get(QUERY_SUM_POOL_COUNTERS, {})
            for pool_id, sum_namespaces in sum_pool_counters.items():
                raw_namespaces = raw_pool_counters.get(pool_id, {})
                for namespace, sum_images in sum_namespaces.items():
                    raw_images = raw_namespaces.get(namespace, {})

                    image_key = (pool_id, namespace)
                    image_names = self.image_name_cache.get(image_key, {})
                    # iterate over a snapshot: entries are deleted in the loop
                    for image_id in list(sum_images):
                        if image_id in image_names:
                            continue
                        # scrub image counters if the image name could not
                        # be resolved
                        self.log.debug("scrub_missing_images: dropping {}/{}".format(
                            image_key, image_id))
                        del sum_images[image_id]
                        raw_images.pop(image_id, None)

    def process_raw_osd_perf_counters(self) -> None:
        """Run one processing round over all registered user queries.

        Expires the image name cache once it is older than
        ``POOL_REFRESH_INTERVAL``, merges and sums the counters of every
        query that has registered OSD perf queries, then refreshes the
        image names that were flagged and scrubs images whose name is
        unknown.
        """
        now = datetime.now()
        # datetime.timestamp() is the documented equivalent of the
        # platform-specific strftime("%s") directive
        now_ts = int(now.timestamp())

        # clear the image name cache if all active pools need a refresh
        if self.image_name_cache and \
                self.image_name_refresh_time + POOL_REFRESH_INTERVAL < now:
            self.log.debug("process_raw_osd_perf_counters: expiring image name cache")
            self.image_name_cache = {}

        resolve_image_names: Set[Tuple[int, str]] = set()
        for pool_key, query in self.user_queries.items():
            if not query[QUERY_IDS]:
                continue

            raw_pool_counters = self.merge_raw_osd_perf_counters(
                pool_key, query, now_ts, resolve_image_names)
            self.sum_osd_perf_counters(query, raw_pool_counters, now_ts)

        if resolve_image_names:
            self.image_name_refresh_time = now
            self.refresh_image_names(resolve_image_names)
            self.scrub_missing_images()
        elif not self.image_name_cache:
            self.scrub_missing_images()

    def resolve_pool_id(self, pool_name: str) -> int:
        """Return the id of the pool named ``pool_name``.

        :raises rados.ObjectNotFound: if the lookup yields a falsy result
        """
        pool_id = self.module.rados.pool_lookup(pool_name)
        if not pool_id:
            raise rados.ObjectNotFound("Pool '{}' not found".format(pool_name),
                                       errno.ENOENT)
        return pool_id

    def scrub_expired_queries(self) -> None:
        """Unregister and forget user queries that were not requested lately.

        A query expires when its last request is older than
        ``QUERY_EXPIRE_INTERVAL``.
        """
        # perf counters need to be periodically refreshed to continue
        # to be registered
        expire_time = datetime.now() - QUERY_EXPIRE_INTERVAL
        # iterate over a snapshot: entries are deleted in the loop
        for pool_key, user_query in list(self.user_queries.items()):
            if user_query[QUERY_LAST_REQUEST] < expire_time:
                self.unregister_osd_perf_queries(pool_key, user_query[QUERY_IDS])
                del self.user_queries[pool_key]

    def register_osd_perf_queries(self,
                                  pool_id: Optional[int],
                                  namespace: Optional[str]) -> List[int]:
        """Register one OSD perf query per counter (one per sort order).

        If any registration fails, the queries registered so far are
        removed again and the exception is re-raised.

        :return: ids of the registered queries
        :raises RuntimeError: if the module returns no id for a query
        """
        query_ids = []
        try:
            for counter in OSD_PERF_QUERY_COUNTERS:
                query = self.prepare_osd_perf_query(pool_id, namespace, counter)
                self.log.debug("register_osd_perf_queries: {}".format(query))

                query_id = self.module.add_osd_perf_query(query)
                if query_id is None:
                    raise RuntimeError('Failed to add OSD perf query: {}'.format(query))
                query_ids.append(query_id)

        except Exception:
            # roll back the partial registration before propagating
            for query_id in query_ids:
                self.module.remove_osd_perf_query(query_id)
            raise

        return query_ids

    def unregister_osd_perf_queries(self, pool_key: PoolKeyT, query_ids: List[int]) -> None:
        """Remove the given OSD perf queries and empty ``query_ids`` in place.

        :param pool_key: key of the owning user query (used for logging)
        :param query_ids: ids to remove; the list object itself is cleared
        """
        self.log.info("unregister_osd_perf_queries: pool_key={}, query_ids={}".format(
            pool_key, query_ids))
        for query_id in query_ids:
            self.module.remove_osd_perf_query(query_id)
        # clear in place so holders of the same list see it emptied
        query_ids.clear()

    def register_query(self, pool_key: PoolKeyT) -> Dict[str, Any]:
        """Return the user query for ``pool_key``, creating it if necessary.

        A new query registers its OSD perf queries, wakes the background
        thread and waits up to 5 seconds on ``refresh_condition``.  An
        existing query only has its last request time refreshed.  For
        ``GLOBAL_POOL_KEY`` the pool id map is rebuilt on every call.

        The caller must hold ``lock``: the conditions used here are bound to
        it and this method does not acquire it.
        """
        if pool_key not in self.user_queries:
            pool_name, namespace = pool_key
            pool_id = None
            if pool_name:
                pool_id = self.resolve_pool_id(cast(str, pool_name))

            user_query = {
                QUERY_POOL_ID: pool_id,
                QUERY_POOL_ID_MAP: {pool_id: pool_name},
                QUERY_IDS: self.register_osd_perf_queries(pool_id, namespace),
                QUERY_LAST_REQUEST: datetime.now()
            }

            self.user_queries[pool_key] = user_query

            # force an immediate stat pull if this is a new query
            self.query_condition.notify()
            self.refresh_condition.wait(5)

        else:
            user_query = self.user_queries[pool_key]

            # ensure query doesn't expire
            user_query[QUERY_LAST_REQUEST] = datetime.now()

        if pool_key == GLOBAL_POOL_KEY:
            # refresh the global pool id -> name map upon each
            # processing period
            user_query[QUERY_POOL_ID_MAP] = dict(
                get_rbd_pools(self.module).items())

        self.log.debug("register_query: pool_key={}, query_ids={}".format(
            pool_key, user_query[QUERY_IDS]))

        return user_query

    def extract_stat(self,
                     index: int,
                     raw_image: Optional[RawImageCounterT],
                     sum_image: Any) -> float:
        """Return the rate of counter ``index`` from the last two samples.

        The current sample value is divided by the time between the two
        samples.  Latency counters are additionally divided by the matching
        ops rate (at least 1) to obtain an average per operation.

        :return: the rate, or 0 when there are not two samples or when they
            are not strictly ordered within ``STATS_RATE_INTERVAL``
        """
        # require two raw counters between a fixed time window
        if not raw_image or not raw_image[0] or not raw_image[1]:
            return 0

        current_counter, previous_counter = cast(Tuple[PerfCounterT, PerfCounterT], raw_image)
        elapsed = current_counter[0] - previous_counter[0]
        if elapsed <= 0 or elapsed > STATS_RATE_INTERVAL.total_seconds():
            return 0

        instant_rate = float(current_counter[1][index]) / elapsed

        # convert latencies from sum to average per op
        counter_name = OSD_PERF_QUERY_COUNTERS[index]
        ops_index = None
        if counter_name == 'write_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['write_ops']
        elif counter_name == 'read_latency':
            ops_index = OSD_PERF_QUERY_COUNTERS_INDICES['read_ops']

        if ops_index is not None:
            ops = max(1, self.extract_stat(ops_index, raw_image, sum_image))
            instant_rate /= ops

        return instant_rate

    def extract_counter(self,
                        index: int,
                        raw_image: Optional[RawImageCounterT],
                        sum_image: List[int]) -> int:
        """Return cumulative counter ``index``, or 0 if ``sum_image`` is empty.

        ``raw_image`` is unused; it is accepted so that this method has the
        same signature as ``extract_stat``.
        """
        return sum_image[index] if sum_image else 0

    def generate_report(self,
                        query: Dict[str, Union[Dict[str, str],
                                               Dict[int, Dict[str, dict]]]],
                        sort_by: str,
                        extract_data: ExtractDataFuncT) -> Tuple[Dict[int, str],
                                                                 List[Dict[str, List[float]]]]:
        """Build the report rows for ``query``.

        Images are ranked by the current rate of the ``sort_by`` counter and
        limited to ``REPORT_MAX_RESULTS``.  Images whose name is not in the
        image name cache and images whose data is all zero are omitted.

        :param query: user query holding the pool id map and the counters
        :param sort_by: name of a counter in ``OSD_PERF_QUERY_COUNTERS``
        :param extract_data: produces the reported value of one counter
        :return: ``(pool descriptors by index, rows)`` where each row maps
            ``"<pool index>/<image name>"`` to its list of values
        :raises ValueError: if ``sort_by`` is not a known counter name
        """
        pool_id_map = cast(Dict[int, str], query[QUERY_POOL_ID_MAP])
        sum_pool_counters = cast(SumPoolCountersT,
                                 query.setdefault(QUERY_SUM_POOL_COUNTERS,
                                                  cast(SumPoolCountersT, {})))
        # pool_id => {namespace => {image_id => [counter..] }
        raw_pool_counters = cast(RawPoolCountersT,
                                 query.setdefault(QUERY_RAW_POOL_COUNTERS,
                                                  cast(RawPoolCountersT, {})))

        sort_by_index = OSD_PERF_QUERY_COUNTERS.index(sort_by)

        # pre-sort and limit the response
        results = []
        for pool_id, sum_namespaces in sum_pool_counters.items():
            if pool_id not in pool_id_map:
                continue
            raw_namespaces: RawNamespacesCountersT = raw_pool_counters.get(pool_id, {})
            for namespace, sum_images in sum_namespaces.items():
                raw_images = raw_namespaces.get(namespace, {})
                for image_id, sum_image in sum_images.items():
                    raw_image = raw_images.get(image_id)

                    # always sort by recent IO activity
                    results.append(((pool_id, namespace, image_id),
                                    self.extract_stat(sort_by_index, raw_image,
                                                      sum_image)))
        results = sorted(results, key=lambda x: x[1], reverse=True)[:REPORT_MAX_RESULTS]

        # build the report in sorted order
        pool_descriptors: Dict[str, int] = {}
        counters = []
        for (pool_id, namespace, image_id), _ in results:
            image_names = self.image_name_cache.get((pool_id, namespace), {})
            image_name = image_names.get(image_id)
            if image_name is None:
                # the name could not be resolved: leave the image out
                # instead of failing the whole report with a KeyError
                self.log.debug("generate_report: skipping unresolved image {}/{}".format(
                    (pool_id, namespace), image_id))
                continue

            raw_image = raw_pool_counters.get(pool_id, {}).get(
                namespace, {}).get(image_id)
            sum_image = sum_pool_counters[pool_id][namespace].get(image_id, [])

            pool_descriptor = pool_id_map[pool_id]
            if namespace:
                pool_descriptor += "/{}".format(namespace)
            pool_index = pool_descriptors.setdefault(pool_descriptor,
                                                     len(pool_descriptors))
            image_descriptor = "{}/{}".format(pool_index, image_name)
            data = [extract_data(i, raw_image, sum_image)
                    for i in range(len(OSD_PERF_QUERY_COUNTERS))]

            # skip if no data to report
            if not any(data):
                continue

            counters.append({image_descriptor: data})

        return {idx: descriptor for descriptor, idx
                in pool_descriptors.items()}, \
            counters

    def get_perf_data(self,
                      report: str,
                      pool_spec: Optional[str],
                      sort_by: str,
                      extract_data: ExtractDataFuncT) -> Tuple[int, str, str]:
        """Produce a JSON perf report for ``pool_spec``.

        The caller must hold ``lock`` (see ``register_query``).

        :param report: report kind; used to name the ``<report>_descriptors``
            and ``<report>s`` keys of the result
        :param pool_spec: pool specification passed to ``extract_pool_key``
        :param sort_by: counter name used to rank the images
        :param extract_data: produces the reported value of one counter
        :return: ``(0, JSON document, "")``
        """
        self.log.debug("get_perf_{}s: pool_spec={}, sort_by={}".format(
            report, pool_spec, sort_by))
        self.scrub_expired_queries()

        pool_key = extract_pool_key(pool_spec)
        authorize_request(self.module, pool_key[0], pool_key[1])
        user_query = self.register_query(pool_key)

        now = datetime.now()
        pool_descriptors, counters = self.generate_report(
            user_query, sort_by, extract_data)

        # built under its own name so the 'report' parameter keeps its type
        report_data = {
            'timestamp': time.mktime(now.timetuple()),
            '{}_descriptors'.format(report): OSD_PERF_QUERY_COUNTERS,
            'pool_descriptors': pool_descriptors,
            '{}s'.format(report): counters
        }

        return 0, json.dumps(report_data), ""

    def get_perf_stats(self,
                       pool_spec: Optional[str],
                       sort_by: str) -> Tuple[int, str, str]:
        """Report the current per-image rates (see ``extract_stat``)."""
        return self.get_perf_data(
            "stat", pool_spec, sort_by, self.extract_stat)

    def get_perf_counters(self,
                          pool_spec: Optional[str],
                          sort_by: str) -> Tuple[int, str, str]:
        """Report the cumulative per-image counters (see ``extract_counter``)."""
        return self.get_perf_data(
            "counter", pool_spec, sort_by, self.extract_counter)