]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
import ceph pacific 16.2.5
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
3efd9988 1import ceph_module # noqa
3efd9988 2
f67539c2
TL
3from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
5if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12import inspect
7c673cae 13import logging
9f95a23c 14import errno
f67539c2 15import functools
11fdf7f2 16import json
7c673cae 17import threading
f67539c2
TL
18from collections import defaultdict
19from enum import IntEnum
11fdf7f2 20import rados
81eedcae 21import re
b3b6e05e 22import socket
f67539c2 23import sys
11fdf7f2 24import time
f67539c2 25from ceph_argparse import CephArgtype
f6b5b4d7 26from mgr_util import profile_method
11fdf7f2 27
f67539c2
TL
28if sys.version_info >= (3, 8):
29 from typing import get_args, get_origin
30else:
31 def get_args(tp):
32 if tp is Generic:
33 return tp
34 else:
35 return getattr(tp, '__args__', ())
36
37 def get_origin(tp):
38 return getattr(tp, '__origin__', None)
39
40
cd265ab1
TL
41ERROR_MSG_EMPTY_INPUT_FILE = 'Empty content: please add a password/secret to the file.'
42ERROR_MSG_NO_INPUT_FILE = 'Please specify the file containing the password/secret with "-i" option.'
f6b5b4d7 43# Full list of strings in "osd_types.cc:pg_state_string()"
11fdf7f2
TL
44PG_STATES = [
45 "active",
46 "clean",
47 "down",
48 "recovery_unfound",
49 "backfill_unfound",
50 "scrubbing",
51 "degraded",
52 "inconsistent",
53 "peering",
54 "repair",
55 "recovering",
56 "forced_recovery",
57 "backfill_wait",
58 "incomplete",
59 "stale",
60 "remapped",
61 "deep",
62 "backfilling",
63 "forced_backfill",
64 "backfill_toofull",
65 "recovery_wait",
66 "recovery_toofull",
67 "undersized",
68 "activating",
69 "peered",
70 "snaptrim",
71 "snaptrim_wait",
72 "snaptrim_error",
73 "creating",
f6b5b4d7
TL
74 "unknown",
75 "premerge",
76 "failed_repair",
77 "laggy",
78 "wait",
79]
3efd9988
FG
80
81
7c673cae
FG
82class CommandResult(object):
83 """
84 Use with MgrModule.send_command
85 """
11fdf7f2 86
f67539c2 87 def __init__(self, tag: Optional[str] = None):
7c673cae
FG
88 self.ev = threading.Event()
89 self.outs = ""
90 self.outb = ""
91 self.r = 0
92
93 # This is just a convenience for notifications from
94 # C++ land, to avoid passing addresses around in messages.
11fdf7f2 95 self.tag = tag if tag else ""
7c673cae 96
f67539c2 97 def complete(self, r: int, outb: str, outs: str) -> None:
7c673cae
FG
98 self.r = r
99 self.outb = outb
100 self.outs = outs
101 self.ev.set()
102
f67539c2 103 def wait(self) -> Tuple[int, str, str]:
7c673cae
FG
104 self.ev.wait()
105 return self.r, self.outb, self.outs
106
107
f67539c2
TL
108class HandleCommandResult(NamedTuple):
109 """
110 Tuple containing the result of `handle_command()`
11fdf7f2 111
f67539c2 112 Only write to stderr if there is an error, or in extraordinary circumstances
11fdf7f2 113
f67539c2
TL
114 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
115 is critical information to include there.
11fdf7f2 116
f67539c2
TL
117 Everything programmatically consumable should be put on stdout
118 """
119 retval: int = 0 # return code. E.g. 0 or -errno.EINVAL
120 stdout: str = "" # data of this result.
121 stderr: str = "" # Typically used for error messages.
11fdf7f2
TL
122
123
e306af50
TL
124class MonCommandFailed(RuntimeError): pass
125
126
3efd9988 127class OSDMap(ceph_module.BasePyOSDMap):
f67539c2 128 def get_epoch(self) -> int:
3efd9988
FG
129 return self._get_epoch()
130
f67539c2 131 def get_crush_version(self) -> int:
3efd9988
FG
132 return self._get_crush_version()
133
f67539c2 134 def dump(self) -> Dict[str, Any]:
3efd9988
FG
135 return self._dump()
136
f67539c2 137 def get_pools(self) -> Dict[int, Dict[str, Any]]:
11fdf7f2
TL
138 # FIXME: efficient implementation
139 d = self._dump()
140 return dict([(p['pool'], p) for p in d['pools']])
141
f67539c2 142 def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
11fdf7f2
TL
143 # FIXME: efficient implementation
144 d = self._dump()
145 return dict([(p['pool_name'], p) for p in d['pools']])
146
f67539c2 147 def new_incremental(self) -> 'OSDMapIncremental':
3efd9988
FG
148 return self._new_incremental()
149
f67539c2 150 def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
3efd9988
FG
151 return self._apply_incremental(inc)
152
f67539c2 153 def get_crush(self) -> 'CRUSHMap':
3efd9988
FG
154 return self._get_crush()
155
f67539c2 156 def get_pools_by_take(self, take: int) -> List[int]:
3efd9988
FG
157 return self._get_pools_by_take(take).get('pools', [])
158
f67539c2
TL
159 def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
160 max_deviation: int,
161 max_iterations: int = 10,
162 pools: Optional[List[str]] = None) -> int:
11fdf7f2
TL
163 if pools is None:
164 pools = []
3efd9988
FG
165 return self._calc_pg_upmaps(
166 inc,
167 max_deviation, max_iterations, pools)
168
f67539c2 169 def map_pool_pgs_up(self, poolid: int) -> List[int]:
3efd9988
FG
170 return self._map_pool_pgs_up(poolid)
171
f67539c2 172 def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
11fdf7f2
TL
173 return self._pg_to_up_acting_osds(pool_id, ps)
174
f67539c2 175 def pool_raw_used_rate(self, pool_id: int) -> float:
11fdf7f2
TL
176 return self._pool_raw_used_rate(pool_id)
177
f67539c2 178 def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
11fdf7f2
TL
179 # FIXME: efficient implementation
180 d = self._dump()
181 return d['erasure_code_profiles'].get(name, None)
182
f67539c2 183 def get_require_osd_release(self) -> str:
9f95a23c
TL
184 d = self._dump()
185 return d['require_osd_release']
186
11fdf7f2 187
3efd9988 188class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
f67539c2 189 def get_epoch(self) -> int:
3efd9988
FG
190 return self._get_epoch()
191
f67539c2 192 def dump(self) -> Dict[str, Any]:
3efd9988
FG
193 return self._dump()
194
f67539c2 195 def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
3efd9988
FG
196 """
197 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
198 """
199 return self._set_osd_reweights(weightmap)
200
f67539c2 201 def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
3efd9988
FG
202 """
203 weightmap is a dict, int to float. devices only. e.g.,
204 { 0: 3.4, 1: 3.3, 2: 3.334 }
205 """
206 return self._set_crush_compat_weight_set_weights(weightmap)
207
11fdf7f2 208
3efd9988 209class CRUSHMap(ceph_module.BasePyCRUSH):
b32b8144 210 ITEM_NONE = 0x7fffffff
11fdf7f2 211 DEFAULT_CHOOSE_ARGS = '-1'
b32b8144 212
f67539c2 213 def dump(self) -> Dict[str, Any]:
3efd9988
FG
214 return self._dump()
215
f67539c2 216 def get_item_weight(self, item: int) -> Optional[int]:
3efd9988
FG
217 return self._get_item_weight(item)
218
f67539c2 219 def get_item_name(self, item: int) -> Optional[str]:
3efd9988
FG
220 return self._get_item_name(item)
221
f67539c2 222 def find_takes(self) -> List[int]:
3efd9988
FG
223 return self._find_takes().get('takes', [])
224
b3b6e05e
TL
225 def find_roots(self) -> List[int]:
226 return self._find_roots().get('roots', [])
227
f67539c2 228 def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
3efd9988 229 uglymap = self._get_take_weight_osd_map(root)
f67539c2 230 return {int(k): v for k, v in uglymap.get('weights', {}).items()}
11fdf7f2
TL
231
232 @staticmethod
f67539c2 233 def have_default_choose_args(dump: Dict[str, Any]) -> bool:
11fdf7f2
TL
234 return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
235
236 @staticmethod
f67539c2
TL
237 def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
238 choose_args = dump.get('choose_args')
239 assert isinstance(choose_args, dict)
240 return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
11fdf7f2 241
f67539c2 242 def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
11fdf7f2
TL
243 # TODO efficient implementation
244 for rule in self.dump()['rules']:
245 if rule_name == rule['rule_name']:
246 return rule
247
248 return None
249
f67539c2 250 def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
11fdf7f2
TL
251 for rule in self.dump()['rules']:
252 if rule['rule_id'] == rule_id:
253 return rule
254
255 return None
256
f67539c2 257 def get_rule_root(self, rule_name: str) -> Optional[int]:
11fdf7f2
TL
258 rule = self.get_rule(rule_name)
259 if rule is None:
260 return None
261
262 try:
f67539c2
TL
263 first_take = next(s for s in rule['steps'] if s.get('op') == 'take')
264 except StopIteration:
9f95a23c 265 logging.warning("CRUSH rule '{0}' has no 'take' step".format(
11fdf7f2
TL
266 rule_name))
267 return None
268 else:
269 return first_take['item']
270
f67539c2 271 def get_osds_under(self, root_id: int) -> List[int]:
11fdf7f2
TL
272 # TODO don't abuse dump like this
273 d = self.dump()
274 buckets = dict([(b['id'], b) for b in d['buckets']])
275
276 osd_list = []
277
f67539c2 278 def accumulate(b: Dict[str, Any]) -> None:
11fdf7f2
TL
279 for item in b['items']:
280 if item['id'] >= 0:
281 osd_list.append(item['id'])
282 else:
283 try:
284 accumulate(buckets[item['id']])
285 except KeyError:
286 pass
287
288 accumulate(buckets[root_id])
289
290 return osd_list
291
f67539c2 292 def device_class_counts(self) -> Dict[str, int]:
9f95a23c 293 result = defaultdict(int) # type: Dict[str, int]
11fdf7f2
TL
294 # TODO don't abuse dump like this
295 d = self.dump()
296 for device in d['devices']:
297 cls = device.get('class', None)
298 result[cls] += 1
299
300 return dict(result)
301
302
f67539c2
TL
303HandlerFuncType = Callable[..., Tuple[int, str, str]]
304
305
11fdf7f2 306class CLICommand(object):
9f95a23c 307 COMMANDS = {} # type: Dict[str, CLICommand]
11fdf7f2 308
f67539c2
TL
309 def __init__(self,
310 prefix: str,
311 perm: str = 'rw',
312 poll: bool = False):
11fdf7f2 313 self.prefix = prefix
11fdf7f2 314 self.perm = perm
f67539c2 315 self.poll = poll
9f95a23c 316 self.func = None # type: Optional[Callable]
f67539c2
TL
317 self.arg_spec = {} # type: Dict[str, Any]
318 self.first_default = -1
11fdf7f2 319
f67539c2
TL
320 KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
321
322 @staticmethod
323 def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
324 desc = inspect.getdoc(f) or ''
325 full_argspec = inspect.getfullargspec(f)
326 arg_spec = full_argspec.annotations
327 first_default = len(arg_spec)
328 if full_argspec.defaults:
329 first_default -= len(full_argspec.defaults)
330 args = []
331 for index, arg in enumerate(full_argspec.args):
332 if arg in CLICommand.KNOWN_ARGS:
333 continue
334 assert arg in arg_spec, \
335 f"'{arg}' is not annotated for {f}: {full_argspec}"
336 has_default = index >= first_default
337 args.append(CephArgtype.to_argdesc(arg_spec[arg],
338 dict(name=arg),
339 has_default))
340 return desc, arg_spec, first_default, ' '.join(args)
341
342 def store_func_metadata(self, f: HandlerFuncType) -> None:
343 self.desc, self.arg_spec, self.first_default, self.args = \
344 self.load_func_metadata(f)
345
346 def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
347 self.store_func_metadata(func)
11fdf7f2
TL
348 self.func = func
349 self.COMMANDS[self.prefix] = self
350 return self.func
351
f67539c2
TL
352 def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
353 def start_kwargs() -> bool:
354 if isinstance(val, str) and '=' in val:
355 k, v = val.split('=', 1)
356 if k in self.arg_spec:
357 return True
358 return False
359
360 if not kwargs_switch:
361 kwargs_switch = start_kwargs()
362
363 if kwargs_switch:
364 k, v = val.split('=', 1)
365 else:
366 k, v = key, val
367 return kwargs_switch, k.replace('-', '_'), v
368
369 def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
11fdf7f2 370 kwargs = {}
f67539c2
TL
371 kwargs_switch = False
372 for index, (name, tp) in enumerate(self.arg_spec.items()):
373 if name in CLICommand.KNOWN_ARGS:
11fdf7f2 374 continue
f67539c2
TL
375 assert self.first_default >= 0
376 raw_v = cmd_dict.get(name)
377 if index >= self.first_default:
378 if raw_v is None:
379 continue
380 kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
381 name, raw_v)
382 kwargs[k] = CephArgtype.cast_to(tp, v)
383 return kwargs
384
385 def call(self,
386 mgr: Any,
387 cmd_dict: Dict[str, Any],
388 inbuf: Optional[str] = None) -> HandleCommandResult:
389 kwargs = self._collect_args_by_argspec(cmd_dict)
11fdf7f2
TL
390 if inbuf:
391 kwargs['inbuf'] = inbuf
9f95a23c 392 assert self.func
11fdf7f2
TL
393 return self.func(mgr, **kwargs)
394
f67539c2 395 def dump_cmd(self) -> Dict[str, Union[str, bool]]:
9f95a23c
TL
396 return {
397 'cmd': '{} {}'.format(self.prefix, self.args),
398 'desc': self.desc,
f67539c2
TL
399 'perm': self.perm,
400 'poll': self.poll,
9f95a23c
TL
401 }
402
11fdf7f2 403 @classmethod
f67539c2 404 def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
9f95a23c 405 return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
11fdf7f2
TL
406
407
f67539c2
TL
408def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
409 return CLICommand(prefix, "r", poll)
11fdf7f2
TL
410
411
f67539c2
TL
412def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
413 return CLICommand(prefix, "w", poll)
11fdf7f2
TL
414
415
f67539c2
TL
416def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType:
417 @functools.wraps(func)
418 def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
419 if 'inbuf' not in kwargs:
cd265ab1
TL
420 return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
421 if isinstance(kwargs['inbuf'], str):
422 # Delete new line separator at EOF (it may have been added by a text editor).
423 kwargs['inbuf'] = kwargs['inbuf'].rstrip('\r\n').rstrip('\n')
424 if not kwargs['inbuf']:
425 return -errno.EINVAL, '', ERROR_MSG_EMPTY_INPUT_FILE
426 return func(*args, **kwargs)
f67539c2 427 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
cd265ab1
TL
428 return check
429
430
f67539c2 431def _get_localized_key(prefix: str, key: str) -> str:
11fdf7f2
TL
432 return '{}/{}'.format(prefix, key)
433
434
f67539c2
TL
435"""
436MODULE_OPTIONS types and Option Class
437"""
438if TYPE_CHECKING:
439 OptionTypeLabel = Literal[
440 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
441
442
443# common/options.h: value_t
444OptionValue = Optional[Union[bool, int, float, str]]
11fdf7f2 445
11fdf7f2 446
f67539c2
TL
447class Option(Dict):
448 """
449 Helper class to declare options for MODULE_OPTIONS list.
450 TODO: Replace with typing.TypedDict when in python_version >= 3.8
11fdf7f2
TL
451 """
452
453 def __init__(
f67539c2
TL
454 self,
455 name: str,
456 default: OptionValue = None,
457 type: 'OptionTypeLabel' = 'str',
458 desc: Optional[str] = None,
459 long_desc: Optional[str] = None,
460 min: OptionValue = None,
461 max: OptionValue = None,
462 enum_allowed: Optional[List[str]] = None,
463 tags: Optional[List[str]] = None,
464 see_also: Optional[List[str]] = None,
465 runtime: bool = False,
11fdf7f2
TL
466 ):
467 super(Option, self).__init__(
468 (k, v) for k, v in vars().items()
469 if k != 'self' and v is not None)
470
f67539c2 471
92f5a8d4
TL
472class Command(dict):
473 """
474 Helper class to declare options for COMMANDS list.
475
476 It also allows to specify prefix and args separately, as well as storing a
477 handler callable.
478
479 Usage:
480 >>> Command(prefix="example",
481 ... args="name=arg,type=CephInt",
482 ... perm='w',
483 ... desc="Blah")
484 {'poll': False, 'cmd': 'example name=arg,type=CephInt', 'perm': 'w', 'desc': 'Blah'}
485 """
486
487 def __init__(
488 self,
f67539c2
TL
489 prefix: str,
490 handler: HandlerFuncType,
491 perm: str = "rw",
492 poll: bool = False,
92f5a8d4 493 ):
f67539c2
TL
494 super().__init__(perm=perm,
495 poll=poll)
92f5a8d4 496 self.prefix = prefix
92f5a8d4
TL
497 self.handler = handler
498
f67539c2
TL
499 @staticmethod
500 def returns_command_result(instance: Any,
501 f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
502 @functools.wraps(f)
503 def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
504 retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
505 return HandleCommandResult(retval, stdout, stderr)
506 wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
507 return wrapper
508
509 def register(self, instance: bool = False) -> HandlerFuncType:
92f5a8d4
TL
510 """
511 Register a CLICommand handler. It allows an instance to register bound
512 methods. In that case, the mgr instance is not passed, and it's expected
513 to be available in the class instance.
514 It also uses HandleCommandResult helper to return a wrapped a tuple of 3
515 items.
516 """
f67539c2
TL
517 cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
518 return cmd(self.returns_command_result(instance, self.handler))
92f5a8d4 519
3efd9988 520
9f95a23c 521class CPlusPlusHandler(logging.Handler):
f67539c2 522 def __init__(self, module_inst: Any):
9f95a23c
TL
523 super(CPlusPlusHandler, self).__init__()
524 self._module = module_inst
525 self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
526 .format(module_inst.module_name)))
527
f67539c2 528 def emit(self, record: logging.LogRecord) -> None:
9f95a23c
TL
529 if record.levelno >= self.level:
530 self._module._ceph_log(self.format(record))
531
f67539c2 532
9f95a23c 533class ClusterLogHandler(logging.Handler):
f67539c2 534 def __init__(self, module_inst: Any):
9f95a23c
TL
535 super().__init__()
536 self._module = module_inst
537 self.setFormatter(logging.Formatter("%(message)s"))
538
f67539c2 539 def emit(self, record: logging.LogRecord) -> None:
9f95a23c 540 levelmap = {
f67539c2
TL
541 logging.DEBUG: MgrModule.ClusterLogPrio.DEBUG,
542 logging.INFO: MgrModule.ClusterLogPrio.INFO,
543 logging.WARNING: MgrModule.ClusterLogPrio.WARN,
544 logging.ERROR: MgrModule.ClusterLogPrio.ERROR,
545 logging.CRITICAL: MgrModule.ClusterLogPrio.ERROR,
9f95a23c 546 }
f67539c2 547 level = levelmap[record.levelno]
9f95a23c
TL
548 if record.levelno >= self.level:
549 self._module.cluster_log(self._module.module_name,
550 level,
551 self.format(record))
552
f67539c2 553
9f95a23c 554class FileHandler(logging.FileHandler):
f67539c2 555 def __init__(self, module_inst: Any):
9f95a23c
TL
556 path = module_inst.get_ceph_option("log_file")
557 idx = path.rfind(".log")
558 if idx != -1:
559 self.path = "{}.{}.log".format(path[:idx], module_inst.module_name)
560 else:
561 self.path = "{}.{}".format(path, module_inst.module_name)
562 super(FileHandler, self).__init__(self.path, delay=True)
563 self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
564
565
566class MgrModuleLoggingMixin(object):
f67539c2
TL
567 def _configure_logging(self,
568 mgr_level: str,
569 module_level: str,
570 cluster_level: str,
571 log_to_file: bool,
572 log_to_cluster: bool) -> None:
573 self._mgr_level: Optional[str] = None
574 self._module_level: Optional[str] = None
9f95a23c
TL
575 self._root_logger = logging.getLogger()
576
577 self._unconfigure_logging()
578
579 # the ceph log handler is initialized only once
580 self._mgr_log_handler = CPlusPlusHandler(self)
581 self._cluster_log_handler = ClusterLogHandler(self)
582 self._file_log_handler = FileHandler(self)
583
584 self.log_to_file = log_to_file
585 self.log_to_cluster = log_to_cluster
586
587 self._root_logger.addHandler(self._mgr_log_handler)
588 if log_to_file:
589 self._root_logger.addHandler(self._file_log_handler)
590 if log_to_cluster:
591 self._root_logger.addHandler(self._cluster_log_handler)
592
593 self._root_logger.setLevel(logging.NOTSET)
594 self._set_log_level(mgr_level, module_level, cluster_level)
595
f67539c2 596 def _unconfigure_logging(self) -> None:
9f95a23c
TL
597 # remove existing handlers:
598 rm_handlers = [
f67539c2
TL
599 h for h in self._root_logger.handlers
600 if (isinstance(h, CPlusPlusHandler) or
601 isinstance(h, FileHandler) or
602 isinstance(h, ClusterLogHandler))]
9f95a23c
TL
603 for h in rm_handlers:
604 self._root_logger.removeHandler(h)
605 self.log_to_file = False
606 self.log_to_cluster = False
607
f67539c2
TL
608 def _set_log_level(self,
609 mgr_level: str,
610 module_level: str,
611 cluster_level: str) -> None:
9f95a23c
TL
612 self._cluster_log_handler.setLevel(cluster_level.upper())
613
614 module_level = module_level.upper() if module_level else ''
615 if not self._module_level:
616 # using debug_mgr level
617 if not module_level and self._mgr_level == mgr_level:
618 # no change in module level neither in debug_mgr
619 return
620 else:
621 if self._module_level == module_level:
622 # no change in module level
623 return
624
625 if not self._module_level and not module_level:
626 level = self._ceph_log_level_to_python(mgr_level)
f67539c2
TL
627 self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
628 level, mgr_level)
9f95a23c
TL
629 elif self._module_level and not module_level:
630 level = self._ceph_log_level_to_python(mgr_level)
631 self.getLogger().warning("unsetting module log level, falling back to "
632 "debug_mgr level: %s (%s)", level, mgr_level)
633 elif module_level:
634 level = module_level
635 self.getLogger().debug("setting log level: %s", level)
636
637 self._module_level = module_level
638 self._mgr_level = mgr_level
639
640 self._mgr_log_handler.setLevel(level)
641 self._file_log_handler.setLevel(level)
642
f67539c2 643 def _enable_file_log(self) -> None:
9f95a23c
TL
644 # enable file log
645 self.getLogger().warning("enabling logging to file")
646 self.log_to_file = True
647 self._root_logger.addHandler(self._file_log_handler)
648
f67539c2 649 def _disable_file_log(self) -> None:
9f95a23c
TL
650 # disable file log
651 self.getLogger().warning("disabling logging to file")
652 self.log_to_file = False
653 self._root_logger.removeHandler(self._file_log_handler)
654
f67539c2 655 def _enable_cluster_log(self) -> None:
9f95a23c
TL
656 # enable cluster log
657 self.getLogger().warning("enabling logging to cluster")
658 self.log_to_cluster = True
659 self._root_logger.addHandler(self._cluster_log_handler)
660
f67539c2 661 def _disable_cluster_log(self) -> None:
9f95a23c
TL
662 # disable cluster log
663 self.getLogger().warning("disabling logging to cluster")
664 self.log_to_cluster = False
665 self._root_logger.removeHandler(self._cluster_log_handler)
666
f67539c2
TL
667 def _ceph_log_level_to_python(self, log_level: str) -> str:
668 if log_level:
9f95a23c 669 try:
f67539c2 670 ceph_log_level = int(log_level.split("/", 1)[0])
9f95a23c
TL
671 except ValueError:
672 ceph_log_level = 0
673 else:
674 ceph_log_level = 0
675
676 log_level = "DEBUG"
677 if ceph_log_level <= 0:
678 log_level = "CRITICAL"
679 elif ceph_log_level <= 1:
680 log_level = "WARNING"
681 elif ceph_log_level <= 4:
682 log_level = "INFO"
683 return log_level
684
f67539c2 685 def getLogger(self, name: Optional[str] = None) -> logging.Logger:
9f95a23c
TL
686 return logging.getLogger(name)
687
688
689class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
3efd9988
FG
690 """
691 Standby modules only implement a serve and shutdown method, they
692 are not permitted to implement commands and they do not receive
693 any notifications.
694
b32b8144 695 They only have access to the mgrmap (for accessing service URI info
3efd9988
FG
696 from their active peer), and to configuration settings (read only).
697 """
698
f67539c2 699 MODULE_OPTIONS: List[Option] = []
9f95a23c 700 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
11fdf7f2 701
f67539c2 702 def __init__(self, module_name: str, capsule: Any):
3efd9988
FG
703 super(MgrStandbyModule, self).__init__(capsule)
704 self.module_name = module_name
9f95a23c 705
11fdf7f2
TL
706 # see also MgrModule.__init__()
707 for o in self.MODULE_OPTIONS:
708 if 'default' in o:
709 if 'type' in o:
710 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
711 else:
712 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
3efd9988 713
f67539c2
TL
714 # mock does not return a str
715 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
716 log_level = cast(str, self.get_module_option("log_level"))
717 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
9f95a23c
TL
718 self._configure_logging(mgr_level, log_level, cluster_level,
719 False, False)
720
721 # for backwards compatibility
722 self._logger = self.getLogger()
723
f67539c2 724 def __del__(self) -> None:
cd265ab1 725 self._cleanup()
9f95a23c
TL
726 self._unconfigure_logging()
727
f67539c2 728 def _cleanup(self) -> None:
cd265ab1
TL
729 pass
730
9f95a23c 731 @classmethod
f67539c2 732 def _register_options(cls, module_name: str) -> None:
9f95a23c
TL
733 cls.MODULE_OPTIONS.append(
734 Option(name='log_level', type='str', default="", runtime=True,
735 enum_allowed=['info', 'debug', 'critical', 'error',
736 'warning', '']))
737 cls.MODULE_OPTIONS.append(
738 Option(name='log_to_file', type='bool', default=False, runtime=True))
739 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
740 cls.MODULE_OPTIONS.append(
741 Option(name='log_to_cluster', type='bool', default=False,
742 runtime=True))
743 cls.MODULE_OPTIONS.append(
744 Option(name='log_to_cluster_level', type='str', default='info',
745 runtime=True,
746 enum_allowed=['info', 'debug', 'critical', 'error',
747 'warning', '']))
3efd9988
FG
748
749 @property
f67539c2 750 def log(self) -> logging.Logger:
3efd9988
FG
751 return self._logger
752
f67539c2 753 def serve(self) -> None:
3efd9988
FG
754 """
755 The serve method is mandatory for standby modules.
756 :return:
757 """
758 raise NotImplementedError()
759
f67539c2 760 def get_mgr_id(self) -> str:
3efd9988
FG
761 return self._ceph_get_mgr_id()
762
f67539c2 763 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
b32b8144
FG
764 """
765 Retrieve the value of a persistent configuration setting
766
b32b8144 767 :param default: the default value of the config if it is not found
b32b8144 768 """
11fdf7f2 769 r = self._ceph_get_module_option(key)
b32b8144 770 if r is None:
11fdf7f2 771 return self.MODULE_OPTION_DEFAULTS.get(key, default)
b32b8144
FG
772 else:
773 return r
774
f67539c2 775 def get_ceph_option(self, key: str) -> OptionValue:
11fdf7f2
TL
776 return self._ceph_get_option(key)
777
f67539c2 778 def get_store(self, key: str) -> Optional[str]:
11fdf7f2
TL
779 """
780 Retrieve the value of a persistent KV store entry
781
782 :param key: String
783 :return: Byte string or None
784 """
785 return self._ceph_get_store(key)
3efd9988 786
f67539c2
TL
787 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
788 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
789 if r is None:
790 r = self._ceph_get_store(key)
791 if r is None:
792 r = default
793 return r
794
795 def get_active_uri(self) -> str:
3efd9988
FG
796 return self._ceph_get_active_uri()
797
b3b6e05e
TL
798 def get(self, data_name: str):
799 return self._ceph_get(data_name)
800
801 def get_mgr_ip(self) -> str:
802 ips = self.get("mgr_ips").get('ips', [])
803 if not ips:
804 return socket.gethostname()
805 return ips[0]
806
f67539c2 807 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
11fdf7f2 808 r = self._ceph_get_module_option(key, self.get_mgr_id())
3efd9988 809 if r is None:
11fdf7f2
TL
810 return self.MODULE_OPTION_DEFAULTS.get(key, default)
811 else:
812 return r
3efd9988 813
3efd9988 814
f67539c2
TL
815HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
816# {"type": service_type, "id": service_id}
817ServiceInfoT = Dict[str, str]
818# {"hostname": hostname,
819# "ceph_version": version,
820# "services": [service_info, ..]}
821ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
822PerfCounterT = Dict[str, Any]
823
824
9f95a23c
TL
825class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
826 COMMANDS = [] # type: List[Any]
f67539c2 827 MODULE_OPTIONS: List[Option] = []
9f95a23c 828 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
7c673cae 829
3efd9988
FG
830 # Priority definitions for perf counters
831 PRIO_CRITICAL = 10
832 PRIO_INTERESTING = 8
833 PRIO_USEFUL = 5
834 PRIO_UNINTERESTING = 2
835 PRIO_DEBUGONLY = 0
836
837 # counter value types
838 PERFCOUNTER_TIME = 1
839 PERFCOUNTER_U64 = 2
840
841 # counter types
842 PERFCOUNTER_LONGRUNAVG = 4
843 PERFCOUNTER_COUNTER = 8
844 PERFCOUNTER_HISTOGRAM = 0x10
28e407b8 845 PERFCOUNTER_TYPE_MASK = ~3
7c673cae 846
1adf2230
AA
847 # units supported
848 BYTES = 0
849 NONE = 1
11fdf7f2
TL
850
851 # Cluster log priorities
f67539c2
TL
852 class ClusterLogPrio(IntEnum):
853 DEBUG = 0
854 INFO = 1
855 SEC = 2
856 WARN = 3
857 ERROR = 4
858
859 def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
3efd9988 860 self.module_name = module_name
3efd9988 861 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
7c673cae 862
11fdf7f2
TL
863 for o in self.MODULE_OPTIONS:
864 if 'default' in o:
865 if 'type' in o:
866 # we'll assume the declared type matches the
867 # supplied default value's type.
868 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
869 else:
870 # module not declaring it's type, so normalize the
871 # default value to be a string for consistent behavior
872 # with default and user-supplied option values.
873 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
874
f67539c2
TL
875 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
876 log_level = cast(str, self.get_module_option("log_level"))
877 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
9f95a23c 878 log_to_file = self.get_module_option("log_to_file")
f67539c2 879 assert isinstance(log_to_file, bool)
9f95a23c 880 log_to_cluster = self.get_module_option("log_to_cluster")
f67539c2 881 assert isinstance(log_to_cluster, bool)
9f95a23c
TL
882 self._configure_logging(mgr_level, log_level, cluster_level,
883 log_to_file, log_to_cluster)
884
885 # for backwards compatibility
886 self._logger = self.getLogger()
887
888 self._version = self._ceph_get_version()
889
890 self._perf_schema_cache = None
891
892 # Keep a librados instance for those that need it.
f67539c2 893 self._rados: Optional[rados.Rados] = None
9f95a23c 894
f67539c2 895 def __del__(self) -> None:
9f95a23c
TL
896 self._unconfigure_logging()
897
898 @classmethod
f67539c2 899 def _register_options(cls, module_name: str) -> None:
9f95a23c
TL
900 cls.MODULE_OPTIONS.append(
901 Option(name='log_level', type='str', default="", runtime=True,
902 enum_allowed=['info', 'debug', 'critical', 'error',
903 'warning', '']))
904 cls.MODULE_OPTIONS.append(
905 Option(name='log_to_file', type='bool', default=False, runtime=True))
906 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
907 cls.MODULE_OPTIONS.append(
908 Option(name='log_to_cluster', type='bool', default=False,
909 runtime=True))
910 cls.MODULE_OPTIONS.append(
911 Option(name='log_to_cluster_level', type='str', default='info',
912 runtime=True,
913 enum_allowed=['info', 'debug', 'critical', 'error',
914 'warning', '']))
3efd9988 915
11fdf7f2 916 @classmethod
f67539c2 917 def _register_commands(cls, module_name: str) -> None:
11fdf7f2 918 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
7c673cae
FG
919
920 @property
f67539c2 921 def log(self) -> logging.Logger:
7c673cae
FG
922 return self._logger
923
f67539c2 924 def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
11fdf7f2
TL
925 """
926 :param channel: The log channel. This can be 'cluster', 'audit', ...
f67539c2 927 :param priority: The log message priority.
11fdf7f2 928 :param message: The message to log.
11fdf7f2 929 """
f67539c2 930 self._ceph_cluster_log(channel, priority.value, message)
11fdf7f2 931
7c673cae 932 @property
f67539c2 933 def version(self) -> str:
7c673cae
FG
934 return self._version
935
92f5a8d4 936 @property
f67539c2 937 def release_name(self) -> str:
92f5a8d4
TL
938 """
939 Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
940 :return: Returns the release name of the Ceph version in lower case.
941 :rtype: str
942 """
943 return self._ceph_get_release_name()
944
f67539c2
TL
945 def lookup_release_name(self, major: int) -> str:
946 return self._ceph_lookup_release_name(major)
947
948 def get_context(self) -> object:
3efd9988
FG
949 """
950 :return: a Python capsule containing a C++ CephContext pointer
951 """
952 return self._ceph_get_context()
953
f67539c2 954 def notify(self, notify_type: str, notify_id: str) -> None:
7c673cae
FG
955 """
956 Called by the ceph-mgr service to notify the Python plugin
957 that new state is available.
11fdf7f2
TL
958
959 :param notify_type: string indicating what kind of notification,
960 such as osd_map, mon_map, fs_map, mon_status,
961 health, pg_summary, command, service_map
962 :param notify_id: string (may be empty) that optionally specifies
963 which entity is being notified about. With
964 "command" notifications this is set to the tag
965 ``from send_command``.
966 """
967 pass
968
f67539c2 969 def _config_notify(self) -> None:
9f95a23c 970 # check logging options for changes
f67539c2
TL
971 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
972 module_level = cast(str, self.get_module_option("log_level"))
973 cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
974 assert isinstance(cluster_level, str)
9f95a23c 975 log_to_file = self.get_module_option("log_to_file", False)
f67539c2 976 assert isinstance(log_to_file, bool)
9f95a23c 977 log_to_cluster = self.get_module_option("log_to_cluster", False)
f67539c2 978 assert isinstance(log_to_cluster, bool)
9f95a23c
TL
979 self._set_log_level(mgr_level, module_level, cluster_level)
980
981 if log_to_file != self.log_to_file:
982 if log_to_file:
983 self._enable_file_log()
984 else:
985 self._disable_file_log()
986 if log_to_cluster != self.log_to_cluster:
987 if log_to_cluster:
988 self._enable_cluster_log()
989 else:
990 self._disable_cluster_log()
991
992 # call module subclass implementations
993 self.config_notify()
994
f67539c2 995 def config_notify(self) -> None:
11fdf7f2
TL
996 """
997 Called by the ceph-mgr service to notify the Python plugin
998 that the configuration may have changed. Modules will want to
999 refresh any configuration values stored in config variables.
7c673cae
FG
1000 """
1001 pass
1002
f67539c2 1003 def serve(self) -> None:
7c673cae
FG
1004 """
1005 Called by the ceph-mgr service to start any server that
1006 is provided by this Python plugin. The implementation
1007 of this function should block until ``shutdown`` is called.
1008
1009 You *must* implement ``shutdown`` if you implement ``serve``
1010 """
1011 pass
1012
f67539c2 1013 def shutdown(self) -> None:
7c673cae
FG
1014 """
1015 Called by the ceph-mgr service to request that this
1016 module drop out of its serve() function. You do not
1017 need to implement this if you do not implement serve()
1018
1019 :return: None
1020 """
11fdf7f2 1021 if self._rados:
9f95a23c 1022 addrs = self._rados.get_addrs()
11fdf7f2 1023 self._rados.shutdown()
9f95a23c 1024 self._ceph_unregister_client(addrs)
7c673cae 1025
f67539c2 1026 def get(self, data_name: str):
7c673cae 1027 """
11fdf7f2
TL
1028 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
1029
1030 :param str data_name: Valid things to fetch are osd_crush_map_text,
1031 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
1032 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
9f95a23c
TL
1033 health, mon_status, devices, device <devid>, pg_stats,
1034 pool_stats, pg_ready, osd_ping_times.
11fdf7f2
TL
1035
1036 Note:
1037 All these structures have their own JSON representations: experiment
1038 or look at the C++ ``dump()`` methods to learn about them.
7c673cae 1039 """
3efd9988 1040 return self._ceph_get(data_name)
7c673cae 1041
f67539c2 1042 def _stattype_to_str(self, stattype: int) -> str:
11fdf7f2 1043
94b18763
FG
1044 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
1045 if typeonly == 0:
1046 return 'gauge'
1047 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
1048 # this lie matches the DaemonState decoding: only val, no counts
1049 return 'counter'
1050 if typeonly == self.PERFCOUNTER_COUNTER:
1051 return 'counter'
1052 if typeonly == self.PERFCOUNTER_HISTOGRAM:
1053 return 'histogram'
11fdf7f2 1054
94b18763
FG
1055 return ''
1056
f67539c2
TL
1057 def _perfpath_to_path_labels(self, daemon: str,
1058 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
9f95a23c
TL
1059 label_names = ("ceph_daemon",) # type: Tuple[str, ...]
1060 labels = (daemon,) # type: Tuple[str, ...]
81eedcae
TL
1061
1062 if daemon.startswith('rbd-mirror.'):
1063 match = re.match(
9f95a23c 1064 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
81eedcae
TL
1065 path
1066 )
1067 if match:
9f95a23c 1068 path = 'rbd_mirror_image_' + match.group(4)
81eedcae
TL
1069 pool = match.group(1)
1070 namespace = match.group(2) or ''
1071 image = match.group(3)
1072 label_names += ('pool', 'namespace', 'image')
1073 labels += (pool, namespace, image)
1074
1075 return path, label_names, labels,
1076
f67539c2 1077 def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
28e407b8
AA
1078 if stattype & self.PERFCOUNTER_TIME:
1079 # Convert from ns to seconds
1080 return value / 1000000000.0
1081 else:
1082 return value
1083
f67539c2 1084 def _unit_to_str(self, unit: int) -> str:
1adf2230
AA
1085 if unit == self.NONE:
1086 return "/s"
1087 elif unit == self.BYTES:
11fdf7f2 1088 return "B/s"
f67539c2
TL
1089 else:
1090 raise ValueError(f'bad unit "{unit}"')
11fdf7f2
TL
1091
1092 @staticmethod
f67539c2 1093 def to_pretty_iec(n: int) -> str:
11fdf7f2
TL
1094 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
1095 (20, 'Mi'), (10, 'Ki')]:
1096 if n > 10 << bits:
1097 return str(n >> bits) + ' ' + suffix
1098 return str(n) + ' '
1099
1100 @staticmethod
f67539c2 1101 def get_pretty_row(elems: Sequence[str], width: int) -> str:
11fdf7f2
TL
1102 """
1103 Takes an array of elements and returns a string with those elements
1104 formatted as a table row. Useful for polling modules.
1105
1106 :param elems: the elements to be printed
1107 :param width: the width of the terminal
1108 """
1109 n = len(elems)
1110 column_width = int(width / n)
1111
1112 ret = '|'
1113 for elem in elems:
1114 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
1115
1116 return ret
1117
f67539c2 1118 def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
11fdf7f2
TL
1119 """
1120 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
1121
1122 :param elems: the elements to be printed
1123 :param width: the width of the terminal
1124 """
1125 n = len(elems)
1126 column_width = int(width / n)
1127
1128 # dash line
1129 ret = '+'
1130 for i in range(0, n):
1131 ret += '-' * (column_width - 1) + '+'
1132 ret += '\n'
1133
1134 # title
1135 ret += self.get_pretty_row(elems, width)
1136 ret += '\n'
1137
1138 # dash line
1139 ret += '+'
1140 for i in range(0, n):
1141 ret += '-' * (column_width - 1) + '+'
1142 ret += '\n'
1143
1144 return ret
1145
f67539c2 1146 def get_server(self, hostname) -> ServerInfoT:
7c673cae 1147 """
11fdf7f2
TL
1148 Called by the plugin to fetch metadata about a particular hostname from
1149 ceph-mgr.
1150
1151 This is information that ceph-mgr has gleaned from the daemon metadata
1152 reported by daemons running on a particular server.
7c673cae 1153
11fdf7f2 1154 :param hostname: a hostname
7c673cae 1155 """
f67539c2 1156 return cast(ServerInfoT, self._ceph_get_server(hostname))
7c673cae 1157
f67539c2
TL
1158 def get_perf_schema(self,
1159 svc_type: str,
1160 svc_name: str) -> Dict[str,
1161 Dict[str, Dict[str, Union[str, int]]]]:
c07f9fc5
FG
1162 """
1163 Called by the plugin to fetch perf counter schema info.
1164 svc_name can be nullptr, as can svc_type, in which case
1165 they are wildcards
1166
11fdf7f2
TL
1167 :param str svc_type:
1168 :param str svc_name:
c07f9fc5
FG
1169 :return: list of dicts describing the counters requested
1170 """
3efd9988 1171 return self._ceph_get_perf_schema(svc_type, svc_name)
c07f9fc5 1172
f67539c2
TL
1173 def get_counter(self,
1174 svc_type: str,
1175 svc_name: str,
1176 path: str) -> Dict[str, List[Tuple[float, int]]]:
7c673cae 1177 """
11fdf7f2
TL
1178 Called by the plugin to fetch the latest performance counter data for a
1179 particular counter on a particular service.
7c673cae 1180
11fdf7f2
TL
1181 :param str svc_type:
1182 :param str svc_name:
1183 :param str path: a period-separated concatenation of the subsystem and the
1184 counter name, for example "mds.inodes".
f67539c2
TL
1185 :return: A dict of counter names to their values. each value is a list of
1186 of two-tuples of (timestamp, value). This may be empty if no data is
1187 available.
7c673cae 1188 """
3efd9988 1189 return self._ceph_get_counter(svc_type, svc_name, path)
7c673cae 1190
f67539c2
TL
1191 def get_latest_counter(self,
1192 svc_type: str,
1193 svc_name: str,
1194 path: str) -> Dict[str, Union[Tuple[float, int],
1195 Tuple[float, int, int]]]:
11fdf7f2
TL
1196 """
1197 Called by the plugin to fetch only the newest performance counter data
f67539c2 1198 point for a particular counter on a particular service.
11fdf7f2
TL
1199
1200 :param str svc_type:
1201 :param str svc_name:
1202 :param str path: a period-separated concatenation of the subsystem and the
1203 counter name, for example "mds.inodes".
f67539c2
TL
1204 :return: A list of two-tuples of (timestamp, value) or three-tuple of
1205 (timestamp, value, count) is returned. This may be empty if no
1206 data is available.
11fdf7f2
TL
1207 """
1208 return self._ceph_get_latest_counter(svc_type, svc_name, path)
1209
f67539c2 1210 def list_servers(self) -> List[ServerInfoT]:
7c673cae 1211 """
11fdf7f2
TL
1212 Like ``get_server``, but gives information about all servers (i.e. all
1213 unique hostnames that have been mentioned in daemon metadata)
1214
1215 :return: a list of information about all servers
1216 :rtype: list
7c673cae 1217 """
f67539c2 1218 return cast(List[ServerInfoT], self._ceph_get_server(None))
7c673cae 1219
f67539c2
TL
1220 def get_metadata(self,
1221 svc_type: str,
1222 svc_id: str,
1223 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
7c673cae 1224 """
11fdf7f2 1225 Fetch the daemon metadata for a particular service.
7c673cae 1226
11fdf7f2
TL
1227 ceph-mgr fetches metadata asynchronously, so are windows of time during
1228 addition/removal of services where the metadata is not available to
1229 modules. ``None`` is returned if no metadata is available.
1230
1231 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1232 :param str svc_id: service id. convert OSD integer IDs to strings when
1233 calling this
1234 :rtype: dict, or None if no metadata found
7c673cae 1235 """
f6b5b4d7
TL
1236 metadata = self._ceph_get_metadata(svc_type, svc_id)
1237 if metadata is None:
1238 return default
1239 return metadata
7c673cae 1240
f67539c2 1241 def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
224ce89b
WB
1242 """
1243 Fetch the latest status for a particular service daemon.
1244
11fdf7f2
TL
1245 This method may return ``None`` if no status information is
1246 available, for example because the daemon hasn't fully started yet.
1247
224ce89b
WB
1248 :param svc_type: string (e.g., 'rgw')
1249 :param svc_id: string
11fdf7f2 1250 :return: dict, or None if the service is not found
224ce89b 1251 """
3efd9988 1252 return self._ceph_get_daemon_status(svc_type, svc_id)
224ce89b 1253
f67539c2 1254 def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
e306af50
TL
1255 """
1256 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1257 if ``retval != 0``.
1258 """
1259
cd265ab1 1260 r = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
e306af50
TL
1261 if r.retval:
1262 raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1263 return r
1264
f67539c2 1265 def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
11fdf7f2
TL
1266 """
1267 Helper for modules that do simple, synchronous mon command
1268 execution.
1269
1270 See send_command for general case.
1271
1272 :return: status int, out std, err str
1273 """
1274
1275 t1 = time.time()
1276 result = CommandResult()
cd265ab1 1277 self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
11fdf7f2
TL
1278 r = result.wait()
1279 t2 = time.time()
1280
1281 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1282 cmd_dict['prefix'], r[0], t2 - t1
1283 ))
1284
1285 return r
1286
cd265ab1
TL
1287 def send_command(
1288 self,
1289 result: CommandResult,
1290 svc_type: str,
1291 svc_id: str,
1292 command: str,
1293 tag: str,
f67539c2 1294 inbuf: Optional[str] = None) -> None:
7c673cae
FG
1295 """
1296 Called by the plugin to send a command to the mon
1297 cluster.
11fdf7f2
TL
1298
1299 :param CommandResult result: an instance of the ``CommandResult``
1300 class, defined in the same module as MgrModule. This acts as a
1301 completion and stores the output of the command. Use
1302 ``CommandResult.wait()`` if you want to block on completion.
1303 :param str svc_type:
1304 :param str svc_id:
1305 :param str command: a JSON-serialized command. This uses the same
1306 format as the ceph command line, which is a dictionary of command
1307 arguments, with the extra ``prefix`` key containing the command
1308 name itself. Consult MonCommands.h for available commands and
1309 their expected arguments.
1310 :param str tag: used for nonblocking operation: when a command
1311 completes, the ``notify()`` callback on the MgrModule instance is
1312 triggered, with notify_type set to "command", and notify_id set to
1313 the tag of the command.
cd265ab1 1314 :param str inbuf: input buffer for sending additional data.
7c673cae 1315 """
cd265ab1 1316 self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
7c673cae 1317
f67539c2 1318 def set_health_checks(self, checks: HealthChecksT) -> None:
c07f9fc5 1319 """
c07f9fc5
FG
1320 Set the module's current map of health checks. Argument is a
1321 dict of check names to info, in this form:
1322
11fdf7f2
TL
1323 ::
1324
c07f9fc5
FG
1325 {
1326 'CHECK_FOO': {
1327 'severity': 'warning', # or 'error'
1328 'summary': 'summary string',
9f95a23c 1329 'count': 4, # quantify badness
c07f9fc5
FG
1330 'detail': [ 'list', 'of', 'detail', 'strings' ],
1331 },
1332 'CHECK_BAR': {
1333 'severity': 'error',
1334 'summary': 'bars are bad',
1335 'detail': [ 'too hard' ],
1336 },
1337 }
1338
1339 :param list: dict of health check dicts
1340 """
3efd9988 1341 self._ceph_set_health_checks(checks)
c07f9fc5 1342
f67539c2
TL
1343 def _handle_command(self,
1344 inbuf: str,
1345 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1346 Tuple[int, str, str]]:
11fdf7f2
TL
1347 if cmd['prefix'] not in CLICommand.COMMANDS:
1348 return self.handle_command(inbuf, cmd)
9f95a23c 1349
11fdf7f2
TL
1350 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
1351
f67539c2
TL
1352 def handle_command(self,
1353 inbuf: str,
1354 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1355 Tuple[int, str, str]]:
7c673cae
FG
1356 """
1357 Called by ceph-mgr to request the plugin to handle one
1358 of the commands that it declared in self.COMMANDS
1359
1360 Return a status code, an output buffer, and an
1361 output string. The output buffer is for data results,
1362 the output string is for informative text.
1363
11fdf7f2
TL
1364 :param inbuf: content of any "-i <file>" supplied to ceph cli
1365 :type inbuf: str
1366 :param cmd: from Ceph's cmdmap_t
1367 :type cmd: dict
7c673cae 1368
11fdf7f2 1369 :return: HandleCommandResult or a 3-tuple of (int, str, str)
7c673cae
FG
1370 """
1371
1372 # Should never get called if they didn't declare
1373 # any ``COMMANDS``
1374 raise NotImplementedError()
1375
f67539c2 1376 def get_mgr_id(self) -> str:
31f18b77 1377 """
11fdf7f2
TL
1378 Retrieve the name of the manager daemon where this plugin
1379 is currently being executed (i.e. the active manager).
31f18b77
FG
1380
1381 :return: str
1382 """
3efd9988 1383 return self._ceph_get_mgr_id()
31f18b77 1384
b3b6e05e
TL
1385 def get_ceph_conf_path(self) -> str:
1386 return self._ceph_get_ceph_conf_path()
1387
1388 def get_mgr_ip(self) -> str:
1389 ips = self.get("mgr_ips").get('ips', [])
1390 if not ips:
1391 return socket.gethostname()
1392 return ips[0]
1393
f67539c2 1394 def get_ceph_option(self, key: str) -> OptionValue:
11fdf7f2 1395 return self._ceph_get_option(key)
7c673cae 1396
f67539c2
TL
1397 def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
1398 return self._ceph_get_foreign_option(entity, key)
1399
1400 def _validate_module_option(self, key: str) -> None:
11fdf7f2
TL
1401 """
1402 Helper: don't allow get/set config callers to
1403 access config options that they didn't declare
1404 in their schema.
7c673cae 1405 """
11fdf7f2
TL
1406 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1407 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1408 format(key, self.__class__.__name__))
1409
f67539c2
TL
1410 def _get_module_option(self,
1411 key: str,
1412 default: OptionValue,
1413 localized_prefix: str = "") -> OptionValue:
11fdf7f2
TL
1414 r = self._ceph_get_module_option(self.module_name, key,
1415 localized_prefix)
3efd9988 1416 if r is None:
11fdf7f2 1417 return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988
FG
1418 else:
1419 return r
7c673cae 1420
f67539c2 1421 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
11fdf7f2
TL
1422 """
1423 Retrieve the value of a persistent configuration setting
11fdf7f2
TL
1424 """
1425 self._validate_module_option(key)
1426 return self._get_module_option(key, default)
1427
f67539c2
TL
1428 def get_module_option_ex(self, module: str,
1429 key: str,
1430 default: OptionValue = None) -> OptionValue:
11fdf7f2
TL
1431 """
1432 Retrieve the value of a persistent configuration setting
1433 for the specified module.
1434
f67539c2 1435 :param module: The name of the module, e.g. 'dashboard'
11fdf7f2 1436 or 'telemetry'.
f67539c2
TL
1437 :param key: The configuration key, e.g. 'server_addr'.
1438 :param default: The default value to use when the
11fdf7f2 1439 returned value is ``None``. Defaults to ``None``.
11fdf7f2
TL
1440 """
1441 if module == self.module_name:
1442 self._validate_module_option(key)
1443 r = self._ceph_get_module_option(module, key)
1444 return default if r is None else r
1445
f67539c2 1446 def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
31f18b77 1447 """
11fdf7f2
TL
1448 Retrieve a dict of KV store keys to values, where the keys
1449 have the given prefix
31f18b77 1450
11fdf7f2 1451 :param str key_prefix:
31f18b77
FG
1452 :return: str
1453 """
11fdf7f2 1454 return self._ceph_get_store_prefix(key_prefix)
31f18b77 1455
f67539c2
TL
1456 def _set_localized(self,
1457 key: str,
1458 val: Optional[str],
1459 setter: Callable[[str, Optional[str]], None]) -> None:
11fdf7f2
TL
1460 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1461
f67539c2 1462 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
224ce89b
WB
1463 """
1464 Retrieve localized configuration for this ceph-mgr instance
224ce89b 1465 """
11fdf7f2
TL
1466 self._validate_module_option(key)
1467 return self._get_module_option(key, default, self.get_mgr_id())
224ce89b 1468
f67539c2 1469 def _set_module_option(self, key: str, val: Any) -> None:
eafe8130
TL
1470 return self._ceph_set_module_option(self.module_name, key,
1471 None if val is None else str(val))
224ce89b 1472
f67539c2 1473 def set_module_option(self, key: str, val: Any) -> None:
7c673cae
FG
1474 """
1475 Set the value of a persistent configuration setting
1476
11fdf7f2
TL
1477 :param str key:
1478 :type val: str | None
7c673cae 1479 """
11fdf7f2
TL
1480 self._validate_module_option(key)
1481 return self._set_module_option(key, val)
7c673cae 1482
f67539c2 1483 def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
11fdf7f2
TL
1484 """
1485 Set the value of a persistent configuration setting
1486 for the specified module.
1487
1488 :param str module:
1489 :param str key:
1490 :param str val:
1491 """
1492 if module == self.module_name:
1493 self._validate_module_option(key)
1494 return self._ceph_set_module_option(module, key, str(val))
1495
f67539c2 1496 def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
224ce89b
WB
1497 """
1498 Set localized configuration for this ceph-mgr instance
11fdf7f2
TL
1499 :param str key:
1500 :param str val:
224ce89b
WB
1501 :return: str
1502 """
11fdf7f2
TL
1503 self._validate_module_option(key)
1504 return self._set_localized(key, val, self._set_module_option)
224ce89b 1505
f67539c2 1506 def set_store(self, key: str, val: Optional[str]) -> None:
7c673cae 1507 """
11fdf7f2
TL
1508 Set a value in this module's persistent key value store.
1509 If val is None, remove key from store
7c673cae 1510 """
11fdf7f2 1511 self._ceph_set_store(key, val)
7c673cae 1512
f67539c2 1513 def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
7c673cae 1514 """
11fdf7f2 1515 Get a value from this module's persistent key value store
7c673cae 1516 """
11fdf7f2
TL
1517 r = self._ceph_get_store(key)
1518 if r is None:
1519 return default
7c673cae 1520 else:
11fdf7f2
TL
1521 return r
1522
f67539c2 1523 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
11fdf7f2
TL
1524 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
1525 if r is None:
1526 r = self._ceph_get_store(key)
1527 if r is None:
1528 r = default
1529 return r
1530
f67539c2 1531 def set_localized_store(self, key: str, val: Optional[str]) -> None:
11fdf7f2 1532 return self._set_localized(key, val, self.set_store)
224ce89b 1533
f67539c2 1534 def self_test(self) -> None:
224ce89b
WB
1535 """
1536 Run a self-test on the module. Override this function and implement
1537 a best as possible self-test for (automated) testing of the module
11fdf7f2
TL
1538
1539 Indicate any failures by raising an exception. This does not have
1540 to be pretty, it's mainly for picking up regressions during
1541 development, rather than use in the field.
1542
1543 :return: None, or an advisory string for developer interest, such
1544 as a json dump of some state.
224ce89b
WB
1545 """
1546 pass
3efd9988 1547
f67539c2 1548 def get_osdmap(self) -> OSDMap:
3efd9988
FG
1549 """
1550 Get a handle to an OSDMap. If epoch==0, get a handle for the latest
1551 OSDMap.
1552 :return: OSDMap
1553 """
f67539c2 1554 return cast(OSDMap, self._ceph_get_osdmap())
3efd9988 1555
f67539c2 1556 def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
11fdf7f2
TL
1557 data = self.get_latest_counter(
1558 daemon_type, daemon_name, counter)[counter]
1559 if data:
1560 return data[1]
1561 else:
1562 return 0
1563
f67539c2 1564 def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
11fdf7f2
TL
1565 data = self.get_latest_counter(
1566 daemon_type, daemon_name, counter)[counter]
1567 if data:
f67539c2
TL
1568 # https://github.com/python/mypy/issues/1178
1569 _, value, count = cast(Tuple[float, int, int], data)
1570 return value, count
11fdf7f2
TL
1571 else:
1572 return 0, 0
1573
f6b5b4d7 1574 @profile_method()
f67539c2
TL
1575 def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
1576 services: Sequence[str] = ("mds", "mon", "osd",
1577 "rbd-mirror", "rgw",
1578 "tcmu-runner")) -> Dict[str, dict]:
3efd9988
FG
1579 """
1580 Return the perf counters currently known to this ceph-mgr
1581 instance, filtered by priority equal to or greater than `prio_limit`.
1582
11fdf7f2 1583 The result is a map of string to dict, associating services
3efd9988
FG
1584 (like "osd.123") with their counters. The counter
1585 dict for each service maps counter paths to a counter
1586 info structure, which is the information from
1587 the schema, plus an additional "value" member with the latest
1588 value.
1589 """
1590
9f95a23c 1591 result = defaultdict(dict) # type: Dict[str, dict]
3efd9988 1592
3efd9988 1593 for server in self.list_servers():
f67539c2 1594 for service in cast(List[ServiceInfoT], server['services']):
11fdf7f2 1595 if service['type'] not in services:
3efd9988
FG
1596 continue
1597
f67539c2
TL
1598 schemas = self.get_perf_schema(service['type'], service['id'])
1599 if not schemas:
e306af50 1600 self.log.warning("No perf counter schema for {0}.{1}".format(
3efd9988
FG
1601 service['type'], service['id']
1602 ))
1603 continue
1604
1605 # Value is returned in a potentially-multi-service format,
1606 # get just the service we're asking about
11fdf7f2
TL
1607 svc_full_name = "{0}.{1}".format(
1608 service['type'], service['id'])
f67539c2 1609 schema = schemas[svc_full_name]
3efd9988
FG
1610
1611 # Populate latest values
1612 for counter_path, counter_schema in schema.items():
1613 # self.log.debug("{0}: {1}".format(
1614 # counter_path, json.dumps(counter_schema)
1615 # ))
f67539c2
TL
1616 priority = counter_schema['priority']
1617 assert isinstance(priority, int)
1618 if priority < prio_limit:
3efd9988
FG
1619 continue
1620
f67539c2
TL
1621 tp = counter_schema['type']
1622 assert isinstance(tp, int)
28e407b8 1623 counter_info = dict(counter_schema)
28e407b8 1624 # Also populate count for the long running avgs
f67539c2 1625 if tp & self.PERFCOUNTER_LONGRUNAVG:
11fdf7f2 1626 v, c = self.get_latest_avg(
28e407b8
AA
1627 service['type'],
1628 service['id'],
1629 counter_path
1630 )
1631 counter_info['value'], counter_info['count'] = v, c
1632 result[svc_full_name][counter_path] = counter_info
1633 else:
11fdf7f2 1634 counter_info['value'] = self.get_latest(
28e407b8
AA
1635 service['type'],
1636 service['id'],
1637 counter_path
1638 )
1639
3efd9988
FG
1640 result[svc_full_name][counter_path] = counter_info
1641
1642 self.log.debug("returning {0} counter".format(len(result)))
1643
1644 return result
1645
f67539c2 1646 def set_uri(self, uri: str) -> None:
3efd9988
FG
1647 """
1648 If the module exposes a service, then call this to publish the
1649 address once it is available.
1650
1651 :return: a string
1652 """
1653 return self._ceph_set_uri(uri)
94b18763 1654
f67539c2
TL
1655 def set_device_wear_level(self, devid: str, wear_level: float) -> None:
1656 return self._ceph_set_device_wear_level(devid, wear_level)
1657
1658 def have_mon_connection(self) -> bool:
94b18763
FG
1659 """
1660 Check whether this ceph-mgr daemon has an open connection
1661 to a monitor. If it doesn't, then it's likely that the
1662 information we have about the cluster is out of date,
1663 and/or the monitor cluster is down.
1664 """
1665
28e407b8 1666 return self._ceph_have_mon_connection()
11fdf7f2 1667
f67539c2
TL
1668 def update_progress_event(self,
1669 evid: str,
1670 desc: str,
1671 progress: float,
1672 add_to_ceph_s: bool) -> None:
1673 return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
11fdf7f2 1674
f67539c2
TL
1675 def complete_progress_event(self, evid: str) -> None:
1676 return self._ceph_complete_progress_event(evid)
11fdf7f2 1677
f67539c2 1678 def clear_all_progress_events(self) -> None:
11fdf7f2
TL
1679 return self._ceph_clear_all_progress_events()
1680
1681 @property
f67539c2 1682 def rados(self) -> rados.Rados:
11fdf7f2
TL
1683 """
1684 A librados instance to be shared by any classes within
1685 this mgr module that want one.
1686 """
1687 if self._rados:
1688 return self._rados
1689
1690 ctx_capsule = self.get_context()
1691 self._rados = rados.Rados(context=ctx_capsule)
1692 self._rados.connect()
9f95a23c 1693 self._ceph_register_client(self._rados.get_addrs())
11fdf7f2
TL
1694 return self._rados
1695
1696 @staticmethod
f67539c2 1697 def can_run() -> Tuple[bool, str]:
11fdf7f2
TL
1698 """
1699 Implement this function to report whether the module's dependencies
1700 are met. For example, if the module needs to import a particular
1701 dependency to work, then use a try/except around the import at
1702 file scope, and then report here if the import failed.
1703
1704 This will be called in a blocking way from the C++ code, so do not
1705 do any I/O that could block in this function.
1706
1707 :return a 2-tuple consisting of a boolean and explanatory string
1708 """
1709
1710 return True, ""
1711
f67539c2 1712 def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
11fdf7f2
TL
1713 """
1714 Invoke a method on another module. All arguments, and the return
1715 value from the other module must be serializable.
1716
1717 Limitation: Do not import any modules within the called method.
1718 Otherwise you will get an error in Python 2::
1719
1720 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
1721
1722
1723
1724 :param module_name: Name of other module. If module isn't loaded,
1725 an ImportError exception is raised.
1726 :param method_name: Method name. If it does not exist, a NameError
1727 exception is raised.
1728 :param args: Argument tuple
1729 :param kwargs: Keyword argument dict
1730 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
1731 :raises ImportError: No such module
1732 """
1733 return self._ceph_dispatch_remote(module_name, method_name,
1734 args, kwargs)
1735
f67539c2 1736 def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
11fdf7f2
TL
1737 """
1738 Register an OSD perf query. Argument is a
1739 dict of the query parameters, in this form:
1740
1741 ::
1742
1743 {
1744 'key_descriptor': [
1745 {'type': subkey_type, 'regex': regex_pattern},
1746 ...
1747 ],
1748 'performance_counter_descriptors': [
1749 list, of, descriptor, types
1750 ],
1751 'limit': {'order_by': performance_counter_type, 'max_count': n},
1752 }
1753
1754 Valid subkey types:
1755 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
1756 'pg_id', 'object_name', 'snap_id'
1757 Valid performance counter types:
1758 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
1759 'latency', 'write_latency', 'read_latency'
1760
1761 :param object query: query
1762 :rtype: int (query id)
1763 """
1764 return self._ceph_add_osd_perf_query(query)
1765
f67539c2 1766 def remove_osd_perf_query(self, query_id: int) -> None:
11fdf7f2
TL
1767 """
1768 Unregister an OSD perf query.
1769
1770 :param int query_id: query ID
1771 """
1772 return self._ceph_remove_osd_perf_query(query_id)
1773
f67539c2 1774 def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
11fdf7f2
TL
1775 """
1776 Get stats collected for an OSD perf query.
1777
1778 :param int query_id: query ID
1779 """
1780 return self._ceph_get_osd_perf_counters(query_id)
494da23a 1781
f67539c2
TL
1782 def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
1783 """
1784 Register an MDS perf query. Argument is a
1785 dict of the query parameters, in this form:
1786
1787 ::
1788
1789 {
1790 'key_descriptor': [
1791 {'type': subkey_type, 'regex': regex_pattern},
1792 ...
1793 ],
1794 'performance_counter_descriptors': [
1795 list, of, descriptor, types
1796 ],
1797 }
1798
1799 NOTE: 'limit' and 'order_by' are not supported (yet).
1800
1801 Valid subkey types:
1802 'mds_rank', 'client_id'
1803 Valid performance counter types:
1804 'cap_hit_metric'
1805
1806 :param object query: query
1807 :rtype: int (query id)
1808 """
1809 return self._ceph_add_mds_perf_query(query)
1810
1811 def remove_mds_perf_query(self, query_id: int) -> None:
1812 """
1813 Unregister an MDS perf query.
1814
1815 :param int query_id: query ID
1816 """
1817 return self._ceph_remove_mds_perf_query(query_id)
1818
1819 def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
1820 """
1821 Get stats collected for an MDS perf query.
1822
1823 :param int query_id: query ID
1824 """
1825 return self._ceph_get_mds_perf_counters(query_id)
1826
1827 def is_authorized(self, arguments: Dict[str, str]) -> bool:
92f5a8d4
TL
1828 """
1829 Verifies that the current session caps permit executing the py service
1830 or current module with the provided arguments. This provides a generic
1831 way to allow modules to restrict by more fine-grained controls (e.g.
1832 pools).
1833
1834 :param arguments: dict of key/value arguments to test
1835 """
1836 return self._ceph_is_authorized(arguments)