]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
701b1eaf1e5dc33765d420469d229f8c1f48140b
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1 import ceph_module # noqa
2
3 from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
5 if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12 import inspect
13 import logging
14 import errno
15 import functools
16 import json
17 import threading
18 from collections import defaultdict
19 from enum import IntEnum
20 import rados
21 import re
22 import sys
23 import time
24 from ceph_argparse import CephArgtype
25 from mgr_util import profile_method
26
if sys.version_info >= (3, 8):
    from typing import get_args, get_origin
else:
    # Minimal stand-ins for the typing helpers that only exist on 3.8+.
    def get_args(tp):
        """
        Return the type arguments of ``tp`` (its ``__args__``), or an empty
        tuple when ``tp`` carries none.
        """
        # Always hand back a tuple-like value; bare ``Generic`` has no
        # arguments, so it falls through to the ``()`` default as well.
        return getattr(tp, '__args__', ())

    def get_origin(tp):
        """
        Return the unsubscripted origin of ``tp`` (its ``__origin__``),
        ``Generic`` for ``Generic`` itself, or None when there is none.
        """
        # The ``Generic`` special case belongs here (as in the 3.8 stdlib
        # implementation), not in get_args().
        if tp is Generic:
            return Generic
        return getattr(tp, '__origin__', None)
38
39
# Error texts returned by commands that expect a secret supplied via "-i".
ERROR_MSG_EMPTY_INPUT_FILE = 'Empty content: please add a password/secret to the file.'
ERROR_MSG_NO_INPUT_FILE = 'Please specify the file containing the password/secret with "-i" option.'

# Full list of strings in "osd_types.cc:pg_state_string()"
PG_STATES = [
    "active", "clean", "down",
    "recovery_unfound", "backfill_unfound",
    "scrubbing", "degraded", "inconsistent",
    "peering", "repair", "recovering",
    "forced_recovery", "backfill_wait",
    "incomplete", "stale", "remapped", "deep",
    "backfilling", "forced_backfill", "backfill_toofull",
    "recovery_wait", "recovery_toofull",
    "undersized", "activating", "peered",
    "snaptrim", "snaptrim_wait", "snaptrim_error",
    "creating", "unknown", "premerge",
    "failed_repair", "laggy", "wait",
]
79
80
class CommandResult(object):
    """
    Completion object to use with MgrModule.send_command.

    ``complete()`` stores the outcome and wakes up anybody blocked in
    ``wait()``.
    """

    def __init__(self, tag: Optional[str] = None):
        # Signalled once complete() has stored a result.
        self.ev = threading.Event()
        self.r = 0
        self.outb = ""
        self.outs = ""

        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag or ""

    def complete(self, r: int, outb: str, outs: str) -> None:
        """Record the return code and both output strings, then release waiters."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self) -> Tuple[int, str, str]:
        """Block until complete() was called; return ``(r, outb, outs)``."""
        self.ev.wait()
        return (self.r, self.outb, self.outs)
105
106
class HandleCommandResult(NamedTuple):
    """
    Three-field tuple holding the result of `handle_command()`.

    Guidelines for filling it in:

    * stderr is for errors (or extraordinary circumstances) only;
    * a successful `ceph foo bar` should not report "did foo bar" unless
      there is critical information to convey;
    * anything meant to be consumed programmatically goes to stdout.
    """
    # return code. E.g. 0 or -errno.EINVAL
    retval: int = 0
    # data of this result.
    stdout: str = ""
    # Typically used for error messages.
    stderr: str = ""
121
122
class MonCommandFailed(RuntimeError):
    """RuntimeError subclass named for monitor-command failures; adds no behaviour."""
124
125
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python face of the OSD map; each method forwards to the matching
    underscore-prefixed method of ``ceph_module.BasePyOSDMap``.
    """

    def get_epoch(self) -> int:
        return self._get_epoch()

    def get_crush_version(self) -> int:
        return self._get_crush_version()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_pools(self) -> Dict[int, Dict[str, Any]]:
        """Return the dumped pools keyed by their ``'pool'`` field."""
        # FIXME: efficient implementation
        return {pool['pool']: pool for pool in self._dump()['pools']}

    def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
        """Return the dumped pools keyed by their ``'pool_name'`` field."""
        # FIXME: efficient implementation
        return {pool['pool_name']: pool for pool in self._dump()['pools']}

    def new_incremental(self) -> 'OSDMapIncremental':
        return self._new_incremental()

    def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
        return self._apply_incremental(inc)

    def get_crush(self) -> 'CRUSHMap':
        return self._get_crush()

    def get_pools_by_take(self, take: int) -> List[int]:
        """Return the ``'pools'`` entry of the lookup for ``take`` ([] if absent)."""
        found = self._get_pools_by_take(take)
        return found.get('pools', [])

    def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
                       max_deviation: int,
                       max_iterations: int = 10,
                       pools: Optional[List[str]] = None) -> int:
        """Forward to ``_calc_pg_upmaps``; ``pools=None`` is passed as an empty list."""
        pool_names = [] if pools is None else pools
        return self._calc_pg_upmaps(inc, max_deviation, max_iterations,
                                    pool_names)

    def map_pool_pgs_up(self, poolid: int) -> List[int]:
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id: int) -> float:
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
        """Return the ``erasure_code_profiles`` entry called ``name``, or None."""
        # FIXME: efficient implementation
        profiles = self._dump()['erasure_code_profiles']
        return profiles.get(name, None)

    def get_require_osd_release(self) -> str:
        """Return the ``require_osd_release`` field of the dump."""
        return self._dump()['require_osd_release']
185
186
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python face of an incremental OSD map; every method forwards to the
    matching underscore-prefixed method of the base class.
    """

    def get_epoch(self) -> int:
        return self._get_epoch()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
        """
        Pass ``weightmap`` to ``_set_osd_reweights``.

        weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
        """
        Pass ``weightmap`` to ``_set_crush_compat_weight_set_weights``.

        weightmap is a dict, int to float. devices only. e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        # NOTE(review): the annotation says str keys while the text above says
        # int keys -- confirm which one the base implementation expects.
        return self._set_crush_compat_weight_set_weights(weightmap)
206
207
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python face of the CRUSH map. Simple accessors forward to the base class;
    the remaining helpers are computed from ``dump()``.
    """
    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_item_weight(self, item: int) -> Optional[int]:
        return self._get_item_weight(item)

    def get_item_name(self, item: int) -> Optional[str]:
        return self._get_item_name(item)

    def find_takes(self) -> List[int]:
        """Return the ``'takes'`` entry of ``_find_takes()`` ([] if absent)."""
        found = self._find_takes()
        return found.get('takes', [])

    def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
        """Return the ``'weights'`` mapping for ``root`` with keys converted to int."""
        raw = self._get_take_weight_osd_map(root)
        weights = raw.get('weights', {})
        return {int(osd): weight for osd, weight in weights.items()}

    @staticmethod
    def have_default_choose_args(dump: Dict[str, Any]) -> bool:
        """Tell whether ``dump['choose_args']`` has the default ('-1') entry."""
        choose_args = dump.get('choose_args', {})
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in choose_args

    @staticmethod
    def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Return the default ('-1') entry of ``dump['choose_args']`` ([] if that
        entry is missing). ``dump['choose_args']`` itself must be a dict.
        """
        choose_args = dump.get('choose_args')
        assert isinstance(choose_args, dict)
        return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
        """Return the first dumped rule whose ``'rule_name'`` matches, else None."""
        # TODO efficient implementation
        rules = self.dump()['rules']
        return next((r for r in rules if r['rule_name'] == rule_name), None)

    def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """Return the first dumped rule whose ``'rule_id'`` matches, else None."""
        rules = self.dump()['rules']
        return next((r for r in rules if r['rule_id'] == rule_id), None)

    def get_rule_root(self, rule_name: str) -> Optional[int]:
        """
        Return the ``'item'`` of the first 'take' step of the named rule.

        None is returned when the rule does not exist, or (after logging a
        warning) when it has no 'take' step.
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        for step in rule['steps']:
            if step.get('op') == 'take':
                return step['item']

        logging.warning("CRUSH rule '{0}' has no 'take' step".format(
            rule_name))
        return None

    def get_osds_under(self, root_id: int) -> List[int]:
        """
        Collect the non-negative item ids reachable from bucket ``root_id``,
        in depth-first order. Raises KeyError when ``root_id`` is not a
        dumped bucket.
        """
        # TODO don't abuse dump like this
        buckets = {bucket['id']: bucket for bucket in self.dump()['buckets']}
        found = []  # type: List[int]

        def descend(bucket: Dict[str, Any]) -> None:
            for child in bucket['items']:
                child_id = child['id']
                if child_id >= 0:
                    found.append(child_id)
                    continue
                # Negative ids are looked up as nested buckets. A KeyError
                # from anywhere inside this subtree is swallowed here, which
                # abandons the rest of that subtree.
                try:
                    descend(buckets[child_id])
                except KeyError:
                    pass

        descend(buckets[root_id])
        return found

    def device_class_counts(self) -> Dict[str, int]:
        """
        Count dumped devices per ``'class'`` value; devices lacking the key
        are counted under None.
        """
        counts = {}  # type: Dict[str, int]
        # TODO don't abuse dump like this
        for device in self.dump()['devices']:
            device_class = device.get('class', None)
            counts[device_class] = counts.get(device_class, 0) + 1
        return counts
297
298
# Signature of a command handler: any arguments, returning the
# (int, str, str) triple.
HandlerFuncType = Callable[..., Tuple[int, str, str]]
300
301
class CLICommand(object):
    """
    Decorator object that registers a handler function as a CLI command.

    Applying an instance to a function records the function's docstring and
    annotated parameters, and stores the instance in ``COMMANDS`` under its
    prefix.
    """
    # Registry of every decorated command, keyed by prefix.
    COMMANDS = {}  # type: Dict[str, CLICommand]

    # Parameter names that never become CLI arguments.
    KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'

    def __init__(self,
                 prefix: str,
                 perm: str = 'rw',
                 poll: bool = False):
        self.prefix = prefix
        self.perm = perm
        self.poll = poll
        # The next three are filled in when the decorator is applied.
        self.func = None  # type: Optional[Callable]
        self.arg_spec = {}  # type: Dict[str, Any]
        self.first_default = -1

    @staticmethod
    def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
        """
        Inspect ``f`` and return ``(desc, arg_spec, first_default, args)``:
        its docstring, its annotations, the index from which parameters are
        considered defaulted, and the space-joined argument descriptors built
        by ``CephArgtype.to_argdesc``.

        Every parameter outside ``KNOWN_ARGS`` must be annotated.
        """
        desc = inspect.getdoc(f) or ''
        spec = inspect.getfullargspec(f)
        annotations = spec.annotations
        # NOTE(review): this counts annotations (which may include 'return'
        # and omit an unannotated 'self') while the loop below indexes
        # spec.args -- the two only line up for the usual handler shape.
        default_count = len(spec.defaults) if spec.defaults else 0
        first_default = len(annotations) - default_count
        descriptors = []
        for position, arg_name in enumerate(spec.args):
            if arg_name in CLICommand.KNOWN_ARGS:
                continue
            assert arg_name in annotations, \
                f"'{arg_name}' is not annotated for {f}: {spec}"
            descriptors.append(
                CephArgtype.to_argdesc(annotations[arg_name],
                                       {'name': arg_name},
                                       position >= first_default))
        return desc, annotations, first_default, ' '.join(descriptors)

    def store_func_metadata(self, f: HandlerFuncType) -> None:
        """Store the result of load_func_metadata(f) on this instance."""
        metadata = self.load_func_metadata(f)
        self.desc, self.arg_spec, self.first_default, self.args = metadata

    def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
        """Decorator entry point: record ``func``, register self, return ``func``."""
        self.store_func_metadata(func)
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
        """
        Resolve one raw value into ``(kwargs_switch, name, value)``.

        The switch turns on at the first string of the form ``name=value``
        whose name is in ``arg_spec``; while it is on, the value is split at
        the first '='. Dashes in the resulting name become underscores.
        """
        if not kwargs_switch:
            kwargs_switch = (isinstance(val, str)
                             and '=' in val
                             and val.split('=', 1)[0] in self.arg_spec)

        if kwargs_switch:
            name, value = val.split('=', 1)
        else:
            name, value = key, val
        return kwargs_switch, name.replace('-', '_'), value

    def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build the handler's keyword arguments from ``cmd_dict``, casting each
        value with ``CephArgtype.cast_to`` to the annotated type.
        """
        collected = {}
        kwargs_switch = False
        for position, (arg_name, arg_type) in enumerate(self.arg_spec.items()):
            if arg_name in CLICommand.KNOWN_ARGS:
                continue
            assert self.first_default >= 0
            raw = cmd_dict.get(arg_name)
            # Parameters at/after first_default are left out when not given.
            if position >= self.first_default and raw is None:
                continue
            kwargs_switch, name, value = self._get_arg_value(kwargs_switch,
                                                             arg_name, raw)
            collected[name] = CephArgtype.cast_to(arg_type, value)
        return collected

    def call(self,
             mgr: Any,
             cmd_dict: Dict[str, Any],
             inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Invoke the registered handler with ``mgr`` and the arguments collected
        from ``cmd_dict``; a non-empty ``inbuf`` is passed through as well.
        """
        kwargs = self._collect_args_by_argspec(cmd_dict)
        if inbuf:
            kwargs['inbuf'] = inbuf
        assert self.func
        return self.func(mgr, **kwargs)

    def dump_cmd(self) -> Dict[str, Union[str, bool]]:
        """Return this command as a dict with 'cmd', 'desc', 'perm' and 'poll'."""
        return {
            'cmd': '{} {}'.format(self.prefix, self.args),
            'desc': self.desc,
            'perm': self.perm,
            'poll': self.poll,
        }

    @classmethod
    def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
        """Return dump_cmd() for every registered command."""
        return [command.dump_cmd() for command in cls.COMMANDS.values()]
402
403
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a CLICommand decorator for ``prefix`` with permission "r"."""
    return CLICommand(prefix, perm="r", poll=poll)
406
407
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a CLICommand decorator for ``prefix`` with permission "w"."""
    return CLICommand(prefix, perm="w", poll=poll)
410
411
def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType:
    """
    Decorator that rejects calls lacking a non-empty ``inbuf`` keyword.

    Without ``inbuf`` the wrapper returns -EINVAL with
    ERROR_MSG_NO_INPUT_FILE; with an empty one (after trailing line breaks
    were stripped from a str) it returns -EINVAL with
    ERROR_MSG_EMPTY_INPUT_FILE. Otherwise ``func`` is called.
    """
    @functools.wraps(func)
    def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
        if 'inbuf' not in kwargs:
            return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
        content = kwargs['inbuf']
        if isinstance(content, str):
            # Delete new line separator at EOF (it may have been added by a text editor).
            content = content.rstrip('\r\n')
            kwargs['inbuf'] = content
        if not content:
            return -errno.EINVAL, '', ERROR_MSG_EMPTY_INPUT_FILE
        return func(*args, **kwargs)

    # Expose the wrapped function's signature on the wrapper.
    check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
    return check
425
426
def _get_localized_key(prefix: str, key: str) -> str:
    """Join ``prefix`` and ``key`` with a slash."""
    return f'{prefix}/{key}'
429
430
# MODULE_OPTIONS types and Option Class

if TYPE_CHECKING:
    # Type labels an option may declare (only defined for type checkers).
    OptionTypeLabel = Literal[
        'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']


# common/options.h: value_t
OptionValue = Optional[Union[bool, int, float, str]]
441
442
class Option(Dict):
    """
    Helper class to declare options for MODULE_OPTIONS list.

    The instance is a dict holding every constructor argument whose value is
    not None, keyed by the parameter name and in parameter order.

    TODO: Replace with typing.TypedDict when in python_version >= 3.8
    """

    def __init__(
            self,
            name: str,
            default: OptionValue = None,
            type: 'OptionTypeLabel' = 'str',
            desc: Optional[str] = None,
            long_desc: Optional[str] = None,
            min: OptionValue = None,
            max: OptionValue = None,
            enum_allowed: Optional[List[str]] = None,
            tags: Optional[List[str]] = None,
            see_also: Optional[List[str]] = None,
            runtime: bool = False,
    ):
        # Build the mapping from the parameters explicitly rather than from
        # vars()/locals(): inside a method that refers to ``super``, locals()
        # also reports the implicit ``__class__`` closure variable, which
        # would end up as a bogus (and non-serialisable) entry in the option.
        given = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('long_desc', long_desc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('tags', tags),
            ('see_also', see_also),
            ('runtime', runtime),
        )
        super(Option, self).__init__(
            (key, value) for key, value in given if value is not None)
466
467
class Command(dict):
    """
    Helper class to declare a command together with its handler.

    The dict part only carries ``perm`` and ``poll``; the command prefix and
    the handler callable are stored as the ``prefix`` and ``handler``
    attributes.

    Usage:
    >>> def handler(mgr):
    ...     return 0, "", ""
    >>> Command(prefix="example", handler=handler, perm='w')
    {'perm': 'w', 'poll': False}
    """

    def __init__(
            self,
            prefix: str,
            handler: HandlerFuncType,
            perm: str = "rw",
            poll: bool = False,
    ):
        super().__init__(perm=perm,
                         poll=poll)
        self.prefix = prefix
        self.handler = handler

    @staticmethod
    def returns_command_result(instance: Any,
                               f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
        """
        Wrap ``f`` so that its 3-tuple result becomes a HandleCommandResult.

        :param instance: object to pass as first argument of ``f``; when it
                         is falsy the ``mgr`` given to the wrapper is used
        :param f: handler returning ``(retval, stdout, stderr)``
        :return: wrapper carrying the signature of ``f``
        """
        @functools.wraps(f)
        def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
            retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
            return HandleCommandResult(retval, stdout, stderr)
        wrapper.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
        return wrapper

    def register(self, instance: Any = False) -> HandlerFuncType:
        """
        Register a CLICommand handler. It allows an instance to register bound
        methods. In that case, the mgr instance is not passed, and it's expected
        to be available in the class instance.
        It also uses HandleCommandResult helper to return a wrapped a tuple of 3
        items.

        :param instance: object the handler should receive instead of the mgr
                         (leave falsy to pass the mgr)
        :return: the wrapped handler that was registered
        """
        # Forward 'poll' as well as 'perm' so the flag given to the
        # constructor is not silently lost on registration.
        cmd = CLICommand(prefix=self.prefix,
                         perm=self['perm'],
                         poll=self['poll'])
        return cmd(self.returns_command_result(instance, self.handler))
515
516
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that hands each formatted record to the module's
    ``_ceph_log`` method, prefixed with the module name.
    """

    def __init__(self, module_inst: Any):
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst
        prefix = module_inst.module_name
        self.setFormatter(logging.Formatter(
            f"[{prefix} %(levelname)-4s %(name)s] %(message)s"))

    def emit(self, record: logging.LogRecord) -> None:
        """Forward the record unless it is below this handler's level."""
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
527
528
class ClusterLogHandler(logging.Handler):
    """
    Logging handler that forwards records to the module's ``cluster_log``,
    using the module name as channel.
    """

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        self.setFormatter(logging.Formatter("%(message)s"))

    def emit(self, record: logging.LogRecord) -> None:
        """
        Forward ``record`` unless it is below this handler's level.

        The Python level is translated by threshold, so that levels other
        than the five standard ones are mapped to the nearest lower standard
        level instead of raising KeyError: >= ERROR (including CRITICAL)
        becomes ERROR, >= WARNING becomes WARN, >= INFO becomes INFO and
        everything else DEBUG.
        """
        # Filter first: nothing needs translating for a dropped record.
        if record.levelno < self.level:
            return

        prio = MgrModule.ClusterLogPrio
        if record.levelno >= logging.ERROR:
            level = prio.ERROR
        elif record.levelno >= logging.WARNING:
            level = prio.WARN
        elif record.levelno >= logging.INFO:
            level = prio.INFO
        else:
            level = prio.DEBUG

        self._module.cluster_log(self._module.module_name,
                                 level,
                                 self.format(record))
548
549
class FileHandler(logging.FileHandler):
    """
    File handler writing to a per-module file derived from the ``log_file``
    ceph option; the file is opened lazily (``delay=True``).
    """

    def __init__(self, module_inst: Any):
        base = module_inst.get_ceph_option("log_file")
        module = module_inst.module_name
        # Insert the module name in front of the last ".log" found in the
        # path, or append it when there is none.
        cut = base.rfind(".log")
        if cut == -1:
            self.path = f"{base}.{module}"
        else:
            self.path = f"{base[:cut]}.{module}.log"
        super(FileHandler, self).__init__(self.path, delay=True)
        self.setFormatter(logging.Formatter(
            "%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
560
561
class MgrModuleLoggingMixin(object):
    """
    Logging plumbing shared by the module base classes: installs the
    CPlusPlusHandler, FileHandler and ClusterLogHandler on the root logger
    and keeps their levels in sync with the configured options.
    """

    def _configure_logging(self,
                           mgr_level: str,
                           module_level: str,
                           cluster_level: str,
                           log_to_file: bool,
                           log_to_cluster: bool) -> None:
        """
        (Re)install the handlers on the root logger and apply the levels.

        The mgr handler is always attached; the file and cluster handlers
        only when the corresponding flag is set.
        """
        self._mgr_level: Optional[str] = None
        self._module_level: Optional[str] = None
        self._root_logger = logging.getLogger()

        self._unconfigure_logging()

        # the ceph log handler is initialized only once
        self._mgr_log_handler = CPlusPlusHandler(self)
        self._cluster_log_handler = ClusterLogHandler(self)
        self._file_log_handler = FileHandler(self)

        self.log_to_file = log_to_file
        self.log_to_cluster = log_to_cluster

        wanted = [(True, self._mgr_log_handler),
                  (log_to_file, self._file_log_handler),
                  (log_to_cluster, self._cluster_log_handler)]
        for enabled, handler in wanted:
            if enabled:
                self._root_logger.addHandler(handler)

        self._root_logger.setLevel(logging.NOTSET)
        self._set_log_level(mgr_level, module_level, cluster_level)

    def _unconfigure_logging(self) -> None:
        """Detach every handler of the three mgr handler types; clear both flags."""
        ours = (CPlusPlusHandler, FileHandler, ClusterLogHandler)
        # Collect first: the handler list must not change while iterating it.
        stale = [h for h in self._root_logger.handlers if isinstance(h, ours)]
        for handler in stale:
            self._root_logger.removeHandler(handler)
        self.log_to_file = False
        self.log_to_cluster = False

    def _set_log_level(self,
                       mgr_level: str,
                       module_level: str,
                       cluster_level: str) -> None:
        """
        Apply ``cluster_level`` to the cluster handler and, when something
        changed, the effective level to the mgr and file handlers.

        A non-empty ``module_level`` wins; otherwise the level is derived
        from ``mgr_level`` (debug_mgr).
        """
        self._cluster_log_handler.setLevel(cluster_level.upper())

        requested = module_level.upper() if module_level else ''
        current = self._module_level
        if current:
            # a module level is in force: only a different one matters
            unchanged = current == requested
        else:
            # using debug_mgr level: nothing to do if neither it nor the
            # (still unset) module level changed
            unchanged = not requested and self._mgr_level == mgr_level
        if unchanged:
            return

        if requested:
            level = requested
            self.getLogger().debug("setting log level: %s", level)
        else:
            level = self._ceph_log_level_to_python(mgr_level)
            if current:
                self.getLogger().warning("unsetting module log level, falling back to "
                                         "debug_mgr level: %s (%s)", level, mgr_level)
            else:
                self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
                                       level, mgr_level)

        self._module_level = requested
        self._mgr_level = mgr_level

        for handler in (self._mgr_log_handler, self._file_log_handler):
            handler.setLevel(level)

    def _enable_file_log(self) -> None:
        """Attach the file handler to the root logger."""
        self.getLogger().warning("enabling logging to file")
        self.log_to_file = True
        self._root_logger.addHandler(self._file_log_handler)

    def _disable_file_log(self) -> None:
        """Detach the file handler from the root logger."""
        self.getLogger().warning("disabling logging to file")
        self.log_to_file = False
        self._root_logger.removeHandler(self._file_log_handler)

    def _enable_cluster_log(self) -> None:
        """Attach the cluster handler to the root logger."""
        self.getLogger().warning("enabling logging to cluster")
        self.log_to_cluster = True
        self._root_logger.addHandler(self._cluster_log_handler)

    def _disable_cluster_log(self) -> None:
        """Detach the cluster handler from the root logger."""
        self.getLogger().warning("disabling logging to cluster")
        self.log_to_cluster = False
        self._root_logger.removeHandler(self._cluster_log_handler)

    def _ceph_log_level_to_python(self, log_level: str) -> str:
        """
        Translate a ceph debug level such as "5" or "1/5" into a Python level
        name, using the number before the first '/'.

        <= 0 (also empty or unparsable input) gives "CRITICAL", 1 gives
        "WARNING", 2..4 give "INFO" and anything higher "DEBUG".
        """
        ceph_level = 0
        if log_level:
            try:
                ceph_level = int(log_level.split("/", 1)[0])
            except ValueError:
                ceph_level = 0

        if ceph_level <= 0:
            return "CRITICAL"
        if ceph_level <= 1:
            return "WARNING"
        if ceph_level <= 4:
            return "INFO"
        return "DEBUG"

    def getLogger(self, name: Optional[str] = None) -> logging.Logger:
        """Return ``logging.getLogger(name)`` (the root logger for None)."""
        return logging.getLogger(name)
683
684
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
    """
    Base class for standby modules.

    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    MODULE_OPTIONS: List[Option] = []
    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

    def __init__(self, module_name: str, capsule: Any):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name

        # see also MgrModule.__init__()
        for option in self.MODULE_OPTIONS:
            if 'default' not in option:
                continue
            # Options that declare no type get their default stored as str.
            default = option['default']
            if 'type' not in option:
                default = str(default)
            self.MODULE_OPTION_DEFAULTS[option['name']] = default

        # mock does not return a str
        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
        log_level = cast(str, self.get_module_option("log_level"))
        cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
        # File and cluster logging are both off here.
        self._configure_logging(mgr_level, log_level, cluster_level,
                                False, False)

        # for backwards compatibility
        self._logger = self.getLogger()

    def __del__(self) -> None:
        self._cleanup()
        self._unconfigure_logging()

    def _cleanup(self) -> None:
        """Hook invoked from ``__del__``; does nothing here."""
        pass

    @classmethod
    def _register_options(cls, module_name: str) -> None:
        """
        Append the logging options (log_level, log_to_file, log_to_cluster,
        log_to_cluster_level) to ``MODULE_OPTIONS``; log_to_cluster is only
        added when no option of that name exists yet.
        """
        levels = ['info', 'debug', 'critical', 'error', 'warning', '']
        options = cls.MODULE_OPTIONS
        options.append(
            Option(name='log_level', type='str', default="", runtime=True,
                   enum_allowed=list(levels)))
        options.append(
            Option(name='log_to_file', type='bool', default=False, runtime=True))
        if 'log_to_cluster' not in [o['name'] for o in options]:
            options.append(
                Option(name='log_to_cluster', type='bool', default=False,
                       runtime=True))
        options.append(
            Option(name='log_to_cluster_level', type='str', default='info',
                   runtime=True,
                   enum_allowed=list(levels)))

    @property
    def log(self) -> logging.Logger:
        return self._logger

    def serve(self) -> None:
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self) -> str:
        return self._ceph_get_mgr_id()

    def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """
        Retrieve the value of a persistent configuration setting

        :param default: the default value of the config if it is not found
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        # Fall back to the declared default, then to the caller's default.
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key: str) -> OptionValue:
        return self._ceph_get_option(key)

    def get_store(self, key: str) -> Optional[str]:
        """
        Retrieve the value of a persistent KV store entry

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """
        Look ``key`` up in the store, first under "<mgr id>/<key>", then as
        plain ``key``; return ``default`` when neither entry exists.
        """
        localized = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
        if localized is not None:
            return localized
        plain = self._ceph_get_store(key)
        if plain is not None:
            return plain
        return default

    def get_active_uri(self) -> str:
        return self._ceph_get_active_uri()

    def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """
        Like get_module_option(), but the lookup is made for this mgr's id.
        """
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
800
801
# Health checks: name -> mapping of field name to int, str or list of str.
HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]

# {"type": service_type, "id": service_id}
ServiceInfoT = Dict[str, str]

# {"hostname": hostname,
#  "ceph_version": version,
#  "services": [service_info, ..]}
ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]

# A perf counter, as a free-form dict.
PerfCounterT = Dict[str, Any]
810
811
812 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
813 COMMANDS = [] # type: List[Any]
814 MODULE_OPTIONS: List[Option] = []
815 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
816
817 # Priority definitions for perf counters
818 PRIO_CRITICAL = 10
819 PRIO_INTERESTING = 8
820 PRIO_USEFUL = 5
821 PRIO_UNINTERESTING = 2
822 PRIO_DEBUGONLY = 0
823
824 # counter value types
825 PERFCOUNTER_TIME = 1
826 PERFCOUNTER_U64 = 2
827
828 # counter types
829 PERFCOUNTER_LONGRUNAVG = 4
830 PERFCOUNTER_COUNTER = 8
831 PERFCOUNTER_HISTOGRAM = 0x10
832 PERFCOUNTER_TYPE_MASK = ~3
833
834 # units supported
835 BYTES = 0
836 NONE = 1
837
838 # Cluster log priorities
class ClusterLogPrio(IntEnum):
    """Cluster log priorities, numbered from DEBUG (0) up to ERROR (4)."""
    DEBUG = 0
    INFO = 1
    SEC = 2
    WARN = 3
    ERROR = 4
845
def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
    """
    Store the module name, initialise the base class, collect the option
    defaults, set up logging and cache the ceph version.
    """
    self.module_name = module_name
    super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

    for option in self.MODULE_OPTIONS:
        if 'default' not in option:
            continue
        default = option['default']
        if 'type' not in option:
            # module not declaring it's type, so normalize the
            # default value to be a string for consistent behavior
            # with default and user-supplied option values.
            default = str(default)
        # Otherwise we assume the declared type matches the supplied
        # default value's type.
        self.MODULE_OPTION_DEFAULTS[option['name']] = default

    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    log_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
    log_to_file = self.get_module_option("log_to_file")
    assert isinstance(log_to_file, bool)
    log_to_cluster = self.get_module_option("log_to_cluster")
    assert isinstance(log_to_cluster, bool)
    self._configure_logging(mgr_level, log_level, cluster_level,
                            log_to_file, log_to_cluster)

    # for backwards compatibility
    self._logger = self.getLogger()

    self._version = self._ceph_get_version()

    self._perf_schema_cache = None

    # Keep a librados instance for those that need it.
    self._rados: Optional[rados.Rados] = None
881
def __del__(self) -> None:
    """Remove this module's log handlers when the object is finalised."""
    self._unconfigure_logging()
884
@classmethod
def _register_options(cls, module_name: str) -> None:
    """
    Append the logging options (log_level, log_to_file, log_to_cluster,
    log_to_cluster_level) to ``MODULE_OPTIONS``; log_to_cluster is only
    added when no option of that name exists yet.
    """
    levels = ['info', 'debug', 'critical', 'error', 'warning', '']
    options = cls.MODULE_OPTIONS
    options.append(
        Option(name='log_level', type='str', default="", runtime=True,
               enum_allowed=list(levels)))
    options.append(
        Option(name='log_to_file', type='bool', default=False, runtime=True))
    if 'log_to_cluster' not in [o['name'] for o in options]:
        options.append(
            Option(name='log_to_cluster', type='bool', default=False,
                   runtime=True))
    options.append(
        Option(name='log_to_cluster_level', type='str', default='info',
               runtime=True,
               enum_allowed=list(levels)))
902
@classmethod
def _register_commands(cls, module_name: str) -> None:
    """Extend ``COMMANDS`` with the dump of every registered CLICommand."""
    dumped = CLICommand.dump_cmd_list()
    cls.COMMANDS.extend(dumped)
906
@property
def log(self) -> logging.Logger:
    """The logger stored in ``_logger``."""
    return self._logger
910
def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
    """
    Send ``message`` to the cluster log.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority.
    :param message: The message to log.
    """
    # The underlying call takes the priority as its plain integer value.
    prio_value = priority.value
    self._ceph_cluster_log(channel, prio_value, message)
918
@property
def version(self) -> str:
    """The version string stored in ``_version``."""
    return self._version
922
@property
def release_name(self) -> str:
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: Returns the release name of the Ceph version in lower case.
    :rtype: str
    """
    name = self._ceph_get_release_name()
    return name
931
def lookup_release_name(self, major: int) -> str:
    """Return whatever ``_ceph_lookup_release_name`` reports for ``major``."""
    name = self._ceph_lookup_release_name(major)
    return name
934
def get_context(self) -> object:
    """
    Fetch the context object from ``_ceph_get_context``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    context = self._ceph_get_context()
    return context
940
def notify(self, notify_type: str, notify_id: str) -> None:
    """
    Hook called by the ceph-mgr service to tell the Python plugin that
    new state is available. This implementation does nothing.

    :param notify_type: string indicating what kind of notification,
                        such as osd_map, mon_map, fs_map, mon_status,
                        health, pg_summary, command, service_map
    :param notify_id:  string (may be empty) that optionally specifies
                        which entity is being notified about.  With
                        "command" notifications this is set to the tag
                        ``from send_command``.
    """
    return None
955
def _config_notify(self) -> None:
    """
    Re-read the logging options, apply any change, then call the
    subclass hook ``config_notify()``.
    """
    # check logging options for changes
    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    module_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
    assert isinstance(cluster_level, str)
    want_file = self.get_module_option("log_to_file", False)
    assert isinstance(want_file, bool)
    want_cluster = self.get_module_option("log_to_cluster", False)
    assert isinstance(want_cluster, bool)
    self._set_log_level(mgr_level, module_level, cluster_level)

    # Only toggle a sink whose wanted state differs from the current one.
    if want_file != self.log_to_file:
        toggle_file = self._enable_file_log if want_file else self._disable_file_log
        toggle_file()
    if want_cluster != self.log_to_cluster:
        toggle_cluster = (self._enable_cluster_log if want_cluster
                          else self._disable_cluster_log)
        toggle_cluster()

    # call module subclass implementations
    self.config_notify()
981
def config_notify(self) -> None:
    """
    Hook called by the ceph-mgr service when the configuration may have
    changed. Modules will want to refresh any configuration values
    stored in config variables. This implementation does nothing.
    """
    return None
989
def serve(self) -> None:
    """
    Hook called by the ceph-mgr service to start any server that
    is provided by this Python plugin.  An implementation should
    block until ``shutdown`` is called. This one does nothing.

    You *must* implement ``shutdown`` if you implement ``serve``
    """
    return None
999
def shutdown(self) -> None:
    """
    Called by the ceph-mgr service to request that this
    module drop out of its serve() function.  You do not
    need to implement this if you do not implement serve()

    This implementation shuts the librados instance down, if there is
    one, and unregisters its addresses.

    :return: None
    """
    client = self._rados
    if not client:
        return
    # Fetch the addresses before the client is shut down.
    addrs = client.get_addrs()
    client.shutdown()
    self._ceph_unregister_client(addrs)
1012
def get(self, data_name: str):
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>, pg_stats,
            pool_stats, pg_ready, osd_ping_times.
    :return: whatever ``_ceph_get`` returns for ``data_name``

    Note:
        All these structures have their own JSON representations: experiment
        or look at the C++ ``dump()`` methods to learn about them.
    """
    fetched = self._ceph_get(data_name)
    return fetched
1028
def _stattype_to_str(self, stattype: int) -> str:
    """
    Translate a perf counter type bitfield into a type name.

    Only the bits selected by ``PERFCOUNTER_TYPE_MASK`` are considered.

    :param stattype: perf counter type bitfield
    :return: 'gauge', 'counter', 'histogram', or '' when the masked
        value matches none of the known types
    """
    kind = stattype & self.PERFCOUNTER_TYPE_MASK
    if kind == 0:
        return 'gauge'
    # Reporting a long-running average as a plain counter is a lie, but
    # it matches the DaemonState decoding: only val, no counts.
    if kind in (self.PERFCOUNTER_LONGRUNAVG, self.PERFCOUNTER_COUNTER):
        return 'counter'
    return 'histogram' if kind == self.PERFCOUNTER_HISTOGRAM else ''
1043
def _perfpath_to_path_labels(self, daemon: str,
                             path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
    """
    Split a perf counter path into a generic path plus labels.

    Every result carries a ``ceph_daemon`` label holding ``daemon``.
    For daemons whose name starts with ``rbd-mirror.``, a per-image
    path of the form ``rbd_mirror_image_<pool>/[<namespace>/]<image>.<counter>``
    is rewritten to ``rbd_mirror_image_<counter>`` and the pool,
    namespace (empty string when absent) and image become labels.

    :param daemon: daemon name, e.g. "rbd-mirror.a"
    :param path: perf counter path
    :return: tuple of (path, label names, label values)
    """
    label_names = ("ceph_daemon",)  # type: Tuple[str, ...]
    labels = (daemon,)  # type: Tuple[str, ...]

    if not daemon.startswith('rbd-mirror.'):
        return path, label_names, labels

    match = re.match(
        r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
        path
    )
    if match is None:
        # not a per-image counter, keep the path untouched
        return path, label_names, labels

    pool, namespace, image, counter = match.groups()
    label_names = label_names + ('pool', 'namespace', 'image')
    labels = labels + (pool, namespace or '', image)
    return 'rbd_mirror_image_' + counter, label_names, labels
1063
def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
    """
    Normalise a raw perf counter value.

    :param stattype: perf counter type bitfield
    :param value: raw counter value
    :return: ``value`` converted from nanoseconds to seconds when the
        ``PERFCOUNTER_TIME`` bit is set in ``stattype``, otherwise
        ``value`` unchanged
    """
    is_time = stattype & self.PERFCOUNTER_TIME
    # time counters are converted from ns to seconds
    return value / 1000000000.0 if is_time else value
1070
def _unit_to_str(self, unit: int) -> str:
    """
    Return the rate suffix for a perf counter unit.

    :param unit: unit code, compared against ``self.NONE`` and
        ``self.BYTES``
    :return: "/s" for ``self.NONE``, "B/s" for ``self.BYTES``
    :raises ValueError: for any other unit
    """
    if unit == self.NONE:
        return "/s"
    if unit == self.BYTES:
        return "B/s"
    raise ValueError(f'bad unit "{unit}"')
1078
@staticmethod
def to_pretty_iec(n: int) -> str:
    """
    Format an integer with an IEC binary prefix.

    The largest prefix for which ``n`` is strictly greater than ten
    units is chosen, and ``n`` is shifted down (truncating) to that
    unit. Values that are not above ``10 Ki`` are printed unscaled,
    followed by a single space.

    :param n: the number to format
    :return: e.g. "11 Mi" or "512 "
    """
    scales = ((60, 'Ei'), (50, 'Pi'), (40, 'Ti'),
              (30, 'Gi'), (20, 'Mi'), (10, 'Ki'))
    for shift, unit in scales:
        if n > (10 << shift):
            return '{} {}'.format(n >> shift, unit)
    return '{} '.format(n)
1086
@staticmethod
def get_pretty_row(elems: Sequence[str], width: int) -> str:
    """
    Format a sequence of elements as one table row. Useful for
    polling modules.

    The terminal width is divided evenly between the elements; each
    element is right-aligned in its column and followed by `` |``.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    :return: the formatted row; just ``'|'`` when ``elems`` is empty
    """
    n = len(elems)
    if n == 0:
        # No columns to lay out: return the bare row border instead of
        # dividing the width by zero.
        return '|'
    column_width = int(width / n)

    cell = '{0:>{w}} |'
    return '|' + ''.join(cell.format(elem, w=column_width - 2)
                         for elem in elems)
1104
def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
    """
    Like ``get_pretty_row``, but frames the row with dashed lines so
    it can be used as a table title.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    :return: three newline-terminated lines: dash line, title row,
        dash line
    """
    n = len(elems)
    column_width = int(width / n)

    # the same dash line is used above and below the title
    dash_line = '+' + ('-' * (column_width - 1) + '+') * n + '\n'
    title = self.get_pretty_row(elems, width) + '\n'

    return dash_line + title + dash_line
1132
def get_server(self, hostname) -> ServerInfoT:
    """
    Fetch metadata about one particular hostname from ceph-mgr.

    The information is what ceph-mgr has gleaned from the daemon
    metadata reported by the daemons running on that server.

    :param hostname: a hostname
    :return: the server information for ``hostname``
    """
    server = self._ceph_get_server(hostname)
    return cast(ServerInfoT, server)
1144
def get_perf_schema(self,
                    svc_type: str,
                    svc_name: str) -> Dict[str,
                                           Dict[str, Dict[str, Union[str, int]]]]:
    """
    Fetch perf counter schema information.

    ``svc_name`` can be nullptr, as can ``svc_type``, in which case
    they act as wildcards.

    :param str svc_type:
    :param str svc_name:
    :return: the schema of the requested counters, as returned by
        ``_ceph_get_perf_schema``
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
1159
def get_counter(self,
                svc_type: str,
                svc_name: str,
                path: str) -> Dict[str, List[Tuple[float, int]]]:
    """
    Fetch the latest performance counter data for one counter of one
    service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem
        and the counter name, for example "mds.inodes".
    :return: A dict of counter names to their values. Each value is a
        list of two-tuples of (timestamp, value). This may be empty if
        no data is available.
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
1177
def get_latest_counter(self,
                       svc_type: str,
                       svc_name: str,
                       path: str) -> Dict[str, Union[Tuple[float, int],
                                                     Tuple[float, int, int]]]:
    """
    Fetch only the newest performance counter data point for one
    counter of one service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem
        and the counter name, for example "mds.inodes".
    :return: A dict of counter names to the newest data point, which
        is a two-tuple of (timestamp, value) or a three-tuple of
        (timestamp, value, count). The data point may be empty if no
        data is available.
    """
    newest = self._ceph_get_latest_counter(svc_type, svc_name, path)
    return newest
1196
def list_servers(self) -> List[ServerInfoT]:
    """
    Like ``get_server``, but covers all servers, i.e. all unique
    hostnames that have been mentioned in daemon metadata.

    :return: a list of information about all servers
    :rtype: list
    """
    # asking for the ``None`` hostname selects every known server
    servers = self._ceph_get_server(None)
    return cast(List[ServerInfoT], servers)
1206
def get_metadata(self,
                 svc_type: str,
                 svc_id: str,
                 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of
    time during addition/removal of services where the metadata is not
    available to modules. ``default`` is returned in that case.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. Convert OSD integer IDs to strings
        when calling this
    :param default: value returned when no metadata is available;
        defaults to ``None``
    :rtype: dict, or ``default`` if no metadata found
    """
    found = self._ceph_get_metadata(svc_type, svc_id)
    return default if found is None else found
1227
def get_daemon_status(self, svc_type: str, svc_id: str) -> Optional[Dict[str, str]]:
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started
    yet; the return annotation reflects that so callers are made to
    handle the ``None`` case.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    return self._ceph_get_daemon_status(svc_type, svc_id)
1240
def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
    """
    Wrapper around :func:`~mgr_module.MgrModule.mon_command` that
    raises if ``retval != 0``.

    :param cmd_dict: the command; its ``prefix`` entry is used in the
        error message
    :param inbuf: optional input buffer passed on to ``mon_command``
    :return: the command outcome as a ``HandleCommandResult``
    :raises MonCommandFailed: when the command's retval is non-zero
    """
    r = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
    if not r.retval:
        return r
    failure = f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}'
    raise MonCommandFailed(failure)
1251
def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for modules that do simple, synchronous mon command
    execution.

    The command is JSON-serialized, sent to the "mon" service through
    ``send_command`` and waited for. The command prefix, the status and
    the elapsed time are logged at debug level.

    See send_command for general case.

    :param cmd_dict: the command; must contain a ``prefix`` entry
    :param inbuf: optional input buffer for sending additional data
    :return: status int, out std, err str
    """
    # A monotonic clock is used so that the logged duration is not
    # distorted by wall-clock adjustments while the command runs.
    started = time.monotonic()
    result = CommandResult()
    self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
    r = result.wait()
    elapsed = time.monotonic() - started

    # arguments are handed to the logger unformatted so the message is
    # only built when debug logging is enabled
    self.log.debug("mon_command: '%s' -> %s in %.3fs",
                   cmd_dict['prefix'], r[0], elapsed)

    return r
1273
def send_command(
        self,
        result: CommandResult,
        svc_type: str,
        svc_id: str,
        command: str,
        tag: str,
        inbuf: Optional[str] = None) -> None:
    """
    Send a command to the mon cluster.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule. It acts as a
        completion and stores the output of the command. Use
        ``CommandResult.wait()`` to block on completion.
    :param str svc_type:
    :param str svc_id:
    :param str command: a JSON-serialized command. The format is the
        same as on the ceph command line: a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself. Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance
        is triggered, with notify_type set to "command", and notify_id
        set to the tag of the command.
    :param str inbuf: input buffer for sending additional data.
    """
    target = (svc_type, svc_id)
    self._ceph_send_command(result, *target, command, tag, inbuf)
1304
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Replace the module's current map of health checks.

    ``checks`` maps check names to their description, in this form:

    ::

       {
         'CHECK_FOO': {
           'severity': 'warning',           # or 'error'
           'summary': 'summary string',
           'count': 4,                      # quantify badness
           'detail': [ 'list', 'of', 'detail', 'strings' ],
          },
         'CHECK_BAR': {
           'severity': 'error',
           'summary': 'bars are bad',
           'detail': [ 'too hard' ],
         },
       }

    :param checks: dict of health check dicts
    """
    health_map = checks
    self._ceph_set_health_checks(health_map)
1329
def _handle_command(self,
                    inbuf: str,
                    cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                  Tuple[int, str, str]]:
    """
    Dispatch a command either to its ``CLICommand`` registration or,
    when the prefix is not registered there, to ``handle_command``.

    :param inbuf: input buffer supplied with the command
    :param cmd: the command; ``cmd['prefix']`` selects the handler
    :return: whatever the selected handler returns
    """
    prefix = cmd['prefix']
    if prefix in CLICommand.COMMANDS:
        return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)

    return self.handle_command(inbuf, cmd)
1338
def handle_command(self,
                   inbuf: str,
                   cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                 Tuple[int, str, str]]:
    """
    Called by ceph-mgr to ask the plugin to handle one of the commands
    that it declared in self.COMMANDS.

    An implementation returns a status code, an output buffer, and an
    output string. The output buffer is for data results, the output
    string is for informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :type inbuf: str
    :param cmd: from Ceph's cmdmap_t
    :type cmd: dict

    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """
    # This base implementation should never be reached by a module
    # that did not declare any ``COMMANDS``.
    raise NotImplementedError()
1362
def get_mgr_id(self) -> str:
    """
    Return the name of the manager daemon on which this plugin is
    currently being executed (i.e. the active manager).

    :return: str
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
1371
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Return the value of the Ceph option ``key`` as reported by
    ``_ceph_get_option``.

    :param key: option name
    """
    value = self._ceph_get_option(key)
    return value
1374
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Return the value of the Ceph option ``key`` for ``entity`` as
    reported by ``_ceph_get_foreign_option``.

    :param entity: the entity whose option is looked up
    :param key: option name
    """
    value = self._ceph_get_foreign_option(entity, key)
    return value
1377
def _validate_module_option(self, key: str) -> None:
    """
    Helper: refuse get/set config access to options that the module
    did not declare in its schema.

    :param key: option name to check against ``self.MODULE_OPTIONS``
    :raises RuntimeError: when no declared option is named ``key``
    """
    declared_names = [option['name'] for option in self.MODULE_OPTIONS]
    if key in declared_names:
        return
    owner = self.__class__.__name__
    raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
                       format(key, owner))
1387
def _get_module_option(self,
                       key: str,
                       default: OptionValue,
                       localized_prefix: str = "") -> OptionValue:
    """
    Read one of this module's options without validating ``key``.

    :param key: option name
    :param default: fallback used when the option is unset and
        ``MODULE_OPTION_DEFAULTS`` has no entry for ``key``
    :param localized_prefix: prefix passed on to
        ``_ceph_get_module_option``
    :return: the stored value, else the entry in
        ``MODULE_OPTION_DEFAULTS``, else ``default``
    """
    stored = self._ceph_get_module_option(self.module_name, key,
                                          localized_prefix)
    if stored is not None:
        return stored
    return self.MODULE_OPTION_DEFAULTS.get(key, default)
1398
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting.

    ``key`` is first checked by ``_validate_module_option``.

    :param key: option name
    :param default: fallback handed to ``_get_module_option``
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
1405
def get_module_option_ex(self, module: str,
                         key: str,
                         default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting for the
    specified module.

    ``key`` is validated only when ``module`` is this module.

    :param module: The name of the module, e.g. 'dashboard'
        or 'telemetry'.
    :param key: The configuration key, e.g. 'server_addr'.
    :param default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    """
    if module == self.module_name:
        self._validate_module_option(key)
    r = self._ceph_get_module_option(module, key)
    if r is None:
        return default
    return r
1423
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Retrieve the KV store entries whose keys have the given prefix.

    :param str key_prefix:
    :return: dict of KV store keys to values
    """
    matches = self._ceph_get_store_prefix(key_prefix)
    return matches
1433
def _set_localized(self,
                   key: str,
                   val: Optional[str],
                   setter: Callable[[str, Optional[str]], None]) -> None:
    """
    Store ``val`` through ``setter`` under the localized form of
    ``key``, which is built from this manager's id.

    :param key: un-localized key
    :param val: value handed to ``setter``
    :param setter: callable taking (localized key, value)
    :return: whatever ``setter`` returns
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
1439
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve localized configuration for this ceph-mgr instance.

    ``key`` is first checked by ``_validate_module_option``; the
    manager id is used as the localization prefix.

    :param key: option name
    :param default: fallback handed to ``_get_module_option``
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
1446
def _set_module_option(self, key: str, val: Any) -> None:
    """
    Store one of this module's options without validating ``key``.

    :param key: option name
    :param val: new value; ``None`` is passed through unchanged, any
        other value is converted with ``str``
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if val is None:
        serialized = None
    else:
        serialized = str(val)
    return self._ceph_set_module_option(self.module_name, key, serialized)
1450
def set_module_option(self, key: str, val: Any) -> None:
    """
    Set the value of a persistent configuration setting.

    ``key`` is first checked by ``_validate_module_option``.

    :param str key:
    :type val: str | None
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
1460
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting
    for the specified module.

    ``key`` is validated only when ``module`` is this module.

    :param str module: name of the module owning the option
    :param str key: option name
    :param val: new value. ``None`` is forwarded as ``None`` (as
        ``_set_module_option`` does) rather than as the literal string
        ``"None"``; any other value is converted with ``str``.
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Keep ``None`` intact: stringifying it would store the text "None"
    # as the option value.
    serialized = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, serialized)
1473
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set localized configuration for this ceph-mgr instance.

    ``key`` is first checked by ``_validate_module_option``; the value
    is then stored through ``_set_localized`` using
    ``_set_module_option`` as the setter.

    :param str key:
    :param str val:
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
1483
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in this module's persistent key value store.

    :param key: store key
    :param val: value to store; if ``None``, the key is removed from
        the store
    """
    entry = (key, val)
    self._ceph_set_store(*entry)
1490
def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from this module's persistent key value store.

    :param key: store key
    :param default: value returned when the store yields ``None`` for
        ``key``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1500
def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from the persistent key value store, preferring the
    entry localized for this manager.

    The lookup order is: localized key, then plain ``key``, then
    ``default``.

    :param key: un-localized store key
    :param default: value returned when neither lookup yields a value
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    localized = self._ceph_get_store(localized_key)
    if localized is not None:
        return localized
    shared = self._ceph_get_store(key)
    if shared is not None:
        return shared
    return default
1508
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in the persistent key value store under the key
    localized for this manager, using ``set_store`` as the setter.

    :param key: un-localized store key
    :param val: value handed to the setter
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
1511
def self_test(self) -> None:
    """
    Run a self-test on the module.

    Override this function and implement a best-effort self-test for
    (automated) testing of the module. Failures are indicated by
    raising an exception. This does not have to be pretty: it is
    mainly for picking up regressions during development, rather than
    for use in the field.

    The base implementation does nothing.

    :return: None, or an advisory string for developer interest, such
        as a json dump of some state.
    """
    return None
1525
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap``.

    :return: OSDMap
    """
    handle = self._ceph_get_osdmap()
    return cast(OSDMap, handle)
1533
def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
    """
    Return the newest value of one perf counter.

    :param daemon_type: service type handed to ``get_latest_counter``
    :param daemon_name: service name handed to ``get_latest_counter``
    :param counter: counter path; also the key read from the result
    :return: the second item of the newest data point, or 0 when the
        data point is empty
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    data = latest[counter]
    return data[1] if data else 0
1541
def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
    """
    Return the newest value and count of one perf counter.

    :param daemon_type: service type handed to ``get_latest_counter``
    :param daemon_name: service name handed to ``get_latest_counter``
    :param counter: counter path; also the key read from the result
    :return: (value, count) taken from the newest three-item data
        point, or (0, 0) when the data point is empty
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    data = latest[counter]
    if not data:
        return 0, 0
    # the cast works around https://github.com/python/mypy/issues/1178
    _, value, count = cast(Tuple[float, int, int], data)
    return value, count
1551
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
                          services: Sequence[str] = ("mds", "mon", "osd",
                                                     "rbd-mirror", "rgw",
                                                     "tcmu-runner")) -> Dict[str, dict]:
    """
    Return the perf counters currently known to this ceph-mgr
    instance, keeping those whose priority is equal to or greater
    than `prio_limit`.

    The result maps services (like "osd.123") to their counters. The
    counter dict of each service maps counter paths to a counter info
    structure: the information from the schema plus an additional
    "value" member holding the latest value. Long-running averages
    additionally get a "count" member.

    :param prio_limit: minimum counter priority to include
    :param services: service types to include
    """

    result = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in cast(List[ServiceInfoT], server['services']):
            svc_type = service['type']
            if svc_type not in services:
                continue
            svc_id = service['id']

            schemas = self.get_perf_schema(svc_type, svc_id)
            if not schemas:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # The schema comes back in a potentially-multi-service
            # format; pick out just the service being asked about.
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schema = schemas[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                priority = counter_schema['priority']
                assert isinstance(priority, int)
                if priority < prio_limit:
                    continue

                tp = counter_schema['type']
                assert isinstance(tp, int)
                counter_info = dict(counter_schema)
                if tp & self.PERFCOUNTER_LONGRUNAVG:
                    # long running averages also carry their count
                    value, count = self.get_latest_avg(
                        svc_type, svc_id, counter_path)
                    counter_info['value'] = value
                    counter_info['count'] = count
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
1623
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, call this to publish the address
    once it is available.

    :param uri: the address to publish
    :return: whatever ``_ceph_set_uri`` returns
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
1632
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """
    Hand the wear level of a device to ``_ceph_set_device_wear_level``.

    :param devid: device id
    :param wear_level: wear level to record for the device
    """
    outcome = self._ceph_set_device_wear_level(devid, wear_level)
    return outcome
1635
def have_mon_connection(self) -> bool:
    """
    Check whether this ceph-mgr daemon has an open connection to a
    monitor.

    Without one, the information held about the cluster is likely out
    of date, and/or the monitor cluster is down.
    """
    connected = self._ceph_have_mon_connection()
    return connected
1645
def update_progress_event(self,
                          evid: str,
                          desc: str,
                          progress: float,
                          add_to_ceph_s: bool) -> None:
    """
    Pass a progress event update to ``_ceph_update_progress_event``.

    :param evid: event id
    :param desc: event description
    :param progress: progress value of the event
    :param add_to_ceph_s: forwarded unchanged
    """
    update = (evid, desc, progress, add_to_ceph_s)
    return self._ceph_update_progress_event(*update)
1652
def complete_progress_event(self, evid: str) -> None:
    """
    Pass the id of a finished progress event to
    ``_ceph_complete_progress_event``.

    :param evid: event id
    """
    outcome = self._ceph_complete_progress_event(evid)
    return outcome
1655
def clear_all_progress_events(self) -> None:
    """
    Ask ceph-mgr, via ``_ceph_clear_all_progress_events``, to clear
    all progress events.
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1658
@property
def rados(self) -> rados.Rados:
    """
    A librados instance to be shared by any classes within this mgr
    module that want one.

    The instance is created on first access from the module's
    context, connected, and its addresses are registered via
    ``_ceph_register_client``; later accesses return the same
    instance.
    """
    if not self._rados:
        ctx_capsule = self.get_context()
        self._rados = rados.Rados(context=ctx_capsule)
        self._rados.connect()
        self._ceph_register_client(self._rados.get_addrs())
    return self._rados
1673
@staticmethod
def can_run() -> Tuple[bool, str]:
    """
    Implement this function to report whether the module's
    dependencies are met. For example, if the module needs to import a
    particular dependency to work, wrap the import at file scope in
    try/except and report here if the import failed.

    This is called in a blocking way from the C++ code, so do not do
    any I/O that could block in this function.

    The base implementation always reports success.

    :return a 2-tuple consisting of a boolean and explanatory string
    """
    runnable, reason = True, ""
    return runnable, reason
1689
def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """
    Invoke a method on another module. All arguments, and the return
    value from the other module, must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module. If module isn't loaded,
                        an ImportError exception is raised.
    :param method_name: Method name. If it does not exist, a NameError
                        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    # positional and keyword arguments travel as one tuple and one dict
    positional, keyword = args, kwargs
    return self._ceph_dispatch_remote(module_name, method_name,
                                      positional, keyword)
1713
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query. ``query`` is a dict of the query
    parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
1743
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
1751
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_get_osd_perf_counters`` returns for the
        query
    """
    counters = self._ceph_get_osd_perf_counters(query_id)
    return counters
1759
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query. ``query`` is a dict of the query
    parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
       }

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Valid subkey types:
       'mds_rank', 'client_id'
    Valid performance counter types:
       'cap_hit_metric'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_mds_perf_query(query)
    return query_id
1788
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister an MDS perf query.

    :param int query_id: query ID
    """
    outcome = self._ceph_remove_mds_perf_query(query_id)
    return outcome
1796
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an MDS perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_get_mds_perf_counters`` returns for the
        query
    """
    counters = self._ceph_get_mds_perf_counters(query_id)
    return counters
1804
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Verify that the current session caps permit executing the py
    service or current module with the provided arguments. This gives
    modules a generic way to restrict by more fine-grained controls
    (e.g. pools).

    :param arguments: dict of key/value arguments to test
    """
    permitted = self._ceph_is_authorized(arguments)
    return permitted
1812 :param arguments: dict of key/value arguments to test
1813 """
1814 return self._ceph_is_authorized(arguments)