]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
import ceph 16.2.6
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
3efd9988 1import ceph_module # noqa
3efd9988 2
f67539c2
TL
3from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
5if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12import inspect
7c673cae 13import logging
9f95a23c 14import errno
f67539c2 15import functools
11fdf7f2 16import json
522d829b 17import subprocess
7c673cae 18import threading
f67539c2
TL
19from collections import defaultdict
20from enum import IntEnum
11fdf7f2 21import rados
81eedcae 22import re
b3b6e05e 23import socket
f67539c2 24import sys
11fdf7f2 25import time
f67539c2 26from ceph_argparse import CephArgtype
f6b5b4d7 27from mgr_util import profile_method
11fdf7f2 28
f67539c2
TL
# typing.get_args / typing.get_origin are only importable from Python 3.8 on;
# older interpreters get small stand-ins built on the dunder attributes.
if sys.version_info >= (3, 8):
    from typing import get_args, get_origin
else:
    def get_args(tp):
        """Return the ``__args__`` of ``tp`` (``Generic`` itself is returned as-is)."""
        return tp if tp is Generic else getattr(tp, '__args__', ())

    def get_origin(tp):
        """Return the ``__origin__`` of ``tp``, or None when it has none."""
        return getattr(tp, '__origin__', None)
40
41
522d829b
TL
42ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
43ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'
f6b5b4d7 44# Full list of strings in "osd_types.cc:pg_state_string()"
11fdf7f2
TL
45PG_STATES = [
46 "active",
47 "clean",
48 "down",
49 "recovery_unfound",
50 "backfill_unfound",
51 "scrubbing",
52 "degraded",
53 "inconsistent",
54 "peering",
55 "repair",
56 "recovering",
57 "forced_recovery",
58 "backfill_wait",
59 "incomplete",
60 "stale",
61 "remapped",
62 "deep",
63 "backfilling",
64 "forced_backfill",
65 "backfill_toofull",
66 "recovery_wait",
67 "recovery_toofull",
68 "undersized",
69 "activating",
70 "peered",
71 "snaptrim",
72 "snaptrim_wait",
73 "snaptrim_error",
74 "creating",
f6b5b4d7
TL
75 "unknown",
76 "premerge",
77 "failed_repair",
78 "laggy",
79 "wait",
80]
3efd9988
FG
81
82
7c673cae
FG
class CommandResult(object):
    """
    Completion handle to use with MgrModule.send_command.

    ``wait()`` blocks on an internal event until ``complete()`` has stored
    the command's return code and output.
    """

    def __init__(self, tag: Optional[str] = None):
        # Placeholder outcome, overwritten by complete().
        self.r = 0
        self.outb = ""
        self.outs = ""
        self.ev = threading.Event()

        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag or ""

    def complete(self, r: int, outb: str, outs: str) -> None:
        """Record the outcome and wake up anyone blocked in wait()."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self) -> Tuple[int, str, str]:
        """Block until complete() was called; return ``(r, outb, outs)``."""
        self.ev.wait()
        return (self.r, self.outb, self.outs)
107
108
f67539c2
TL
class HandleCommandResult(NamedTuple):
    """
    Result triple produced by `handle_command()`.

    Guidelines for filling it in:

    * stderr is for errors (or truly extraordinary circumstances) only.
    * A successful `ceph foo bar` should not answer "did foo bar" unless
      there is critical information to include there.
    * Anything meant to be consumed programmatically goes to stdout.
    """
    # Return code, e.g. 0 or -errno.EINVAL.
    retval: int = 0
    # Data of this result.
    stdout: str = ""
    # Typically used for error messages.
    stderr: str = ""
11fdf7f2
TL
123
124
e306af50
TL
class MonCommandFailed(RuntimeError):
    """RuntimeError subclass; presumably raised when a mon command fails
    (the raising sites are not part of this section -- confirm)."""
126
127
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python-facing wrapper around ``ceph_module.BasePyOSDMap``.

    Most methods forward to the same-named ``_``-prefixed method of the base
    class; a few derive their answer from the full ``_dump()`` output.
    """

    def get_epoch(self) -> int:
        return self._get_epoch()

    def get_crush_version(self) -> int:
        return self._get_crush_version()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_pools(self) -> Dict[int, Dict[str, Any]]:
        """Return the dumped pools keyed by their ``pool`` field."""
        # FIXME: efficient implementation
        return {pool['pool']: pool for pool in self._dump()['pools']}

    def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
        """Return the dumped pools keyed by their ``pool_name`` field."""
        # FIXME: efficient implementation
        return {pool['pool_name']: pool for pool in self._dump()['pools']}

    def new_incremental(self) -> 'OSDMapIncremental':
        return self._new_incremental()

    def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
        return self._apply_incremental(inc)

    def get_crush(self) -> 'CRUSHMap':
        return self._get_crush()

    def get_pools_by_take(self, take: int) -> List[int]:
        """Return the ``pools`` entry of ``_get_pools_by_take(take)`` (empty list if absent)."""
        by_take = self._get_pools_by_take(take)
        return by_take.get('pools', [])

    def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
                       max_deviation: int,
                       max_iterations: int = 10,
                       pools: Optional[List[str]] = None) -> int:
        """Forward to ``_calc_pg_upmaps``; a missing ``pools`` is passed as an empty list."""
        pool_names = [] if pools is None else pools
        return self._calc_pg_upmaps(inc, max_deviation, max_iterations,
                                    pool_names)

    def map_pool_pgs_up(self, poolid: int) -> List[int]:
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id: int) -> float:
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
        """Look up ``name`` in the dumped ``erasure_code_profiles``; None if unknown."""
        # FIXME: efficient implementation
        profiles = self._dump()['erasure_code_profiles']
        return profiles.get(name, None)

    def get_require_osd_release(self) -> str:
        """Return the ``require_osd_release`` field of the dump."""
        return self._dump()['require_osd_release']
187
11fdf7f2 188
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """Python-facing wrapper around ``ceph_module.BasePyOSDMapIncremental``."""

    def get_epoch(self) -> int:
        return self._get_epoch()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
        """
        Forward ``weightmap`` to ``_set_osd_reweights``.

        weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
        """
        Forward ``weightmap`` to ``_set_crush_compat_weight_set_weights``.

        weightmap is a dict, int to float. devices only. e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }

        NOTE(review): the annotation says ``str`` keys while the example uses
        ints -- confirm which one the C++ side expects.
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
208
11fdf7f2 209
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python-facing wrapper around ``ceph_module.BasePyCRUSH``.

    Thin forwarders call the ``_``-prefixed base methods; the remaining
    helpers answer their question by walking the ``dump()`` output.
    """

    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_item_weight(self, item: int) -> Optional[int]:
        return self._get_item_weight(item)

    def get_item_name(self, item: int) -> Optional[str]:
        return self._get_item_name(item)

    def find_takes(self) -> List[int]:
        """Return the ``takes`` entry of ``_find_takes()`` (empty list if absent)."""
        found = self._find_takes()
        return found.get('takes', [])

    def find_roots(self) -> List[int]:
        """Return the ``roots`` entry of ``_find_roots()`` (empty list if absent)."""
        found = self._find_roots()
        return found.get('roots', [])

    def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
        """Return the ``weights`` of ``_get_take_weight_osd_map(root)`` with keys cast to int."""
        raw_weights = self._get_take_weight_osd_map(root).get('weights', {})
        return {int(osd): weight for osd, weight in raw_weights.items()}

    @staticmethod
    def have_default_choose_args(dump: Dict[str, Any]) -> bool:
        """Tell whether ``dump['choose_args']`` has an entry for DEFAULT_CHOOSE_ARGS."""
        choose_args = dump.get('choose_args', {})
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in choose_args

    @staticmethod
    def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the DEFAULT_CHOOSE_ARGS entry of ``dump['choose_args']`` (empty list if absent)."""
        choose_args = dump.get('choose_args')
        assert isinstance(choose_args, dict)
        return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
        """Return the first dumped rule whose ``rule_name`` matches, else None."""
        # TODO efficient implementation
        matches = (rule for rule in self.dump()['rules']
                   if rule_name == rule['rule_name'])
        return next(matches, None)

    def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """Return the first dumped rule whose ``rule_id`` matches, else None."""
        matches = (rule for rule in self.dump()['rules']
                   if rule['rule_id'] == rule_id)
        return next(matches, None)

    def get_rule_root(self, rule_name: str) -> Optional[int]:
        """
        Return the ``item`` of the first ``take`` step of the named rule.

        None is returned when the rule does not exist or has no ``take``
        step (the latter is also logged as a warning).
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        take_steps = (step for step in rule['steps'] if step.get('op') == 'take')
        first_take = next(take_steps, None)
        if first_take is None:
            logging.warning("CRUSH rule '{0}' has no 'take' step".format(
                rule_name))
            return None
        return first_take['item']

    def get_osds_under(self, root_id: int) -> List[int]:
        """
        Collect the non-negative item ids reachable from bucket ``root_id``.

        Negative item ids are looked up as buckets and descended into; ids
        with no matching bucket are ignored.
        """
        # TODO don't abuse dump like this
        crush = self.dump()
        buckets_by_id = {bucket['id']: bucket for bucket in crush['buckets']}

        found = []

        def walk(bucket: Dict[str, Any]) -> None:
            for entry in bucket['items']:
                if entry['id'] >= 0:
                    found.append(entry['id'])
                    continue
                try:
                    walk(buckets_by_id[entry['id']])
                except KeyError:
                    pass

        walk(buckets_by_id[root_id])

        return found

    def device_class_counts(self) -> Dict[str, int]:
        """Count the dumped devices per ``class`` value (None when a device has none)."""
        counts = {}  # type: Dict[str, int]
        # TODO don't abuse dump like this
        for device in self.dump()['devices']:
            device_class = device.get('class', None)
            counts[device_class] = counts.get(device_class, 0) + 1

        return counts
302
303
f67539c2
TL
304HandlerFuncType = Callable[..., Tuple[int, str, str]]
305
306
class CLICommand(object):
    """
    Decorator object that registers a handler function under a command prefix.

    Using an instance as a decorator records the handler's docstring and
    annotations, stores the instance in ``COMMANDS`` under its prefix and
    returns the handler unchanged.
    """

    COMMANDS = {}  # type: Dict[str, CLICommand]

    # Argument names that are never turned into command arguments.
    KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'

    def __init__(self,
                 prefix: str,
                 perm: str = 'rw',
                 poll: bool = False):
        self.prefix = prefix
        self.perm = perm
        self.poll = poll
        self.func = None  # type: Optional[Callable]
        self.arg_spec = {}  # type: Dict[str, Any]
        self.first_default = -1

    @staticmethod
    def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
        """
        Inspect ``f`` and return ``(desc, arg_spec, first_default, args)``.

        ``desc`` is the docstring (or ''), ``arg_spec`` the annotations,
        ``first_default`` the annotation count minus the number of defaults
        and ``args`` the space-joined argument descriptors built by
        ``CephArgtype.to_argdesc`` for every argument not in KNOWN_ARGS.
        """
        desc = inspect.getdoc(f) or ''
        spec = inspect.getfullargspec(f)
        annotations = spec.annotations
        first_default = len(annotations) - len(spec.defaults or ())
        descriptors = []
        for position, arg_name in enumerate(spec.args):
            if arg_name in CLICommand.KNOWN_ARGS:
                continue
            assert arg_name in annotations, \
                f"'{arg_name}' is not annotated for {f}: {spec}"
            descriptors.append(
                CephArgtype.to_argdesc(annotations[arg_name],
                                       dict(name=arg_name),
                                       position >= first_default))
        return desc, annotations, first_default, ' '.join(descriptors)

    def store_func_metadata(self, f: HandlerFuncType) -> None:
        """Cache the metadata of ``f`` on this instance."""
        metadata = self.load_func_metadata(f)
        self.desc, self.arg_spec, self.first_default, self.args = metadata

    def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
        self.store_func_metadata(func)
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
        """
        Resolve one raw value into ``(kwargs_switch, name, value)``.

        Once a string value of the form ``known_arg=value`` has been seen the
        switch stays on and every value is split on its first ``=``;
        otherwise the value is kept under ``key``. Dashes in the resulting
        name are replaced by underscores.
        """
        def names_known_arg() -> bool:
            if not (isinstance(val, str) and '=' in val):
                return False
            candidate = val.split('=', 1)[0]
            return candidate in self.arg_spec

        if not kwargs_switch:
            kwargs_switch = names_known_arg()

        if kwargs_switch:
            name, value = val.split('=', 1)
        else:
            name, value = key, val
        return kwargs_switch, name.replace('-', '_'), value

    def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
        """Build the handler's keyword arguments from ``cmd_dict``, cast per annotation."""
        collected = {}
        kwargs_switch = False
        for position, (arg_name, arg_type) in enumerate(self.arg_spec.items()):
            if arg_name in CLICommand.KNOWN_ARGS:
                continue
            assert self.first_default >= 0
            raw_value = cmd_dict.get(arg_name)
            # An argument in the defaulted range that was not supplied is
            # left out so the handler's own default applies.
            if position >= self.first_default and raw_value is None:
                continue
            kwargs_switch, name, value = self._get_arg_value(kwargs_switch,
                                                             arg_name,
                                                             raw_value)
            collected[name] = CephArgtype.cast_to(arg_type, value)
        return collected

    def call(self,
             mgr: Any,
             cmd_dict: Dict[str, Any],
             inbuf: Optional[str] = None) -> HandleCommandResult:
        """Invoke the registered handler with ``mgr`` and the collected arguments."""
        handler_kwargs = self._collect_args_by_argspec(cmd_dict)
        if inbuf:
            handler_kwargs['inbuf'] = inbuf
        assert self.func
        return self.func(mgr, **handler_kwargs)

    def dump_cmd(self) -> Dict[str, Union[str, bool]]:
        """Describe this command as a dict with ``cmd``, ``desc``, ``perm`` and ``poll``."""
        return dict(
            cmd=f'{self.prefix} {self.args}',
            desc=self.desc,
            perm=self.perm,
            poll=self.poll,
        )

    @classmethod
    def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
        """Return ``dump_cmd()`` of every registered command."""
        return [command.dump_cmd() for command in cls.COMMANDS.values()]
11fdf7f2
TL
407
408
f67539c2
TL
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a CLICommand for ``prefix`` with permission ``"r"``."""
    return CLICommand(prefix, perm="r", poll=poll)
11fdf7f2
TL
411
412
f67539c2
TL
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a CLICommand for ``prefix`` with permission ``"w"``."""
    return CLICommand(prefix, perm="w", poll=poll)
11fdf7f2
TL
415
416
522d829b
TL
def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
    """
    Decorator factory that rejects calls without a non-empty ``inbuf`` keyword.

    The wrapped handler is only invoked when ``inbuf`` was passed and is not
    blank; otherwise ``(-errno.EINVAL, '', <message>)`` is returned, with
    ``desc`` naming the expected file content in the message.
    """
    def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
        @functools.wraps(func)
        def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
            if 'inbuf' not in kwargs:
                return -errno.EINVAL, '', f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
                                          f'containing {desc} with "-i" option'
            inbuf = kwargs['inbuf']
            if isinstance(inbuf, str):
                # Delete new line separator at EOF (it may have been added by a text editor).
                inbuf = inbuf.rstrip('\r\n')
                kwargs['inbuf'] = inbuf
            if not inbuf or not inbuf.strip():
                return -errno.EINVAL, '', f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
                                          'the file'
            return func(*args, **kwargs)
        # Advertise the handler's own signature on the wrapper.
        check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
        return check
    return CheckFileInput
cd265ab1
TL
434
435
f67539c2 436def _get_localized_key(prefix: str, key: str) -> str:
11fdf7f2
TL
437 return '{}/{}'.format(prefix, key)
438
439
f67539c2
TL
440"""
441MODULE_OPTIONS types and Option Class
442"""
443if TYPE_CHECKING:
444 OptionTypeLabel = Literal[
445 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
446
447
448# common/options.h: value_t
449OptionValue = Optional[Union[bool, int, float, str]]
11fdf7f2 450
11fdf7f2 451
f67539c2
TL
class Option(Dict):
    """
    Helper class to declare options for MODULE_OPTIONS list.

    The resulting dict holds one entry per constructor argument whose value
    is not None, keyed by the argument name.

    TODO: Replace with typing.TypedDict when in python_version >= 3.8
    """

    def __init__(
            self,
            name: str,
            default: OptionValue = None,
            type: 'OptionTypeLabel' = 'str',
            desc: Optional[str] = None,
            long_desc: Optional[str] = None,
            min: OptionValue = None,
            max: OptionValue = None,
            enum_allowed: Optional[List[str]] = None,
            tags: Optional[List[str]] = None,
            see_also: Optional[List[str]] = None,
            runtime: bool = False,
    ):
        # Enumerate the fields explicitly instead of scraping vars(): inside
        # a method that references ``super`` the local namespace also holds
        # the implicit ``__class__`` cell, which would end up as a bogus
        # '__class__' entry in every Option.
        fields = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('long_desc', long_desc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('tags', tags),
            ('see_also', see_also),
            ('runtime', runtime),
        )
        super(Option, self).__init__(
            (key, value) for key, value in fields if value is not None)
475
f67539c2 476
92f5a8d4
TL
class Command(dict):
    """
    Helper class to declare options for COMMANDS list.

    The dict itself only carries the ``perm`` and ``poll`` entries; the
    command prefix and the handler callable are stored as attributes.

    Usage::

        cmd = Command(prefix="example", handler=my_handler, perm='w')
        # cmd == {'perm': 'w', 'poll': False}
        cmd.register()
    """

    def __init__(
            self,
            prefix: str,
            handler: HandlerFuncType,
            perm: str = "rw",
            poll: bool = False,
    ):
        super().__init__(perm=perm,
                         poll=poll)
        self.prefix = prefix
        self.handler = handler

    @staticmethod
    def returns_command_result(instance: Any,
                               f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
        """
        Wrap ``f`` so that its 3-tuple result becomes a HandleCommandResult.

        The wrapper calls ``f`` with ``instance`` as first argument when
        ``instance`` is truthy, and with the ``mgr`` it was given otherwise.
        """
        @functools.wraps(f)
        def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
            retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
            return HandleCommandResult(retval, stdout, stderr)
        # Advertise the handler's own signature on the wrapper.
        wrapper.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
        return wrapper

    def register(self, instance: bool = False) -> HandlerFuncType:
        """
        Register a CLICommand handler. It allows an instance to register bound
        methods. In that case, the mgr instance is not passed, and it's expected
        to be available in the class instance.
        It also uses the HandleCommandResult helper to return a wrapped tuple
        of 3 items.

        :param instance: object to pass to the handler instead of the mgr;
            any falsy value (the default) keeps passing the mgr.
        :return: the wrapped handler, as returned by the CLICommand decorator.
        """
        # Forward ``poll`` as well: it is accepted and stored by __init__ and
        # must not be lost when the CLICommand is created.
        cmd = CLICommand(prefix=self.prefix, perm=self['perm'],
                         poll=self['poll'])
        return cmd(self.returns_command_result(instance, self.handler))
92f5a8d4 524
3efd9988 525
class CPlusPlusHandler(logging.Handler):
    """Logging handler that passes each formatted record to the module's ``_ceph_log``."""

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        # The module name is baked into the format string once, up front.
        line_format = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(line_format))

    def emit(self, record: logging.LogRecord) -> None:
        # Records below this handler's own level are dropped.
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
536
f67539c2 537
class ClusterLogHandler(logging.Handler):
    """Logging handler that forwards records to the module's ``cluster_log``."""

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        self.setFormatter(logging.Formatter("%(message)s"))

    def emit(self, record: logging.LogRecord) -> None:
        """
        Send ``record`` to the cluster log, using the module name as channel.

        Records below this handler's level are dropped. The Python level is
        translated by threshold, so that a non-standard level is mapped to
        the nearest standard priority below it instead of raising KeyError:
        ERROR and above -> ERROR, WARNING and above -> WARN, INFO and above
        -> INFO, everything else -> DEBUG.
        """
        if record.levelno < self.level:
            return

        prio = MgrModule.ClusterLogPrio
        if record.levelno >= logging.ERROR:
            # logging.CRITICAL shares the ERROR priority.
            level = prio.ERROR
        elif record.levelno >= logging.WARNING:
            level = prio.WARN
        elif record.levelno >= logging.INFO:
            level = prio.INFO
        else:
            level = prio.DEBUG

        self._module.cluster_log(self._module.module_name,
                                 level,
                                 self.format(record))
557
f67539c2 558
class FileHandler(logging.FileHandler):
    """
    File handler whose path is derived from the ``log_file`` ceph option.

    The module name is inserted before the last ``.log`` of that path
    (anything after it is dropped), or appended when the path contains no
    ``.log``. The file is opened lazily (``delay=True``).
    """

    def __init__(self, module_inst: Any):
        base_path = module_inst.get_ceph_option("log_file")
        stem, suffix, _ = base_path.rpartition(".log")
        if suffix:
            self.path = f"{stem}.{module_inst.module_name}.log"
        else:
            self.path = f"{base_path}.{module_inst.module_name}"
        super().__init__(self.path, delay=True)
        line_format = "%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"
        self.setFormatter(logging.Formatter(line_format))
569
570
class MgrModuleLoggingMixin(object):
    """
    Logging plumbing shared by the mgr module base classes.

    It attaches CPlusPlusHandler / FileHandler / ClusterLogHandler instances
    to the root logger and keeps their levels in sync with the ``debug_mgr``
    level and the per-module level it is given.
    """

    def _configure_logging(self,
                           mgr_level: str,
                           module_level: str,
                           cluster_level: str,
                           log_to_file: bool,
                           log_to_cluster: bool) -> None:
        """(Re)install the handlers on the root logger and apply the levels."""
        self._mgr_level: Optional[str] = None
        self._module_level: Optional[str] = None
        self._root_logger = logging.getLogger()

        # Start from a clean slate: drop handlers left by an earlier setup.
        self._unconfigure_logging()

        # the ceph log handler is initialized only once
        self._mgr_log_handler = CPlusPlusHandler(self)
        self._cluster_log_handler = ClusterLogHandler(self)
        self._file_log_handler = FileHandler(self)

        self.log_to_file = log_to_file
        self.log_to_cluster = log_to_cluster

        root = self._root_logger
        root.addHandler(self._mgr_log_handler)
        if log_to_file:
            root.addHandler(self._file_log_handler)
        if log_to_cluster:
            root.addHandler(self._cluster_log_handler)

        # Filtering is done by the individual handlers.
        root.setLevel(logging.NOTSET)
        self._set_log_level(mgr_level, module_level, cluster_level)

    def _unconfigure_logging(self) -> None:
        """Remove every handler of the three known kinds from the root logger."""
        own_kinds = (CPlusPlusHandler, FileHandler, ClusterLogHandler)
        # remove existing handlers:
        stale = [handler for handler in self._root_logger.handlers
                 if isinstance(handler, own_kinds)]
        for handler in stale:
            self._root_logger.removeHandler(handler)
        self.log_to_file = False
        self.log_to_cluster = False

    def _set_log_level(self,
                       mgr_level: str,
                       module_level: str,
                       cluster_level: str) -> None:
        """
        Apply the log levels to the handlers.

        The cluster handler always gets ``cluster_level``. The mgr and file
        handlers get ``module_level`` when set, else the level derived from
        ``mgr_level``; nothing more is done when neither changed.
        """
        self._cluster_log_handler.setLevel(cluster_level.upper())

        requested = module_level.upper() if module_level else ''
        current = self._module_level
        if current:
            # no change in module level
            unchanged = current == requested
        else:
            # using debug_mgr level:
            # no change in module level neither in debug_mgr
            unchanged = not requested and self._mgr_level == mgr_level
        if unchanged:
            return

        logger = self.getLogger()
        if requested:
            level = requested
            logger.debug("setting log level: %s", level)
        else:
            level = self._ceph_log_level_to_python(mgr_level)
            if current:
                logger.warning("unsetting module log level, falling back to "
                               "debug_mgr level: %s (%s)", level, mgr_level)
            else:
                logger.debug("setting log level based on debug_mgr: %s (%s)",
                             level, mgr_level)

        self._module_level = requested
        self._mgr_level = mgr_level

        self._mgr_log_handler.setLevel(level)
        self._file_log_handler.setLevel(level)

    def _enable_file_log(self) -> None:
        # enable file log
        self.getLogger().warning("enabling logging to file")
        self.log_to_file = True
        self._root_logger.addHandler(self._file_log_handler)

    def _disable_file_log(self) -> None:
        # disable file log
        self.getLogger().warning("disabling logging to file")
        self.log_to_file = False
        self._root_logger.removeHandler(self._file_log_handler)

    def _enable_cluster_log(self) -> None:
        # enable cluster log
        self.getLogger().warning("enabling logging to cluster")
        self.log_to_cluster = True
        self._root_logger.addHandler(self._cluster_log_handler)

    def _disable_cluster_log(self) -> None:
        # disable cluster log
        self.getLogger().warning("disabling logging to cluster")
        self.log_to_cluster = False
        self._root_logger.removeHandler(self._cluster_log_handler)

    def _ceph_log_level_to_python(self, log_level: str) -> str:
        """
        Translate a ceph debug level such as ``"5/5"`` to a Python level name.

        Only the part before the first ``/`` counts; empty or non-numeric
        input is treated as 0. <= 0 gives CRITICAL, 1 gives WARNING, 2-4
        gives INFO and anything higher DEBUG.
        """
        try:
            numeric = int(log_level.split("/", 1)[0]) if log_level else 0
        except ValueError:
            numeric = 0

        if numeric <= 0:
            return "CRITICAL"
        if numeric <= 1:
            return "WARNING"
        if numeric <= 4:
            return "INFO"
        return "DEBUG"

    def getLogger(self, name: Optional[str] = None) -> logging.Logger:
        """Return ``logging.getLogger(name)`` (the root logger when name is None)."""
        return logging.getLogger(name)
692
693
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
    """
    Base class for the standby flavour of a mgr module.

    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    MODULE_OPTIONS: List[Option] = []
    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

    def __init__(self, module_name: str, capsule: Any):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name

        # see also MgrModule.__init__()
        for option in self.MODULE_OPTIONS:
            if 'default' not in option:
                continue
            default = option['default']
            # Options without a declared type get their default stringified.
            self.MODULE_OPTION_DEFAULTS[option['name']] = (
                default if 'type' in option else str(default))

        # mock does not return a str
        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
        log_level = cast(str, self.get_module_option("log_level"))
        cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
        self._configure_logging(mgr_level, log_level, cluster_level,
                                False, False)

        # for backwards compatibility
        self._logger = self.getLogger()

    def __del__(self) -> None:
        self._cleanup()
        self._unconfigure_logging()

    def _cleanup(self) -> None:
        """Hook called first thing from ``__del__``; does nothing here."""
        pass

    @classmethod
    def _register_options(cls, module_name: str) -> None:
        """Append the logging-related options to ``MODULE_OPTIONS``."""
        level_names = ['info', 'debug', 'critical', 'error', 'warning', '']
        options = cls.MODULE_OPTIONS

        options.append(
            Option(name='log_level', type='str', default="", runtime=True,
                   enum_allowed=list(level_names)))
        options.append(
            Option(name='log_to_file', type='bool', default=False, runtime=True))
        # log_to_cluster is only added when not declared already.
        declared = [opt for opt in options if opt['name'] == 'log_to_cluster']
        if not declared:
            options.append(
                Option(name='log_to_cluster', type='bool', default=False,
                       runtime=True))
        options.append(
            Option(name='log_to_cluster_level', type='str', default='info',
                   runtime=True,
                   enum_allowed=list(level_names)))

    @property
    def log(self) -> logging.Logger:
        return self._logger

    def serve(self) -> None:
        """
        The serve method is mandatory for standby modules.

        This base implementation raises NotImplementedError.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self) -> str:
        return self._ceph_get_mgr_id()

    def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """
        Retrieve the value of a persistent configuration setting

        When the setting has no value, the default declared in
        MODULE_OPTION_DEFAULTS is used, and ``default`` after that.

        :param default: the default value of the config if it is not found
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key: str) -> OptionValue:
        return self._ceph_get_option(key)

    def get_store(self, key: str) -> Optional[str]:
        """
        Retrieve the value of a persistent KV store entry

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """
        Retrieve a KV store entry, preferring the one localized to this mgr.

        The key prefixed with the mgr id is tried first, then the plain key;
        ``default`` is returned when both are missing.
        """
        localized = self._ceph_get_store(
            _get_localized_key(self.get_mgr_id(), key))
        if localized is not None:
            return localized
        plain = self._ceph_get_store(key)
        return default if plain is None else plain

    def get_active_uri(self) -> str:
        return self._ceph_get_active_uri()

    def get(self, data_name: str):
        return self._ceph_get(data_name)

    def get_mgr_ip(self) -> str:
        """Return the first entry of ``get("mgr_ips")['ips']``, else the local hostname."""
        addresses = self.get("mgr_ips").get('ips', [])
        return addresses[0] if addresses else socket.gethostname()

    def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """Like get_module_option(), but queries the setting for this mgr id."""
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988 818
3efd9988 819
f67539c2
TL
820HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
821# {"type": service_type, "id": service_id}
822ServiceInfoT = Dict[str, str]
823# {"hostname": hostname,
824# "ceph_version": version,
825# "services": [service_info, ..]}
826ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
827PerfCounterT = Dict[str, Any]
828
829
9f95a23c
TL
830class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
831 COMMANDS = [] # type: List[Any]
f67539c2 832 MODULE_OPTIONS: List[Option] = []
9f95a23c 833 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
7c673cae 834
3efd9988
FG
835 # Priority definitions for perf counters
836 PRIO_CRITICAL = 10
837 PRIO_INTERESTING = 8
838 PRIO_USEFUL = 5
839 PRIO_UNINTERESTING = 2
840 PRIO_DEBUGONLY = 0
841
842 # counter value types
843 PERFCOUNTER_TIME = 1
844 PERFCOUNTER_U64 = 2
845
846 # counter types
847 PERFCOUNTER_LONGRUNAVG = 4
848 PERFCOUNTER_COUNTER = 8
849 PERFCOUNTER_HISTOGRAM = 0x10
28e407b8 850 PERFCOUNTER_TYPE_MASK = ~3
7c673cae 851
1adf2230
AA
852 # units supported
853 BYTES = 0
854 NONE = 1
11fdf7f2
TL
855
856 # Cluster log priorities
f67539c2
TL
857 class ClusterLogPrio(IntEnum):
858 DEBUG = 0
859 INFO = 1
860 SEC = 2
861 WARN = 3
862 ERROR = 4
863
864 def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
3efd9988 865 self.module_name = module_name
3efd9988 866 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
7c673cae 867
11fdf7f2
TL
868 for o in self.MODULE_OPTIONS:
869 if 'default' in o:
870 if 'type' in o:
871 # we'll assume the declared type matches the
872 # supplied default value's type.
873 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
874 else:
875 # module not declaring it's type, so normalize the
876 # default value to be a string for consistent behavior
877 # with default and user-supplied option values.
878 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
879
f67539c2
TL
880 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
881 log_level = cast(str, self.get_module_option("log_level"))
882 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
9f95a23c 883 log_to_file = self.get_module_option("log_to_file")
f67539c2 884 assert isinstance(log_to_file, bool)
9f95a23c 885 log_to_cluster = self.get_module_option("log_to_cluster")
f67539c2 886 assert isinstance(log_to_cluster, bool)
9f95a23c
TL
887 self._configure_logging(mgr_level, log_level, cluster_level,
888 log_to_file, log_to_cluster)
889
890 # for backwards compatibility
891 self._logger = self.getLogger()
892
893 self._version = self._ceph_get_version()
894
895 self._perf_schema_cache = None
896
897 # Keep a librados instance for those that need it.
f67539c2 898 self._rados: Optional[rados.Rados] = None
9f95a23c 899
f67539c2 900 def __del__(self) -> None:
9f95a23c
TL
901 self._unconfigure_logging()
902
903 @classmethod
f67539c2 904 def _register_options(cls, module_name: str) -> None:
9f95a23c
TL
905 cls.MODULE_OPTIONS.append(
906 Option(name='log_level', type='str', default="", runtime=True,
907 enum_allowed=['info', 'debug', 'critical', 'error',
908 'warning', '']))
909 cls.MODULE_OPTIONS.append(
910 Option(name='log_to_file', type='bool', default=False, runtime=True))
911 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
912 cls.MODULE_OPTIONS.append(
913 Option(name='log_to_cluster', type='bool', default=False,
914 runtime=True))
915 cls.MODULE_OPTIONS.append(
916 Option(name='log_to_cluster_level', type='str', default='info',
917 runtime=True,
918 enum_allowed=['info', 'debug', 'critical', 'error',
919 'warning', '']))
3efd9988 920
11fdf7f2 921 @classmethod
f67539c2 922 def _register_commands(cls, module_name: str) -> None:
11fdf7f2 923 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
7c673cae
FG
924
925 @property
f67539c2 926 def log(self) -> logging.Logger:
7c673cae
FG
927 return self._logger
928
def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
    """
    Pass a message to ``_ceph_cluster_log`` using ``priority.value``.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority.
    :param message: The message to log.
    """
    self._ceph_cluster_log(channel, priority.value, message)
11fdf7f2 936
@property
def version(self) -> str:
    """Return the value cached in ``self._version``."""
    return self._version
940
@property
def release_name(self) -> str:
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    The value is obtained from ``_ceph_get_release_name()`` on every access.

    :return: Returns the release name of the Ceph version in lower case.
    :rtype: str
    """
    return self._ceph_get_release_name()
949
f67539c2
TL
def lookup_release_name(self, major: int) -> str:
    """
    Return the result of ``_ceph_lookup_release_name(major)``.

    :param major: presumably the major version number of a Ceph
        release -- confirm against the C++ binding.
    """
    return self._ceph_lookup_release_name(major)
952
def get_context(self) -> object:
    """
    Return the object produced by ``_ceph_get_context()``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    return self._ceph_get_context()
958
def notify(self, notify_type: str, notify_id: str) -> None:
    """
    Called by the ceph-mgr service to notify the Python plugin
    that new state is available.

    This base implementation does nothing; subclasses override it.

    :param notify_type: string indicating what kind of notification,
                        such as osd_map, mon_map, fs_map, mon_status,
                        health, pg_summary, command, service_map
    :param notify_id: string (may be empty) that optionally specifies
                      which entity is being notified about. With
                      "command" notifications this is set to the tag
                      from ``send_command``.
    """
    pass
973
f67539c2 974 def _config_notify(self) -> None:
9f95a23c 975 # check logging options for changes
f67539c2
TL
976 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
977 module_level = cast(str, self.get_module_option("log_level"))
978 cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
979 assert isinstance(cluster_level, str)
9f95a23c 980 log_to_file = self.get_module_option("log_to_file", False)
f67539c2 981 assert isinstance(log_to_file, bool)
9f95a23c 982 log_to_cluster = self.get_module_option("log_to_cluster", False)
f67539c2 983 assert isinstance(log_to_cluster, bool)
9f95a23c
TL
984 self._set_log_level(mgr_level, module_level, cluster_level)
985
986 if log_to_file != self.log_to_file:
987 if log_to_file:
988 self._enable_file_log()
989 else:
990 self._disable_file_log()
991 if log_to_cluster != self.log_to_cluster:
992 if log_to_cluster:
993 self._enable_cluster_log()
994 else:
995 self._disable_cluster_log()
996
997 # call module subclass implementations
998 self.config_notify()
999
def config_notify(self) -> None:
    """
    Called by the ceph-mgr service to notify the Python plugin
    that the configuration may have changed.  Modules will want to
    refresh any configuration values stored in config variables.

    This base implementation does nothing; ``_config_notify`` calls it
    after it has processed the logging options.
    """
    pass
1007
def serve(self) -> None:
    """
    Called by the ceph-mgr service to start any server that
    is provided by this Python plugin.  The implementation
    of this function should block until ``shutdown`` is called.

    You *must* implement ``shutdown`` if you implement ``serve``

    This base implementation returns immediately.
    """
    pass
1017
def shutdown(self) -> None:
    """
    Called by the ceph-mgr service to request that this
    module drop out of its serve() function.  You do not
    need to implement this if you do not implement serve()

    If a librados handle was created, it is shut down, its addresses are
    unregistered and the cached reference is dropped.

    :return: None
    """
    if self._rados:
        # Addresses must be read before the handle is shut down.
        addrs = self._rados.get_addrs()
        self._rados.shutdown()
        self._ceph_unregister_client(addrs)
        # Forget the dead handle so that a repeated shutdown() is a no-op
        # and nothing keeps using a shut-down connection.
        self._rados = None
7c673cae 1030
def get(self, data_name: str) -> Any:
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>, pg_stats,
            pool_stats, pg_ready, osd_ping_times.
    :return: whatever ``_ceph_get(data_name)`` returns

    Note:
        All these structures have their own JSON representations: experiment
        or look at the C++ ``dump()`` methods to learn about them.
    """
    return self._ceph_get(data_name)
7c673cae 1046
f67539c2 1047 def _stattype_to_str(self, stattype: int) -> str:
11fdf7f2 1048
94b18763
FG
1049 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
1050 if typeonly == 0:
1051 return 'gauge'
1052 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
1053 # this lie matches the DaemonState decoding: only val, no counts
1054 return 'counter'
1055 if typeonly == self.PERFCOUNTER_COUNTER:
1056 return 'counter'
1057 if typeonly == self.PERFCOUNTER_HISTOGRAM:
1058 return 'histogram'
11fdf7f2 1059
94b18763
FG
1060 return ''
1061
f67539c2
TL
1062 def _perfpath_to_path_labels(self, daemon: str,
1063 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
9f95a23c
TL
1064 label_names = ("ceph_daemon",) # type: Tuple[str, ...]
1065 labels = (daemon,) # type: Tuple[str, ...]
81eedcae
TL
1066
1067 if daemon.startswith('rbd-mirror.'):
1068 match = re.match(
9f95a23c 1069 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
81eedcae
TL
1070 path
1071 )
1072 if match:
9f95a23c 1073 path = 'rbd_mirror_image_' + match.group(4)
81eedcae
TL
1074 pool = match.group(1)
1075 namespace = match.group(2) or ''
1076 image = match.group(3)
1077 label_names += ('pool', 'namespace', 'image')
1078 labels += (pool, namespace, image)
1079
1080 return path, label_names, labels,
1081
f67539c2 1082 def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
28e407b8
AA
1083 if stattype & self.PERFCOUNTER_TIME:
1084 # Convert from ns to seconds
1085 return value / 1000000000.0
1086 else:
1087 return value
1088
f67539c2 1089 def _unit_to_str(self, unit: int) -> str:
1adf2230
AA
1090 if unit == self.NONE:
1091 return "/s"
1092 elif unit == self.BYTES:
11fdf7f2 1093 return "B/s"
f67539c2
TL
1094 else:
1095 raise ValueError(f'bad unit "{unit}"')
11fdf7f2
TL
1096
@staticmethod
def to_pretty_iec(n: int) -> str:
    """
    Format ``n`` with the largest IEC prefix for which ``n`` exceeds ten
    units of that prefix; otherwise return the plain number.

    :param n: value to format
    :return: e.g. ``'11 Mi'``; unscaled values are returned as ``'<n> '``
    """
    prefixes = ((60, 'Ei'), (50, 'Pi'), (40, 'Ti'),
                (30, 'Gi'), (20, 'Mi'), (10, 'Ki'))
    for shift, prefix in prefixes:
        threshold = 10 << shift
        if n > threshold:
            return '{} {}'.format(n >> shift, prefix)
    return '{} '.format(n)
1104
@staticmethod
def get_pretty_row(elems: Sequence[str], width: int) -> str:
    """
    Takes an array of elements and returns a string with those elements
    formatted as a table row. Useful for polling modules.

    The width is divided evenly between the elements; each cell is
    right-aligned and terminated by `` |``.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    :return: the formatted row; just ``'|'`` when ``elems`` is empty
    """
    n = len(elems)
    if n == 0:
        # Nothing to lay out; avoid dividing the width by zero.
        return '|'
    column_width = int(width / n)

    cells = ['{0:>{w}} |'.format(elem, w=column_width - 2) for elem in elems]
    return '|' + ''.join(cells)
1122
def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
    """
    Like ``get_pretty_row`` but adds dashes, to be used as a table title.

    The result is three newline-terminated lines: a dash line, the title
    row produced by ``get_pretty_row``, and a second dash line.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    """
    n = len(elems)
    column_width = int(width / n)

    # One '+' at the start, then one dashed segment per column.
    segment = '-' * (column_width - 1) + '+'
    dash_line = '+' + segment * n + '\n'

    title = self.get_pretty_row(elems, width) + '\n'
    return dash_line + title + dash_line
1150
def get_server(self, hostname: str) -> ServerInfoT:
    """
    Called by the plugin to fetch metadata about a particular hostname from
    ceph-mgr.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    :return: the result of ``_ceph_get_server(hostname)``, cast to
        ``ServerInfoT``
    """
    return cast(ServerInfoT, self._ceph_get_server(hostname))
7c673cae 1162
f67539c2
TL
def get_perf_schema(self,
                    svc_type: str,
                    svc_name: str) -> Dict[str,
                                           Dict[str, Dict[str, Union[str, int]]]]:
    """
    Called by the plugin to fetch perf counter schema info.
    svc_name can be nullptr, as can svc_type, in which case
    they are wildcards

    :param str svc_type:
    :param str svc_name:
    :return: nested dicts describing the counters requested (per the
        return annotation; ``get_all_perf_counters`` indexes the result
        by "<type>.<id>" and then by counter path)
    """
    return self._ceph_get_perf_schema(svc_type, svc_name)
c07f9fc5 1177
f67539c2
TL
def get_counter(self,
                svc_type: str,
                svc_name: str,
                path: str) -> Dict[str, List[Tuple[float, int]]]:
    """
    Called by the plugin to fetch the latest performance counter data for a
    particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A dict of counter names to their values. Each value is a list
        of two-tuples of (timestamp, value). This may be empty if no data is
        available.
    """
    return self._ceph_get_counter(svc_type, svc_name, path)
7c673cae 1195
f67539c2
TL
def get_latest_counter(self,
                       svc_type: str,
                       svc_name: str,
                       path: str) -> Dict[str, Union[Tuple[float, int],
                                                     Tuple[float, int, int]]]:
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A dict of counter names to a two-tuple of (timestamp, value)
        or a three-tuple of (timestamp, value, count). The data point may
        be empty if no data is available.
    """
    return self._ceph_get_latest_counter(svc_type, svc_name, path)
1214
def list_servers(self) -> List[ServerInfoT]:
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata)

    Implemented by calling ``_ceph_get_server(None)``.

    :return: a list of information about all servers
    :rtype: list
    """
    return cast(List[ServerInfoT], self._ceph_get_server(None))
7c673cae 1224
f67539c2
TL
def get_metadata(self,
                 svc_type: str,
                 svc_id: str,
                 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    during addition/removal of services where the metadata is not
    available to modules. ``default`` is returned in that case.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings when
        calling this
    :param default: value returned when no metadata is found
        (``None`` unless overridden)
    :rtype: dict, or ``default`` if no metadata found
    """
    found = self._ceph_get_metadata(svc_type, svc_id)
    return default if found is None else found
7c673cae 1245
def get_daemon_status(self, svc_type: str, svc_id: str) -> Optional[Dict[str, str]]:
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    return self._ceph_get_daemon_status(svc_type, svc_id)
224ce89b 1258
def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
    """
    Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
    if ``retval != 0``.

    :param cmd_dict: command dictionary; its ``prefix`` entry is used in
        the error message
    :param inbuf: optional input buffer forwarded to ``mon_command``
    :return: the command result wrapped in ``HandleCommandResult``
    :raises MonCommandFailed: when the result's ``retval`` is truthy
    """
    outcome = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
    if not outcome.retval:
        return outcome
    raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {outcome.stderr} retval: {outcome.retval}')
1269
def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for modules that do simple, synchronous mon command
    execution.

    See send_command for general case.

    :param cmd_dict: command dictionary; it is JSON-serialized before being
        sent, and its ``prefix`` entry is used in the debug log line
    :param inbuf: optional input buffer forwarded to ``send_command``
    :return: status int, out std, err str
    """
    # Use the monotonic clock: the elapsed time must not be affected by
    # wall-clock adjustments while the command is in flight.
    started = time.monotonic()
    result = CommandResult()
    self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
    r = result.wait()
    elapsed = time.monotonic() - started

    self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], r[0], elapsed
    ))

    return r
1291
cd265ab1
TL
def send_command(
        self,
        result: CommandResult,
        svc_type: str,
        svc_id: str,
        command: str,
        tag: str,
        inbuf: Optional[str] = None) -> None:
    """
    Called by the plugin to send a command to the mon
    cluster.

    All arguments are passed unchanged to ``_ceph_send_command``.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule.  This acts as a
        completion and stores the output of the command.  Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type: target service type (``mon_command`` passes
        "mon")
    :param str svc_id: target service id (``mon_command`` passes "")
    :param str command: a JSON-serialized command.  This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself.  Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    :param str inbuf: input buffer for sending additional data.
    """
    self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
7c673cae 1322
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Set the module's current map of health checks.  Argument is a
    dict of check names to info, in this form:

    ::

       {
         'CHECK_FOO': {
           'severity': 'warning',           # or 'error'
           'summary': 'summary string',
           'count': 4,                      # quantify badness
           'detail': [ 'list', 'of', 'detail', 'strings' ],
          },
         'CHECK_BAR': {
           'severity': 'error',
           'summary': 'bars are bad',
           'detail': [ 'too hard' ],
         },
       }

    :param checks: dict of health check dicts
    """
    self._ceph_set_health_checks(checks)
c07f9fc5 1347
f67539c2
TL
def _handle_command(self,
                    inbuf: str,
                    cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                  Tuple[int, str, str]]:
    """
    Dispatch a command either to its registered ``CLICommand`` entry or,
    when the prefix is not in ``CLICommand.COMMANDS``, to
    ``handle_command``.

    :param inbuf: input buffer passed through to the handler
    :param cmd: command dictionary; ``cmd['prefix']`` selects the handler
    """
    prefix = cmd['prefix']
    if prefix in CLICommand.COMMANDS:
        return CLICommand.COMMANDS[prefix].call(self, cmd, inbuf)

    return self.handle_command(inbuf, cmd)
1356
f67539c2
TL
def handle_command(self,
                   inbuf: str,
                   cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                 Tuple[int, str, str]]:
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS

    Return a status code, an output buffer, and an
    output string.  The output buffer is for data results,
    the output string is for informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :type inbuf: str
    :param cmd: from Ceph's cmdmap_t
    :type cmd: dict

    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """

    # Should never get called if they didn't declare
    # any ``COMMANDS``
    raise NotImplementedError()
1380
def get_mgr_id(self) -> str:
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    The value comes from ``_ceph_get_mgr_id()``.

    :return: str
    """
    return self._ceph_get_mgr_id()
31f18b77 1389
b3b6e05e
TL
def get_ceph_conf_path(self) -> str:
    """Return the result of ``_ceph_get_ceph_conf_path()``."""
    return self._ceph_get_ceph_conf_path()
1392
def get_mgr_ip(self) -> str:
    """
    Return the first entry of the ``ips`` list from ``get("mgr_ips")``,
    falling back to ``socket.gethostname()`` when that list is missing or
    empty.
    """
    addresses = self.get("mgr_ips").get('ips', [])
    return addresses[0] if addresses else socket.gethostname()
1398
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Return the result of ``_ceph_get_option(key)``.

    :param key: option name (e.g. ``_config_notify`` reads "debug_mgr")
    """
    return self._ceph_get_option(key)
7c673cae 1401
f67539c2
TL
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Return the result of ``_ceph_get_foreign_option(entity, key)``.

    :param entity: presumably the name of another daemon/entity whose
        option is read -- confirm against the C++ binding
    :param key: option name
    """
    return self._ceph_get_foreign_option(entity, key)
1404
1405 def _validate_module_option(self, key: str) -> None:
11fdf7f2
TL
1406 """
1407 Helper: don't allow get/set config callers to
1408 access config options that they didn't declare
1409 in their schema.
7c673cae 1410 """
11fdf7f2
TL
1411 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1412 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1413 format(key, self.__class__.__name__))
1414
f67539c2
TL
1415 def _get_module_option(self,
1416 key: str,
1417 default: OptionValue,
1418 localized_prefix: str = "") -> OptionValue:
11fdf7f2
TL
1419 r = self._ceph_get_module_option(self.module_name, key,
1420 localized_prefix)
3efd9988 1421 if r is None:
11fdf7f2 1422 return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988
FG
1423 else:
1424 return r
7c673cae 1425
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting

    :param key: option name; must be declared in ``MODULE_OPTIONS``,
        otherwise ``_validate_module_option`` raises ``RuntimeError``
    :param default: fallback passed on to ``_get_module_option``
    """
    self._validate_module_option(key)
    return self._get_module_option(key, default)
1432
f67539c2
TL
def get_module_option_ex(self, module: str,
                         key: str,
                         default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    The key is only validated against ``MODULE_OPTIONS`` when ``module``
    is this module's own name.

    :param module: The name of the module, e.g. 'dashboard'
        or 'telemetry'.
    :param key: The configuration key, e.g. 'server_addr'.
    :param default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    """
    if module == self.module_name:
        self._validate_module_option(key)

    value = self._ceph_get_module_option(module, key)
    if value is None:
        return default
    return value
1450
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix

    :param str key_prefix:
    :return: dict of key to value, as returned by
        ``_ceph_get_store_prefix``
    """
    return self._ceph_get_store_prefix(key_prefix)
31f18b77 1460
f67539c2
TL
def _set_localized(self,
                   key: str,
                   val: Optional[str],
                   setter: Callable[[str, Optional[str]], None]) -> None:
    # Call ``setter`` with the key produced by
    # ``_get_localized_key(self.get_mgr_id(), key)`` and the given value.
    return setter(_get_localized_key(self.get_mgr_id(), key), val)
1466
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve localized configuration for this ceph-mgr instance

    The key is validated against ``MODULE_OPTIONS`` first; the lookup then
    passes ``get_mgr_id()`` as the localized prefix.

    :param key: option name
    :param default: fallback passed on to ``_get_module_option``
    """
    self._validate_module_option(key)
    return self._get_module_option(key, default, self.get_mgr_id())
224ce89b 1473
def _set_module_option(self, key: str, val: Any) -> None:
    # Store ``val`` for this module without validating ``key``. ``None``
    # is passed through unchanged; any other value is converted with
    # ``str()``.
    return self._ceph_set_module_option(self.module_name, key,
                                        None if val is None else str(val))
224ce89b 1477
def set_module_option(self, key: str, val: Any) -> None:
    """
    Set the value of a persistent configuration setting

    The key must be declared in ``MODULE_OPTIONS``; otherwise
    ``_validate_module_option`` raises ``RuntimeError``.

    :param str key:
    :type val: str | None
    """
    self._validate_module_option(key)
    return self._set_module_option(key, val)
7c673cae 1487
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is only validated against ``MODULE_OPTIONS`` when ``module``
    is this module's own name.

    :param str module: name of the module owning the option
    :param str key: option name
    :param val: new value; ``None`` is passed through as ``None``,
        anything else is converted with ``str()``
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Mirror _set_module_option(): never store the literal string "None"
    # when the caller passes None.
    return self._ceph_set_module_option(module, key,
                                        None if val is None else str(val))
1500
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set localized configuration for this ceph-mgr instance

    The key is validated against ``MODULE_OPTIONS``, then stored through
    ``_set_localized`` with ``_set_module_option`` as the setter.

    :param str key:
    :param str val:
    :return: None
    """
    self._validate_module_option(key)
    return self._set_localized(key, val, self._set_module_option)
224ce89b 1510
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store

    :param key: store key
    :param val: value to store, or ``None``
    """
    self._ceph_set_store(key, val)
7c673cae 1517
def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from this module's persistent key value store

    :param key: store key
    :param default: value returned when the store yields ``None``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1527
def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from the key value store, preferring the entry localized
    to this mgr over the plain ``key`` entry.

    :param key: store key
    :param default: value returned when neither entry exists
    """
    # Try the localized key first, then the plain key.
    for candidate in (_get_localized_key(self.get_mgr_id(), key), key):
        found = self._ceph_get_store(candidate)
        if found is not None:
            return found
    return default
1535
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Store ``val`` under the localized form of ``key``, using
    ``_set_localized`` with ``set_store`` as the setter.
    """
    return self._set_localized(key, val, self.set_store)
224ce89b 1538
def self_test(self) -> None:
    """
    Run a self-test on the module. Override this function and implement
    a best as possible self-test for (automated) testing of the module

    Indicate any failures by raising an exception.  This does not have
    to be pretty, it's mainly for picking up regressions during
    development, rather than use in the field.

    This base implementation does nothing.

    :return: None, or an advisory string for developer interest, such
             as a json dump of some state.
    """
    pass
3efd9988 1552
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap()``.

    NOTE(review): the previous text mentioned "epoch==0" selecting the
    latest map, but this method takes no epoch argument -- presumably the
    latest OSDMap is always returned; confirm.

    :return: OSDMap
    """
    return cast(OSDMap, self._ceph_get_osdmap())
3efd9988 1560
def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
    """
    Return the value of the newest data point of ``counter``.

    :param daemon_type: service type passed to ``get_latest_counter``
    :param daemon_name: service name passed to ``get_latest_counter``
    :param counter: counter path; also the key looked up in the result
    :return: the second element of the data point, or 0 when the data
        point is empty
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    sample = latest[counter]
    return sample[1] if sample else 0
1568
def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
    """
    Return the (value, count) pair of the newest data point of ``counter``.

    :param daemon_type: service type passed to ``get_latest_counter``
    :param daemon_name: service name passed to ``get_latest_counter``
    :param counter: counter path; also the key looked up in the result
    :return: ``(value, count)``, or ``(0, 0)`` when the data point is empty
    """
    sample = self.get_latest_counter(daemon_type, daemon_name, counter)[counter]
    if not sample:
        return 0, 0
    # The cast works around https://github.com/python/mypy/issues/1178
    triple = cast(Tuple[float, int, int], sample)
    return triple[1], triple[2]
1578
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
                          services: Sequence[str] = ("mds", "mon", "osd",
                                                     "rbd-mirror", "rgw",
                                                     "tcmu-runner")) -> Dict[str, dict]:
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters.  The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value (and a "count" member for long-running averages).

    :param prio_limit: minimum schema priority a counter must have
    :param services: service types to include
    """
    collected = defaultdict(dict)  # type: Dict[str, dict]

    for host in self.list_servers():
        for svc in cast(List[ServiceInfoT], host['services']):
            if svc['type'] not in services:
                continue

            all_schemas = self.get_perf_schema(svc['type'], svc['id'])
            if not all_schemas:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc['type'], svc['id']
                ))
                continue

            # The schema comes back in a potentially-multi-service format;
            # pick out just the service being asked about.
            svc_full_name = "{0}.{1}".format(svc['type'], svc['id'])
            own_schema = all_schemas[svc_full_name]

            # Attach the latest value(s) to a copy of each schema entry.
            for counter_path, counter_schema in own_schema.items():
                priority = counter_schema['priority']
                assert isinstance(priority, int)
                if priority < prio_limit:
                    continue

                tp = counter_schema['type']
                assert isinstance(tp, int)
                entry = dict(counter_schema)
                if tp & self.PERFCOUNTER_LONGRUNAVG:
                    # Long-running averages also carry their count.
                    entry['value'], entry['count'] = self.get_latest_avg(
                        svc['type'], svc['id'], counter_path)
                else:
                    entry['value'] = self.get_latest(
                        svc['type'], svc['id'], counter_path)

                collected[svc_full_name][counter_path] = entry

    self.log.debug("returning {0} counter".format(len(collected)))

    return collected
1650
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: the address to publish, passed to ``_ceph_set_uri``
    :return: None
    """
    return self._ceph_set_uri(uri)
94b18763 1659
f67539c2
TL
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """
    Pass a device wear level to ``_ceph_set_device_wear_level``.

    :param devid: device identifier
    :param wear_level: wear level value -- NOTE(review): the expected
        range/scale is not visible here; confirm against the binding
    """
    return self._ceph_set_device_wear_level(devid, wear_level)
1662
def have_mon_connection(self) -> bool:
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.  If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: the result of ``_ceph_have_mon_connection()``
    """

    return self._ceph_have_mon_connection()
11fdf7f2 1672
f67539c2
TL
def update_progress_event(self,
                          evid: str,
                          desc: str,
                          progress: float,
                          add_to_ceph_s: bool) -> None:
    """
    Forward a progress event update to ``_ceph_update_progress_event``.

    :param evid: event id
    :param desc: event description
    :param progress: progress value -- presumably a fraction in [0, 1];
        confirm against the binding
    :param add_to_ceph_s: presumably whether the event is shown in
        ``ceph -s`` output; confirm against the binding
    """
    return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
11fdf7f2 1679
f67539c2
TL
def complete_progress_event(self, evid: str) -> None:
    """
    Forward ``evid`` to ``_ceph_complete_progress_event``.

    :param evid: event id
    """
    return self._ceph_complete_progress_event(evid)
11fdf7f2 1682
def clear_all_progress_events(self) -> None:
    """Call ``_ceph_clear_all_progress_events()``."""
    return self._ceph_clear_all_progress_events()
1685
@property
def rados(self) -> rados.Rados:
    """
    A librados instance to be shared by any classes within
    this mgr module that want one.

    The instance is created from this module's context on first access,
    connected, registered via ``_ceph_register_client`` and cached in
    ``self._rados`` for later accesses.
    """
    if self._rados:
        return self._rados

    ctx_capsule = self.get_context()
    handle = rados.Rados(context=ctx_capsule)
    # Connect before caching: if connect() raises, no unconnected handle
    # may be left behind in self._rados for later accesses to return.
    handle.connect()
    self._rados = handle
    self._ceph_register_client(self._rados.get_addrs())
    return self._rados
1700
@staticmethod
def can_run() -> Tuple[bool, str]:
    """
    Implement this function to report whether the module's dependencies
    are met.  For example, if the module needs to import a particular
    dependency to work, then use a try/except around the import at
    file scope, and then report here if the import failed.

    This will be called in a blocking way from the C++ code, so do not
    do any I/O that could block in this function.

    This base implementation always reports success.

    :return: a 2-tuple consisting of a boolean and explanatory string
    """

    return True, ""
1716
def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """
    Invoke a method on another module.  All arguments, and the return
    value from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)



    :param module_name: Name of other module.  If module isn't loaded,
                        an ImportError exception is raised.
    :param method_name: Method name.  If it does not exist, a NameError
                        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :return: whatever ``_ceph_dispatch_remote`` returns
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    return self._ceph_dispatch_remote(module_name, method_name,
                                      args, kwargs)
1740
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query.  Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id); the annotation also allows ``None``
    """
    return self._ceph_add_osd_perf_query(query)
1770
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister an OSD perf query.

    :param int query_id: query ID, presumably the value returned by
        ``add_osd_perf_query`` -- confirm
    """
    return self._ceph_remove_osd_perf_query(query_id)
1778
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: the result of ``_ceph_get_osd_perf_counters(query_id)``;
        the annotation allows ``None``
    """
    return self._ceph_get_osd_perf_counters(query_id)
494da23a 1786
f67539c2
TL
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query.  Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
       }

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Valid subkey types:
       'mds_rank', 'client_id'
    Valid performance counter types:
       'cap_hit_metric'

    :param object query: query
    :rtype: int (query id); the annotation also allows ``None``
    """
    return self._ceph_add_mds_perf_query(query)
1815
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister a previously added MDS perf query.

    The work is delegated to ``_ceph_remove_mds_perf_query``, whose result
    is passed straight through to the caller.

    :param query_id: id of the query to drop, as returned by
        ``add_mds_perf_query``
    """
    return self._ceph_remove_mds_perf_query(query_id)
1823
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Fetch the statistics gathered so far for an MDS perf query.

    :param query_id: id of the query, as returned by ``add_mds_perf_query``
    :return: the value handed back by ``_ceph_get_mds_perf_counters``; per
        the annotation, a dict of counter lists, or ``None``
    """
    # Collection happens on the native side; this only reads the result.
    counters = self._ceph_get_mds_perf_counters(query_id)
    return counters
1831
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Check whether the caps of the current session allow running the py
    service or the current module with the given arguments.

    This gives modules a generic hook for enforcing finer-grained
    restrictions (for example, per pool).

    :param arguments: dict of key/value arguments to test
    :return: the verdict handed back by ``_ceph_is_authorized``
    """
    allowed = self._ceph_is_authorized(arguments)
    return allowed
522d829b
TL
1842
def send_rgwadmin_command(self, args: List[str],
                          stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]:
    """
    Run ``radosgw-admin`` as this mgr's entity and collect its output.

    The command line is ``radosgw-admin -c <conf path> -k <keyring>
    -n mgr.<mgr id>`` followed by ``args``. The process is given 10 seconds
    to finish.

    :param args: extra command-line arguments appended after the
        conf/keyring/name options
    :param stdout_as_json: when true and stdout is non-empty, stdout is
        parsed as JSON and the parsed object is returned in place of the
        raw text
    :return: ``(returncode, stdout, stderr)``. ``stderr`` is always the
        decoded text. ``stdout`` is the parsed JSON object, or the decoded
        text when parsing was not requested, stdout was empty, or the
        command failed with output that is not valid JSON
    :raises subprocess.TimeoutExpired: the command did not finish in time
    :raises json.JSONDecodeError: the command succeeded (exit code 0) but
        its stdout is not valid JSON while ``stdout_as_json`` is set
    """
    cmd = [
        'radosgw-admin',
        '-c', str(self.get_ceph_conf_path()),
        '-k', str(self.get_ceph_option('keyring')),
        '-n', f'mgr.{self.get_mgr_id()}',
    ] + args
    self.log.debug('Executing %s', str(cmd))

    # Only the process launch is guarded. ``check`` is not set, so
    # subprocess.run() never raises CalledProcessError; a non-zero exit
    # status is reported through the returned code instead.
    try:
        result = subprocess.run(  # pylint: disable=subprocess-run-check
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=10,
        )
    except subprocess.TimeoutExpired as ex:
        self.log.error('Timeout (10s) executing radosgw-admin %s', str(ex.cmd))
        raise

    stdout: Union[str, dict] = result.stdout.decode('utf-8')
    stderr = result.stderr.decode('utf-8')
    if result.returncode:
        self.log.debug('Error %s executing %s: %s', result.returncode, str(cmd), stderr)

    if stdout and stdout_as_json:
        try:
            stdout = json.loads(stdout)
        except json.JSONDecodeError:
            if not result.returncode:
                # A successful command was expected to emit JSON.
                raise
            # A failed command may print plain text; hand the raw output
            # back so the caller still sees the exit code and stderr
            # instead of a parse error.
    return result.returncode, stdout, stderr