]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
buildsys: switch source download to quincy
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1 import ceph_module # noqa
2
3 from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
5 if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12 import inspect
13 import logging
14 import errno
15 import functools
16 import json
17 import subprocess
18 import threading
19 from collections import defaultdict
20 from enum import IntEnum
21 import rados
22 import re
23 import socket
24 import sys
25 import time
26 from ceph_argparse import CephArgtype
27 from mgr_util import profile_method
28
if sys.version_info < (3, 8):
    # typing.get_args / typing.get_origin only exist from Python 3.8 on, so
    # provide minimal stand-ins for older interpreters.
    def get_args(tp):
        # NOTE(review): for ``Generic`` this returns ``tp`` itself, whereas
        # typing.get_args(Generic) returns () — confirm this is intended.
        return tp if tp is Generic else getattr(tp, '__args__', ())

    def get_origin(tp):
        return getattr(tp, '__origin__', None)
else:
    from typing import get_args, get_origin
40
41
# Error-message prefixes used by CLICheckNonemptyFileInput when the "-i"
# input file is missing or contains no data.
ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'

# Full list of strings in "osd_types.cc:pg_state_string()" (order preserved)
PG_STATES = [
    "active", "clean", "down", "recovery_unfound", "backfill_unfound",
    "scrubbing", "degraded", "inconsistent", "peering", "repair",
    "recovering", "forced_recovery", "backfill_wait", "incomplete", "stale",
    "remapped", "deep", "backfilling", "forced_backfill", "backfill_toofull",
    "recovery_wait", "recovery_toofull", "undersized", "activating", "peered",
    "snaptrim", "snaptrim_wait", "snaptrim_error", "creating", "unknown",
    "premerge", "failed_repair", "laggy", "wait",
]

# NFS-Ganesha FSAL backends accepted by the mgr, and the pool NFS uses.
NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
NFS_POOL_NAME = '.nfs'
84
85
class CommandResult(object):
    """
    Completion object to use with MgrModule.send_command.

    ``complete()`` records the outcome of a command and releases anyone
    blocked in ``wait()``.
    """

    def __init__(self, tag: Optional[str] = None):
        self.ev = threading.Event()
        self.outs = ""
        self.outb = ""
        self.r = 0
        # Convenience for notifications coming from C++ land: the tag lets a
        # reply be matched to its request without passing addresses around.
        self.tag = tag or ""

    def complete(self, r: int, outb: str, outs: str) -> None:
        """Store return code, output buffer and status string, then signal."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self) -> Tuple[int, str, str]:
        """Block until ``complete()`` was called; return ``(r, outb, outs)``."""
        self.ev.wait()
        result = (self.r, self.outb, self.outs)
        return result
110
111
class HandleCommandResult(NamedTuple):
    """
    Result triple returned by `handle_command()`.

    Guidelines:
    * Write to stderr only on error, or in extraordinary circumstances.
    * Avoid having `ceph foo bar` say "did foo bar" on success unless there
      is critical information to include there.
    * Everything programmatically consumable belongs on stdout.
    """
    retval: int = 0  # return code, e.g. 0 or -errno.EINVAL
    stdout: str = ""  # the data produced by the command
    stderr: str = ""  # usually an error message
126
127
class MonCommandFailed(RuntimeError):
    """
    Error type for a failed monitor command.

    NOTE(review): not raised anywhere in this excerpt; presumably used by a
    mon-command helper elsewhere in the file — confirm.
    """
129
130
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python view of an OSDMap backed by the native ``ceph_module`` base class.

    Most accessors forward to the underscore-prefixed native methods; the
    pool/profile/release helpers post-process the dict returned by
    ``_dump()``.
    """

    def get_epoch(self) -> int:
        return self._get_epoch()

    def get_crush_version(self) -> int:
        return self._get_crush_version()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_pools(self) -> Dict[int, Dict[str, Any]]:
        """Return the dump's ``pools`` entries keyed by their ``pool`` id."""
        # FIXME: efficient implementation (this dumps the whole map)
        return {p['pool']: p for p in self._dump()['pools']}

    def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
        """Return the dump's ``pools`` entries keyed by their ``pool_name``."""
        # FIXME: efficient implementation (this dumps the whole map)
        return {p['pool_name']: p for p in self._dump()['pools']}

    def new_incremental(self) -> 'OSDMapIncremental':
        return self._new_incremental()

    def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
        return self._apply_incremental(inc)

    def get_crush(self) -> 'CRUSHMap':
        return self._get_crush()

    def get_pools_by_take(self, take: int) -> List[int]:
        """Pool ids the native helper reports for CRUSH take ``take`` ([] if none)."""
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
                       max_deviation: int,
                       max_iterations: int = 10,
                       pools: Optional[List[str]] = None) -> int:
        """
        Run the native PG-upmap calculation against ``inc``.

        :param inc: incremental handed to the native code (presumably it
            receives the computed upmap changes — confirm).
        :param max_deviation: passed through unchanged.
        :param max_iterations: passed through unchanged.
        :param pools: passed through; ``None`` becomes an empty list.
        :return: the native call's integer result.
        """
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid: int) -> List[int]:
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id: int) -> float:
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
        """Return the dump's ``erasure_code_profiles[name]``, or None if absent."""
        # NOTE(review): the return annotation says List[Dict], but the value is
        # whatever the dump stores per profile name (likely a single dict of
        # profile settings) — confirm against OSDMap::dump before relying on it.
        # FIXME: efficient implementation
        return self._dump()['erasure_code_profiles'].get(name, None)

    def get_require_osd_release(self) -> str:
        """Return ``require_osd_release`` from the dump."""
        return self._dump()['require_osd_release']
190
191
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """Python facade over a native OSDMap incremental object."""

    def get_epoch(self) -> int:
        """Epoch reported by the native incremental."""
        return self._get_epoch()

    def dump(self) -> Dict[str, Any]:
        """Native dump of this incremental."""
        return self._dump()

    def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
        """
        Set OSD reweight values on this incremental.

        :param weightmap: OSD id -> weight, e.g. ``{0: .9, 1: 1.0, 3: .997}``
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
        """
        Set compat CRUSH weight-set weights (devices only) on this incremental.

        :param weightmap: device -> weight, e.g. ``{0: 3.4, 1: 3.3, 2: 3.334}``.
            NOTE(review): the annotation declares ``str`` keys while this
            example (and the original docs) use ints — confirm which key
            type the native side expects.
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
211
212
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python view of a CRUSH map backed by the native ``ceph_module`` base.

    Several helpers walk the dict returned by :meth:`dump` (keys ``rules``,
    ``buckets``, ``devices``, ``choose_args``).
    """
    ITEM_NONE = 0x7fffffff
    # Key of the default entry inside a dump's ``choose_args`` section.
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_item_weight(self, item: int) -> Optional[int]:
        return self._get_item_weight(item)

    def get_item_name(self, item: int) -> Optional[str]:
        return self._get_item_name(item)

    def find_takes(self) -> List[int]:
        """Ids the native helper reports under ``takes`` ([] if none)."""
        return self._find_takes().get('takes', [])

    def find_roots(self) -> List[int]:
        """Ids the native helper reports under ``roots`` ([] if none)."""
        return self._find_roots().get('roots', [])

    def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
        """Return the native ``weights`` map for ``root`` with keys converted to int."""
        uglymap = self._get_take_weight_osd_map(root)
        return {int(k): v for k, v in uglymap.get('weights', {}).items()}

    @staticmethod
    def have_default_choose_args(dump: Dict[str, Any]) -> bool:
        """True if ``dump['choose_args']`` contains a DEFAULT_CHOOSE_ARGS entry."""
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})

    @staticmethod
    def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Return the DEFAULT_CHOOSE_ARGS entry of ``dump['choose_args']`` ([] if absent).

        A dump without a ``choose_args`` section is treated like an empty
        one, consistent with have_default_choose_args(), instead of failing
        the type assertion.
        """
        choose_args = dump.get('choose_args', {})
        assert isinstance(choose_args, dict)
        return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
        """Return the first rule whose ``rule_name`` matches, or None."""
        # TODO efficient implementation
        return next((rule for rule in self.dump()['rules']
                     if rule_name == rule['rule_name']), None)

    def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """Return the first rule whose ``rule_id`` matches, or None."""
        return next((rule for rule in self.dump()['rules']
                     if rule['rule_id'] == rule_id), None)

    def get_rule_root(self, rule_name: str) -> Optional[int]:
        """
        Return the ``item`` of the first ``take`` step of rule ``rule_name``.

        Returns None if the rule does not exist, or (after logging a warning)
        if it has no ``take`` step.
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        first_take = next((s for s in rule['steps'] if s.get('op') == 'take'), None)
        if first_take is None:
            logging.warning("CRUSH rule '%s' has no 'take' step", rule_name)
            return None
        return first_take['item']

    def get_osds_under(self, root_id: int) -> List[int]:
        """
        Return the ids (>= 0) of all items below bucket ``root_id``, depth first.

        Negative item ids are buckets and are descended into; ids referring
        to buckets missing from the dump are skipped.

        :raises KeyError: if ``root_id`` itself is not a bucket in the dump.
        """
        # TODO don't abuse dump like this
        d = self.dump()
        buckets = {b['id']: b for b in d['buckets']}

        osd_list = []  # type: List[int]

        def accumulate(b: Dict[str, Any]) -> None:
            for item in b['items']:
                item_id = item['id']
                if item_id >= 0:
                    osd_list.append(item_id)
                    continue
                # Only an unknown child bucket is skipped; unlike a blanket
                # try/except KeyError, malformed entries deeper down are not
                # silently hidden.
                child = buckets.get(item_id)
                if child is not None:
                    accumulate(child)

        accumulate(buckets[root_id])

        return osd_list

    def device_class_counts(self) -> Dict[str, int]:
        """Count devices per ``class``; devices without one are counted under None."""
        result = defaultdict(int)  # type: Dict[str, int]
        # TODO don't abuse dump like this
        d = self.dump()
        for device in d['devices']:
            result[device.get('class', None)] += 1

        return dict(result)
305
306
# Shape of a CLI command handler: arbitrary arguments in, and a
# (retval, stdout, stderr) triple out.
HandlerFuncType = Callable[..., Tuple[int, str, str]]
308
309
class CLICommand(object):
    """
    Decorator object that registers a function as a ceph-mgr CLI command.

    Calling an instance on a function (``__call__``) stores the function,
    derives the command's argument descriptor from the function's
    annotations and defaults, and records the instance in the class-wide
    ``COMMANDS`` dict under its ``prefix``.
    """
    # Every decorated command keyed by prefix. Class attribute, so it is
    # shared by all CLICommand instances.
    COMMANDS = {}  # type: Dict[str, CLICommand]

    def __init__(self,
                 prefix: str,
                 perm: str = 'rw',
                 poll: bool = False):
        """
        :param prefix: command prefix, reported as the start of ``cmd`` by dump_cmd().
        :param perm: permission string, stored as-is (this file uses 'r', 'w', 'rw').
        :param poll: stored and reported by dump_cmd().
        """
        self.prefix = prefix
        self.perm = perm
        self.poll = poll
        # The following are filled in once the instance decorates a function.
        self.func = None  # type: Optional[Callable]
        self.arg_spec = {}  # type: Dict[str, Any]
        self.first_default = -1

    # Parameter/annotation names never exposed as CLI arguments. 'return'
    # is listed because a function's annotations dict includes it.
    KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'

    @staticmethod
    def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
        """
        Inspect ``f`` and return ``(desc, arg_spec, first_default, args)``.

        ``desc`` is the docstring joined onto one line, ``arg_spec`` is the
        function's annotations dict, ``first_default`` is the position from
        which arguments are treated as optional, and ``args`` is a
        space-separated string of descriptors from CephArgtype.to_argdesc.

        :raises AssertionError: if a parameter outside KNOWN_ARGS is not annotated.
        """
        desc = (inspect.getdoc(f) or '').replace('\n', ' ')
        full_argspec = inspect.getfullargspec(f)
        arg_spec = full_argspec.annotations
        # NOTE(review): first_default is derived from the number of
        # annotations (which counts 'return' when annotated but not an
        # unannotated 'self'). It is compared below against positions in
        # full_argspec.args, and in _collect_args_by_argspec against
        # positions in the annotations dict; the two sequences can differ,
        # so the optional/required split may not agree — confirm before
        # relying on it for new handler signatures.
        first_default = len(arg_spec)
        if full_argspec.defaults:
            first_default -= len(full_argspec.defaults)
        args = []
        for index, arg in enumerate(full_argspec.args):
            if arg in CLICommand.KNOWN_ARGS:
                continue
            assert arg in arg_spec, \
                f"'{arg}' is not annotated for {f}: {full_argspec}"
            has_default = index >= first_default
            args.append(CephArgtype.to_argdesc(arg_spec[arg],
                                               dict(name=arg),
                                               has_default))
        return desc, arg_spec, first_default, ' '.join(args)

    def store_func_metadata(self, f: HandlerFuncType) -> None:
        """Populate desc, arg_spec, first_default and args from ``f``."""
        self.desc, self.arg_spec, self.first_default, self.args = \
            self.load_func_metadata(f)

    def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
        """Decorate ``func``: record its metadata, register self, return ``func`` unchanged."""
        self.store_func_metadata(func)
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
        """
        Resolve the keyword name and raw value for one command argument.

        Once ``val`` is a ``"k=v"`` string whose ``k`` is a known argument
        name, ``kwargs_switch`` turns on; from then on every value is split
        on the first '=' and its left part is used as the keyword name.
        Otherwise ``key`` and ``val`` are used unchanged. Dashes in the name
        become underscores.

        :return: ``(kwargs_switch, name, value)``
        """
        def start_kwargs() -> bool:
            # True if val looks like "k=v" with k a parameter of the handler.
            if isinstance(val, str) and '=' in val:
                k, v = val.split('=', 1)
                if k in self.arg_spec:
                    return True
            return False

        if not kwargs_switch:
            kwargs_switch = start_kwargs()

        if kwargs_switch:
            # NOTE(review): assumes val is a str containing '=' once the
            # switch is on; a non-str value here would raise — confirm callers.
            k, v = val.split('=', 1)
        else:
            k, v = key, val
        return kwargs_switch, k.replace('-', '_'), v

    def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build handler keyword arguments from ``cmd_dict``.

        Entries at or after ``first_default`` that are missing or None are
        omitted, so the handler's defaults apply; every other value is
        converted with CephArgtype.cast_to using its annotation.
        """
        kwargs = {}
        kwargs_switch = False
        for index, (name, tp) in enumerate(self.arg_spec.items()):
            if name in CLICommand.KNOWN_ARGS:
                continue
            assert self.first_default >= 0
            raw_v = cmd_dict.get(name)
            if index >= self.first_default:
                if raw_v is None:
                    continue
            kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
                                                      name, raw_v)
            kwargs[k] = CephArgtype.cast_to(tp, v)
        return kwargs

    def call(self,
             mgr: Any,
             cmd_dict: Dict[str, Any],
             inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Invoke the registered function as ``func(mgr, **kwargs)``.

        ``kwargs`` come from _collect_args_by_argspec(cmd_dict); ``inbuf`` is
        added only when it is truthy.
        """
        kwargs = self._collect_args_by_argspec(cmd_dict)
        if inbuf:
            kwargs['inbuf'] = inbuf
        assert self.func
        return self.func(mgr, **kwargs)

    def dump_cmd(self) -> Dict[str, Union[str, bool]]:
        """Describe this command as a dict with ``cmd``, ``desc``, ``perm`` and ``poll``."""
        return {
            'cmd': '{} {}'.format(self.prefix, self.args),
            'desc': self.desc,
            'perm': self.perm,
            'poll': self.poll,
        }

    @classmethod
    def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
        """dump_cmd() of every registered command, in registration order."""
        return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
410
411
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Create a CLICommand decorator for ``prefix`` with read-only ("r") permission."""
    read_perm = "r"
    return CLICommand(prefix, read_perm, poll)
414
415
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Create a CLICommand decorator for ``prefix`` with write ("w") permission."""
    write_perm = "w"
    return CLICommand(prefix, write_perm, poll)
418
419
def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
    """
    Build a decorator that only calls a CLI handler with a non-empty ``inbuf``.

    :param desc: what the input file should contain; used in error messages.
    :return: a decorator. The wrapper returns ``(-EINVAL, '', message)`` when
        ``inbuf`` is not passed as a keyword or is empty/whitespace-only after
        trailing CR/LF characters are removed. It keeps the wrapped
        function's signature.
    """
    def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
        @functools.wraps(func)
        def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
            if 'inbuf' not in kwargs:
                return (-errno.EINVAL, '',
                        f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '
                        f'containing {desc} with "-i" option')
            inbuf = kwargs['inbuf']
            if isinstance(inbuf, str):
                # Remove the trailing newline(s) a text editor may add at EOF;
                # rstrip('\r\n') removes any mix of trailing CR and LF chars.
                inbuf = inbuf.rstrip('\r\n')
                kwargs['inbuf'] = inbuf
            if not inbuf or not inbuf.strip():
                return (-errno.EINVAL, '',
                        f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '
                        'the file')
            return func(*args, **kwargs)
        check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
        return check
    return CheckFileInput
437
438
def _get_localized_key(prefix: str, key: str) -> str:
    """Join ``prefix`` and ``key`` with a slash, e.g. ``("x", "k") -> "x/k"``."""
    return f'{prefix}/{key}'
441
442
443 """
444 MODULE_OPTIONS types and Option Class
445 """
446 if TYPE_CHECKING:
447 OptionTypeLabel = Literal[
448 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
449
450
451 # common/options.h: value_t
452 OptionValue = Optional[Union[bool, int, float, str]]
453
454
class Option(Dict):
    """
    Dict describing one entry of a module's MODULE_OPTIONS list.

    Each constructor argument whose value is not None is stored under the
    argument's name, in parameter order. ``type`` ('str') and ``runtime``
    (False) have non-None defaults, so they are always present.
    TODO: Replace with typing.TypedDict when in python_version >= 3.8
    """

    def __init__(
            self,
            name: str,
            default: OptionValue = None,
            type: 'OptionTypeLabel' = 'str',
            desc: Optional[str] = None,
            long_desc: Optional[str] = None,
            min: OptionValue = None,
            max: OptionValue = None,
            enum_allowed: Optional[List[str]] = None,
            tags: Optional[List[str]] = None,
            see_also: Optional[List[str]] = None,
            runtime: bool = False,
    ):
        declared = [
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('long_desc', long_desc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('tags', tags),
            ('see_also', see_also),
            ('runtime', runtime),
        ]
        super().__init__((field, value) for field, value in declared
                         if value is not None)
478
479
class Command(dict):
    """
    Declarative description of a CLI command: prefix, handler, perm and poll.

    The dict itself holds only ``perm`` and ``poll``. The prefix and handler
    are stored as attributes. Call :meth:`register` to turn the description
    into a registered :class:`CLICommand`.

    Usage:
        >>> def handler(mgr, name: str):
        ...     return 0, name, ''
        >>> Command(prefix="example", handler=handler, perm='w')
        {'perm': 'w', 'poll': False}
    """

    def __init__(
            self,
            prefix: str,
            handler: HandlerFuncType,
            perm: str = "rw",
            poll: bool = False,
    ):
        super().__init__(perm=perm,
                         poll=poll)
        self.prefix = prefix
        self.handler = handler

    @staticmethod
    def returns_command_result(instance: Any,
                               f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
        """
        Wrap ``f`` so its ``(retval, stdout, stderr)`` tuple becomes a HandleCommandResult.

        The wrapper calls ``f(instance or mgr, *args, **kwargs)``, so a
        truthy ``instance`` replaces the mgr module as the first argument.
        It copies ``f``'s signature so CLICommand can introspect the
        original parameters.
        """
        @functools.wraps(f)
        def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
            retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
            return HandleCommandResult(retval, stdout, stderr)
        wrapper.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
        return wrapper

    def register(self, instance: Any = False) -> HandlerFuncType:
        """
        Register this command's handler as a CLICommand and return the wrapper.

        It allows an instance to register bound methods. In that case the mgr
        instance is not passed, and it is expected to be available in the
        class instance. The handler's 3-tuple result is wrapped in a
        HandleCommandResult.

        :param instance: object passed to the handler in place of the mgr
            module; when falsy (the default) the mgr module is passed.
            (Annotated ``Any``: the value is forwarded as the handler's
            first argument, so passing ``True`` would not be meaningful.)
        """
        # Propagate every declared attribute, including ``poll``, which was
        # previously dropped so registered commands always reported poll=False.
        cmd = CLICommand(prefix=self.prefix, perm=self['perm'], poll=self['poll'])
        return cmd(self.returns_command_result(instance, self.handler))
527
528
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that passes each formatted record to the module's
    ``_ceph_log`` method (the mgr's native log, judging by the class name).

    Records are formatted as ``[<module_name> <LEVEL> <logger>] <message>``.
    """

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        pattern = "[{} %(levelname)-4s %(name)s] %(message)s".format(module_inst.module_name)
        self.setFormatter(logging.Formatter(pattern))

    def emit(self, record: logging.LogRecord) -> None:
        # Drop records below this handler's own level.
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
539
540
class ClusterLogHandler(logging.Handler):
    """
    Logging handler that forwards records to ``module.cluster_log()``.

    Records go to the channel named after the module, with the Python level
    translated to a MgrModule.ClusterLogPrio.
    """

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        self.setFormatter(logging.Formatter("%(message)s"))

    @staticmethod
    def _to_cluster_prio(levelno: int) -> Any:
        """
        Map a Python logging level number to a ClusterLogPrio.

        The standard levels map as before (CRITICAL and ERROR -> ERROR,
        WARNING -> WARN, INFO -> INFO, DEBUG -> DEBUG). Custom levels use the
        nearest standard level at or below them, instead of raising KeyError.
        """
        prio = MgrModule.ClusterLogPrio
        if levelno >= logging.ERROR:
            return prio.ERROR
        if levelno >= logging.WARNING:
            return prio.WARN
        if levelno >= logging.INFO:
            return prio.INFO
        return prio.DEBUG

    def emit(self, record: logging.LogRecord) -> None:
        # Filter first so records below the handler level cost nothing.
        if record.levelno < self.level:
            return
        self._module.cluster_log(self._module.module_name,
                                 self._to_cluster_prio(record.levelno),
                                 self.format(record))
560
561
class FileHandler(logging.FileHandler):
    """
    Per-module log file derived from the daemon's ``log_file`` option.

    ``.../ceph-mgr.x.log`` becomes ``.../ceph-mgr.x.<module_name>.log``. Any
    other path gets ``.<module_name>`` appended. The file is opened lazily
    (``delay=True``), on the first emitted record.
    """

    def __init__(self, module_inst: Any):
        path = module_inst.get_ceph_option("log_file")
        suffix = ".log"
        # Only a trailing ".log" is treated as the extension; rfind() would
        # also match ".log" inside a directory name such as "app.log.d/" and
        # cut the path short.
        if path.endswith(suffix):
            self.path = "{}.{}{}".format(path[:-len(suffix)],
                                         module_inst.module_name, suffix)
        else:
            self.path = "{}.{}".format(path, module_inst.module_name)
        super(FileHandler, self).__init__(self.path, delay=True)
        self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
572
573
class MgrModuleLoggingMixin(object):
    """
    Logging setup shared by MgrModule and MgrStandbyModule.

    Manages three handlers on the root logger:
    * CPlusPlusHandler (always attached),
    * FileHandler (attached when logging to file is enabled),
    * ClusterLogHandler (attached when logging to the cluster is enabled).

    The host class must provide what those handlers use (``module_name``,
    ``get_ceph_option``, ``_ceph_log``, ``cluster_log``).
    """

    def _configure_logging(self,
                           mgr_level: str,
                           module_level: str,
                           cluster_level: str,
                           log_to_file: bool,
                           log_to_cluster: bool) -> None:
        """
        Replace any previous handlers of these types on the root logger and set levels.

        :param mgr_level: ``debug_mgr`` value, e.g. ``"1/5"``.
        :param module_level: module ``log_level`` option; '' means "follow debug_mgr".
        :param cluster_level: level name for the cluster log handler.
        :param log_to_file: attach the per-module file handler.
        :param log_to_cluster: attach the cluster log handler.
        """
        self._mgr_level: Optional[str] = None
        self._module_level: Optional[str] = None
        self._root_logger = logging.getLogger()

        self._unconfigure_logging()

        # the ceph log handler is initialized only once
        self._mgr_log_handler = CPlusPlusHandler(self)
        self._cluster_log_handler = ClusterLogHandler(self)
        self._file_log_handler = FileHandler(self)

        self.log_to_file = log_to_file
        self.log_to_cluster = log_to_cluster

        self._root_logger.addHandler(self._mgr_log_handler)
        if log_to_file:
            self._root_logger.addHandler(self._file_log_handler)
        if log_to_cluster:
            self._root_logger.addHandler(self._cluster_log_handler)

        # The root logger passes everything; filtering happens per handler.
        self._root_logger.setLevel(logging.NOTSET)
        self._set_log_level(mgr_level, module_level, cluster_level)

    def _unconfigure_logging(self) -> None:
        """Detach every CPlusPlus/File/ClusterLog handler from the root logger."""
        # remove existing handlers:
        rm_handlers = [
            h for h in self._root_logger.handlers
            if (isinstance(h, CPlusPlusHandler) or
                isinstance(h, FileHandler) or
                isinstance(h, ClusterLogHandler))]
        for h in rm_handlers:
            self._root_logger.removeHandler(h)
        self.log_to_file = False
        self.log_to_cluster = False

    def _set_log_level(self,
                       mgr_level: str,
                       module_level: str,
                       cluster_level: str) -> None:
        """
        Apply log levels to the handlers.

        The cluster handler takes ``cluster_level`` when it is non-empty. The
        mgr and file handlers take ``module_level`` when set, and otherwise
        the Python level derived from ``mgr_level``. Nothing is changed if
        neither input differs from what was last applied.
        """
        # log_to_cluster_level allows '' (see the option's enum_allowed);
        # logging rejects an empty level name with ValueError, so an empty
        # (or missing) value leaves the handler's current level unchanged.
        if cluster_level:
            self._cluster_log_handler.setLevel(cluster_level.upper())

        module_level = module_level.upper() if module_level else ''
        if not self._module_level:
            # using debug_mgr level
            if not module_level and self._mgr_level == mgr_level:
                # no change in module level neither in debug_mgr
                return
        else:
            if self._module_level == module_level:
                # no change in module level
                return

        if not self._module_level and not module_level:
            level = self._ceph_log_level_to_python(mgr_level)
            self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
                                   level, mgr_level)
        elif self._module_level and not module_level:
            level = self._ceph_log_level_to_python(mgr_level)
            self.getLogger().warning("unsetting module log level, falling back to "
                                     "debug_mgr level: %s (%s)", level, mgr_level)
        elif module_level:
            level = module_level
            self.getLogger().debug("setting log level: %s", level)

        self._module_level = module_level
        self._mgr_level = mgr_level

        self._mgr_log_handler.setLevel(level)
        self._file_log_handler.setLevel(level)

    def _enable_file_log(self) -> None:
        """Attach the file handler and record that file logging is on."""
        # enable file log
        self.getLogger().warning("enabling logging to file")
        self.log_to_file = True
        self._root_logger.addHandler(self._file_log_handler)

    def _disable_file_log(self) -> None:
        """Detach the file handler and record that file logging is off."""
        # disable file log
        self.getLogger().warning("disabling logging to file")
        self.log_to_file = False
        self._root_logger.removeHandler(self._file_log_handler)

    def _enable_cluster_log(self) -> None:
        """Attach the cluster log handler and record that cluster logging is on."""
        # enable cluster log
        self.getLogger().warning("enabling logging to cluster")
        self.log_to_cluster = True
        self._root_logger.addHandler(self._cluster_log_handler)

    def _disable_cluster_log(self) -> None:
        """Detach the cluster log handler and record that cluster logging is off."""
        # disable cluster log
        self.getLogger().warning("disabling logging to cluster")
        self.log_to_cluster = False
        self._root_logger.removeHandler(self._cluster_log_handler)

    def _ceph_log_level_to_python(self, log_level: str) -> str:
        """
        Translate a ceph debug level (``"N"`` or ``"N/M"``) into a Python level name.

        Only the part before '/' is used; a missing or unparsable value counts
        as 0. Mapping: <=0 CRITICAL, 1 WARNING, 2-4 INFO, >=5 DEBUG.
        """
        if log_level:
            try:
                ceph_log_level = int(log_level.split("/", 1)[0])
            except ValueError:
                ceph_log_level = 0
        else:
            ceph_log_level = 0

        log_level = "DEBUG"
        if ceph_log_level <= 0:
            log_level = "CRITICAL"
        elif ceph_log_level <= 1:
            log_level = "WARNING"
        elif ceph_log_level <= 4:
            log_level = "INFO"
        return log_level

    def getLogger(self, name: Optional[str] = None) -> logging.Logger:
        """Return ``logging.getLogger(name)`` (the root logger when name is None)."""
        return logging.getLogger(name)
695
696
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
    """
    Base class for a module instance running on a standby ceph-mgr.

    Standby modules only implement ``serve`` and ``shutdown``: they are not
    permitted to implement commands and they receive no notifications.
    What they can reach is the mgrmap (e.g. for the service URI published by
    the active peer) and configuration settings, read-only.
    """

    MODULE_OPTIONS: List[Option] = []
    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

    def __init__(self, module_name: str, capsule: Any):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name

        # Same default-normalisation rule as MgrModule.__init__(): typed
        # options keep their default as-is, untyped ones are stringified.
        for opt in self.MODULE_OPTIONS:
            if 'default' not in opt:
                continue
            raw_default = opt['default']
            self.MODULE_OPTION_DEFAULTS[opt['name']] = (
                raw_default if 'type' in opt else str(raw_default))

        # cast(): mock does not return a str
        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
        log_level = cast(str, self.get_module_option("log_level"))
        cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
        # File and cluster logging are both off for standby modules here.
        self._configure_logging(mgr_level, log_level, cluster_level,
                                False, False)

        # kept for backwards compatibility (see the ``log`` property)
        self._logger = self.getLogger()

    def __del__(self) -> None:
        self._cleanup()
        self._unconfigure_logging()

    def _cleanup(self) -> None:
        """Hook run from ``__del__`` before logging is torn down; no-op here."""

    @classmethod
    def _register_options(cls, module_name: str) -> None:
        """Append the logging options every module gets to MODULE_OPTIONS."""
        level_names = ('info', 'debug', 'critical', 'error', 'warning', '')
        opts = cls.MODULE_OPTIONS
        opts.append(Option(name='log_level', type='str', default="", runtime=True,
                           enum_allowed=list(level_names)))
        opts.append(Option(name='log_to_file', type='bool', default=False, runtime=True))
        already_declared = [o for o in opts if o['name'] == 'log_to_cluster']
        if not already_declared:
            opts.append(Option(name='log_to_cluster', type='bool', default=False,
                               runtime=True))
        opts.append(Option(name='log_to_cluster_level', type='str', default='info',
                           runtime=True, enum_allowed=list(level_names)))

    @property
    def log(self) -> logging.Logger:
        """The module logger (``self._logger``)."""
        return self._logger

    def serve(self) -> None:
        """
        Mandatory entry point for standby modules; subclasses must override it.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()

    def get_mgr_id(self) -> str:
        """Id of the mgr daemon this module runs in, per ``_ceph_get_mgr_id``."""
        return self._ceph_get_mgr_id()

    def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """
        Retrieve the value of a persistent configuration setting.

        :param default: returned when the option is unset and has no declared default.
        """
        value = self._ceph_get_module_option(key)
        return self.MODULE_OPTION_DEFAULTS.get(key, default) if value is None else value

    def get_ceph_option(self, key: str) -> OptionValue:
        """Value of ceph config option ``key``."""
        return self._ceph_get_option(key)

    def get_store(self, key: str) -> Optional[str]:
        """
        Retrieve the value of a persistent KV store entry.

        :param key: String
        :return: the stored value, or None (documented as a byte string
            upstream, annotated as str — NOTE(review): confirm).
        """
        return self._ceph_get_store(key)

    def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Look up ``<mgr_id>/<key>``, then ``key``, then fall back to ``default``."""
        value = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
        if value is None:
            value = self._ceph_get_store(key)
        return default if value is None else value

    def get_active_uri(self) -> str:
        """URI reported by ``_ceph_get_active_uri``."""
        return self._ceph_get_active_uri()

    def get(self, data_name: str):
        """Fetch the named data object via ``_ceph_get``."""
        return self._ceph_get(data_name)

    def get_mgr_ip(self) -> str:
        """First address in ``mgr_ips``, or this host's name if the list is empty."""
        ips = self.get("mgr_ips").get('ips', [])
        return ips[0] if ips else socket.gethostname()

    def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """Like get_module_option, but scoped to this mgr's id."""
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        return self.MODULE_OPTION_DEFAULTS.get(key, default) if value is None else value
821
822
# Health check mapping: name -> {field: int | str | [str, ...]}
# (presumably check name -> check details — confirm with callers).
HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]

# One service: {"type": service_type, "id": service_id}
ServiceInfoT = Dict[str, str]

# One server: {"hostname": hostname,
#              "ceph_version": version,
#              "services": [service_info, ..]}
ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]

# Perf counter record; schema not constrained here.
PerfCounterT = Dict[str, Any]
831
832
833 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
COMMANDS = []  # type: List[Any]
MODULE_OPTIONS: List[Option] = []
MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

# Priority definitions for perf counters (CRITICAL=10 ... DEBUGONLY=0)
PRIO_CRITICAL = 10
PRIO_INTERESTING = 8
PRIO_USEFUL = 5
PRIO_UNINTERESTING = 2
PRIO_DEBUGONLY = 0

# Counter value types: the two low bits of a stat type
PERFCOUNTER_TIME = 1
PERFCOUNTER_U64 = 2

# Counter kinds; PERFCOUNTER_TYPE_MASK (~3) clears the value-type bits
PERFCOUNTER_LONGRUNAVG = 4
PERFCOUNTER_COUNTER = 8
PERFCOUNTER_HISTOGRAM = 0x10
PERFCOUNTER_TYPE_MASK = ~3

# Units supported
BYTES = 0
NONE = 1


class ClusterLogPrio(IntEnum):
    """Cluster log priorities, lowest (DEBUG) to highest (ERROR)."""
    DEBUG = 0
    INFO = 1
    SEC = 2
    WARN = 3
    ERROR = 4
def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
    """
    Set up the module: option defaults, logging, version and rados slot.

    ``module_name`` is assigned before the native base constructor runs.
    """
    self.module_name = module_name
    super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

    for opt in self.MODULE_OPTIONS:
        if 'default' not in opt:
            continue
        if 'type' in opt:
            # declared type: assume the supplied default already matches it
            normalized = opt['default']
        else:
            # no declared type: stringify so defaults behave like
            # user-supplied option values
            normalized = str(opt['default'])
        self.MODULE_OPTION_DEFAULTS[opt['name']] = normalized

    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    log_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
    log_to_file = self.get_module_option("log_to_file")
    assert isinstance(log_to_file, bool)
    log_to_cluster = self.get_module_option("log_to_cluster")
    assert isinstance(log_to_cluster, bool)
    self._configure_logging(mgr_level, log_level, cluster_level,
                            log_to_file, log_to_cluster)

    # kept for backwards compatibility (see the ``log`` property)
    self._logger = self.getLogger()

    self._version = self._ceph_get_version()

    self._perf_schema_cache = None

    # librados handle for modules that need one; created elsewhere on demand.
    self._rados: Optional[rados.Rados] = None
902
def __del__(self) -> None:
    """Detach this module's logging handlers when the object is destroyed."""
    unconfigure = self._unconfigure_logging
    unconfigure()
905
@classmethod
def _register_options(cls, module_name: str) -> None:
    """
    Append the logging options every module gets to MODULE_OPTIONS.

    These are log_level, log_to_file, log_to_cluster (only if not already
    declared) and log_to_cluster_level.
    """
    level_names = ('info', 'debug', 'critical', 'error', 'warning', '')
    opts = cls.MODULE_OPTIONS
    opts.append(Option(name='log_level', type='str', default="", runtime=True,
                       enum_allowed=list(level_names)))
    opts.append(Option(name='log_to_file', type='bool', default=False, runtime=True))
    already_declared = [o for o in opts if o['name'] == 'log_to_cluster']
    if not already_declared:
        opts.append(Option(name='log_to_cluster', type='bool', default=False,
                           runtime=True))
    opts.append(Option(name='log_to_cluster_level', type='str', default='info',
                       runtime=True, enum_allowed=list(level_names)))
923
@classmethod
def _register_commands(cls, module_name: str) -> None:
    """Append every CLICommand-registered command description to COMMANDS."""
    described = CLICommand.dump_cmd_list()
    cls.COMMANDS.extend(described)
927
@property
def log(self) -> logging.Logger:
    """Logger stored in ``self._logger`` at construction time."""
    logger = self._logger
    return logger
931
def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
    """
    Send ``message`` to the cluster log.

    :param channel: the log channel, e.g. 'cluster' or 'audit'.
    :param priority: message priority; its integer ``.value`` is forwarded.
    :param message: the text to log.
    """
    prio_value = priority.value
    self._ceph_cluster_log(channel, prio_value, message)
939
@property
def version(self) -> str:
    """Version string captured from ``_ceph_get_version()`` at construction."""
    captured = self._version
    return captured
943
@property
def release_name(self) -> str:
    """
    Release name of the running Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: the release name in lower case.
    :rtype: str
    """
    name = self._ceph_get_release_name()
    return name
952
def lookup_release_name(self, major: int) -> str:
    """Release name for major version ``major``, as reported by the native helper."""
    name = self._ceph_lookup_release_name(major)
    return name
955
def get_context(self) -> object:
    """
    Return the native context handle.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    capsule = self._ceph_get_context()
    return capsule
961
def notify(self, notify_type: str, notify_id: str) -> None:
    """
    Hook the ceph-mgr service calls when new state is available.

    The base implementation does nothing; modules override it.

    :param notify_type: kind of notification, such as osd_map, mon_map,
                        fs_map, mon_status, health, pg_summary, command,
                        service_map
    :param notify_id: optional (possibly empty) identifier of the entity
                      concerned. For "command" notifications it is the tag
                      given to ``send_command``.
    """
976
def _config_notify(self) -> None:
    """
    Re-read the logging options, apply any changes, then call config_notify().

    The file/cluster handlers are toggled only when the option differs from
    the current ``log_to_file`` / ``log_to_cluster`` state.
    """
    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    module_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
    assert isinstance(cluster_level, str)
    want_file = self.get_module_option("log_to_file", False)
    assert isinstance(want_file, bool)
    want_cluster = self.get_module_option("log_to_cluster", False)
    assert isinstance(want_cluster, bool)
    self._set_log_level(mgr_level, module_level, cluster_level)

    if want_file != self.log_to_file:
        toggle_file = self._enable_file_log if want_file else self._disable_file_log
        toggle_file()
    if want_cluster != self.log_to_cluster:
        toggle_cluster = self._enable_cluster_log if want_cluster else self._disable_cluster_log
        toggle_cluster()

    # give the concrete module a chance to refresh its own settings
    self.config_notify()
1002
def config_notify(self) -> None:
    """
    Hook the ceph-mgr service calls when configuration may have changed.

    Modules override it to refresh any configuration values they cache;
    the base implementation does nothing.
    """
1010
def serve(self) -> None:
    """
    Hook the ceph-mgr service calls to start any server this module provides.

    An implementation should block until ``shutdown`` is called. You *must*
    implement ``shutdown`` if you implement ``serve``. The base
    implementation returns immediately.
    """
1020
def shutdown(self) -> None:
    """
    Hook the ceph-mgr service calls to make the module leave serve().

    Not needed if you do not implement serve(). The base implementation
    shuts down ``self._rados`` when set and unregisters its client
    addresses (fetched before the shutdown).

    :return: None
    """
    client = self._rados
    if not client:
        return
    addrs = client.get_addrs()
    client.shutdown()
    self._ceph_unregister_client(addrs)
1033
def get(self, data_name: str):
    """
    Fetch a named cluster-wide object from ceph-mgr.

    :param str data_name: which object to fetch. Valid names include
        osd_crush_map_text, osd_map, osd_map_tree, osd_map_crush, config,
        mon_map, fs_map, osd_metadata, pg_summary, io_rate, pg_dump, df,
        osd_stats, health, mon_status, devices, device <devid>, pg_stats,
        pool_stats, pg_ready, osd_ping_times and mgr_ips.

    Note:
        Each of these structures has its own JSON representation;
        experiment, or read the corresponding C++ ``dump()`` method, to
        learn its layout.
    """
    fetched = self._ceph_get(data_name)
    return fetched
1049
1050 def _stattype_to_str(self, stattype: int) -> str:
1051
1052 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
1053 if typeonly == 0:
1054 return 'gauge'
1055 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
1056 # this lie matches the DaemonState decoding: only val, no counts
1057 return 'counter'
1058 if typeonly == self.PERFCOUNTER_COUNTER:
1059 return 'counter'
1060 if typeonly == self.PERFCOUNTER_HISTOGRAM:
1061 return 'histogram'
1062
1063 return ''
1064
1065 def _perfpath_to_path_labels(self, daemon: str,
1066 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
1067 label_names = ("ceph_daemon",) # type: Tuple[str, ...]
1068 labels = (daemon,) # type: Tuple[str, ...]
1069
1070 if daemon.startswith('rbd-mirror.'):
1071 match = re.match(
1072 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
1073 path
1074 )
1075 if match:
1076 path = 'rbd_mirror_image_' + match.group(4)
1077 pool = match.group(1)
1078 namespace = match.group(2) or ''
1079 image = match.group(3)
1080 label_names += ('pool', 'namespace', 'image')
1081 labels += (pool, namespace, image)
1082
1083 return path, label_names, labels,
1084
1085 def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
1086 if stattype & self.PERFCOUNTER_TIME:
1087 # Convert from ns to seconds
1088 return value / 1000000000.0
1089 else:
1090 return value
1091
1092 def _unit_to_str(self, unit: int) -> str:
1093 if unit == self.NONE:
1094 return "/s"
1095 elif unit == self.BYTES:
1096 return "B/s"
1097 else:
1098 raise ValueError(f'bad unit "{unit}"')
1099
@staticmethod
def to_pretty_iec(n: int) -> str:
    """
    Render a byte count using binary (IEC) unit suffixes.

    The largest unit is chosen for which ``n`` is strictly more than ten of
    that unit, and the value is truncated to whole units. Values that do
    not exceed 10 Ki are returned as plain numbers.

    :param n: byte count
    :return: e.g. ``'10 Ki'`` for 10241, or ``'512 '`` for 512 (note the
        trailing space where a suffix would go)
    """
    scales = ((60, 'Ei'), (50, 'Pi'), (40, 'Ti'),
              (30, 'Gi'), (20, 'Mi'), (10, 'Ki'))
    chosen = next(((shift, unit) for shift, unit in scales if n > 10 << shift), None)
    if chosen is None:
        return str(n) + ' '
    shift, unit = chosen
    return ' '.join((str(n >> shift), unit))
1107
@staticmethod
def get_pretty_row(elems: Sequence[str], width: int) -> str:
    """
    Format ``elems`` as one row of a text table. Useful for polling
    modules.

    The terminal ``width`` is split evenly across the columns. The row
    starts with ``'|'``, and each element is right-aligned in its cell and
    followed by ``' |'``.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    :return: the formatted row; just ``'|'`` when ``elems`` is empty
    """
    # Splitting the width across zero columns used to raise
    # ZeroDivisionError; an empty row is just its leading border.
    if not elems:
        return '|'
    column_width = int(width / len(elems))
    return '|' + ''.join('{0:>{w}} |'.format(elem, w=column_width - 2)
                         for elem in elems)
1125
def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
    """
    Like ``get_pretty_row``, but framed above and below by dashed rules so
    it can serve as a table title.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    """
    count = len(elems)
    cell = int(width / count)
    # Horizontal rule: a '+' at each column boundary, dashes filling each cell.
    rule = '+' + ('-' * (cell - 1) + '+') * count + '\n'
    title = self.get_pretty_row(elems, width) + '\n'
    return rule + title + rule
1153
def get_server(self, hostname: str) -> ServerInfoT:
    """
    Fetch metadata about one host from ceph-mgr.

    This is the information ceph-mgr has gleaned from the metadata
    reported by the daemons running on that host.

    :param hostname: the host to look up
    :return: the server info record for ``hostname``
    """
    return cast(ServerInfoT, self._ceph_get_server(hostname))
1165
def get_perf_schema(self,
                    svc_type: str,
                    svc_name: str) -> Dict[str,
                                           Dict[str, Dict[str, Union[str, int]]]]:
    """
    Fetch perf counter schema information from ceph-mgr.

    Either ``svc_type`` or ``svc_name`` may be null (nullptr on the C++
    side), in which case it acts as a wildcard.

    :param str svc_type: daemon type, e.g. 'osd'
    :param str svc_name: daemon id, e.g. '0'
    :return: dict keyed by full daemon name (e.g. 'osd.0'); each value maps
        counter paths to that counter's schema dict
    """
    return self._ceph_get_perf_schema(svc_type, svc_name)
1180
def get_counter(self,
                svc_type: str,
                svc_name: str,
                path: str) -> Dict[str, List[Tuple[float, int]]]:
    """
    Fetch the latest performance counter data for one counter on one
    service.

    :param str svc_type: daemon type, e.g. 'mds'
    :param str svc_name: daemon name
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: dict mapping the counter path to a list of (timestamp, value)
        two-tuples. The list may be empty if no data is available.
    """
    return self._ceph_get_counter(svc_type, svc_name, path)
1198
def get_latest_counter(self,
                       svc_type: str,
                       svc_name: str,
                       path: str) -> Dict[str, Union[Tuple[float, int],
                                                     Tuple[float, int, int]]]:
    """
    Fetch only the newest data point of one performance counter on one
    service.

    :param str svc_type: daemon type, e.g. 'mds'
    :param str svc_name: daemon name
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: dict keyed by ``path``. Its value is either a
        (timestamp, value) two-tuple or a (timestamp, value, count)
        three-tuple, and may be empty if no data is available.
    """
    return self._ceph_get_latest_counter(svc_type, svc_name, path)
1217
def list_servers(self) -> List[ServerInfoT]:
    """
    Fetch metadata for every known host, in the same shape ``get_server``
    returns for a single one. A host is known once it has been mentioned
    in any daemon's metadata.

    :return: a list of per-host info records
    :rtype: list
    """
    # A None hostname asks ceph-mgr for all servers at once.
    everything = self._ceph_get_server(None)
    return cast(List[ServerInfoT], everything)
1227
def get_metadata(self,
                 svc_type: str,
                 svc_id: str,
                 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    while services are being added or removed during which their metadata
    is not available to modules. In that case ``default`` (``None`` unless
    given) is returned.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id; convert OSD integer IDs to strings
        before calling this
    :param default: value to return when no metadata is found
    :rtype: dict, or ``default`` if no metadata is found
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return default if metadata is None else metadata
1248
def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
    """
    Fetch the most recent status reported by one service daemon.

    ``None`` may come back when no status information is available, for
    example because the daemon has not finished starting.

    :param svc_type: daemon type string (e.g., 'rgw')
    :param svc_id: daemon id string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
1261
def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
    """
    Like :func:`~mgr_module.MgrModule.mon_command`, but raise
    ``MonCommandFailed`` instead of returning when ``retval != 0``.
    """
    outcome = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
    if not outcome.retval:
        return outcome
    raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {outcome.stderr} retval: {outcome.retval}')
1272
def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Run a mon command synchronously and return its result.

    This is a convenience wrapper for modules doing simple mon command
    execution; see ``send_command`` for the general, asynchronous form.

    :param cmd_dict: the command as a dict; must contain a ``prefix`` key
    :param inbuf: optional input buffer sent along with the command
    :return: (status int, output str, error str)
    """
    # monotonic() rather than time(): the logged duration must not be
    # skewed by wall-clock adjustments (NTP steps, manual clock changes).
    started = time.monotonic()
    result = CommandResult()
    self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
    r = result.wait()
    elapsed = time.monotonic() - started

    self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], r[0], elapsed
    ))

    return r
1294
def send_command(
        self,
        result: CommandResult,
        svc_type: str,
        svc_id: str,
        command: str,
        tag: str,
        inbuf: Optional[str] = None) -> None:
    """
    Send a command to a Ceph service without waiting for it to finish.

    ``mon_command`` uses this with ``svc_type="mon"`` and an empty
    ``svc_id``.

    :param CommandResult result: completion object (the ``CommandResult``
        class defined in the same module as MgrModule) that stores the
        command's output. Call ``result.wait()`` to block until the
        command completes.
    :param str svc_type: type of service to send to, e.g. "mon"
    :param str svc_id: id of that service (may be empty)
    :param str command: the command serialized as JSON, in the same format
        as the ceph command line: a dict of command arguments plus a
        ``prefix`` key naming the command itself. For mon commands,
        MonCommands.h lists the available commands and their expected
        arguments.
    :param str tag: for nonblocking use. When the command completes, this
        module's ``notify()`` callback is triggered with notify_type
        "command" and notify_id set to this tag.
    :param str inbuf: optional input buffer carrying additional data.
    """
    self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
1325
def tool_exec(
    self,
    args: List[str],
    timeout: int = 10,
    stdin: Optional[bytes] = None
) -> Tuple[int, str, str]:
    """
    Run a Ceph CLI tool authenticated as this mgr daemon.

    ``args[0]`` is the tool to execute. ``-k <keyring> -n mgr.<id>`` is
    inserted before the remaining arguments. The caller's ``args`` list is
    left unmodified.

    :param args: tool name followed by its arguments
    :param timeout: seconds to wait before the child is killed
    :param stdin: optional bytes fed to the child's standard input
    :return: (returncode, stdout, stderr); on timeout
        (-errno.ETIMEDOUT, '', <timeout message>)
    :raises IndexError: if ``args`` is empty
    """
    # Slice instead of args.pop(0): popping mutated the caller's list, so
    # reusing it (e.g. for a retry) silently dropped the tool name.
    tool, rest = args[0], args[1:]
    cmd = [
        tool,
        '-k', str(self.get_ceph_option('keyring')),
        '-n', f'mgr.{self.get_mgr_id()}',
    ] + rest
    self.log.debug('exec: ' + ' '.join(cmd))
    try:
        p = subprocess.run(
            cmd,
            input=stdin,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as ex:
        self.log.error(ex)
        return -errno.ETIMEDOUT, '', str(ex)
    if p.returncode:
        self.log.error(f'Non-zero return from {cmd}: {p.stderr.decode()}')
    return p.returncode, p.stdout.decode(), p.stderr.decode()
1353
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Set the module's current map of health checks. The argument is a
    dict of check names to info, in this form:

    ::

        {
          'CHECK_FOO': {
            'severity': 'warning',           # or 'error'
            'summary': 'summary string',
            'count': 4,                      # quantify badness
            'detail': [ 'list', 'of', 'detail', 'strings' ],
           },
          'CHECK_BAR': {
            'severity': 'error',
            'summary': 'bars are bad',
            'detail': [ 'too hard' ],
          },
        }

    :param checks: dict of health check dicts, keyed by check name
    """
    self._ceph_set_health_checks(checks)
1378
def _handle_command(self,
                    inbuf: str,
                    cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                  Tuple[int, str, str]]:
    """
    Dispatch an incoming command.

    Prefixes registered in ``CLICommand.COMMANDS`` go to their registered
    handler; anything else falls back to this module's ``handle_command``.
    """
    prefix = cmd['prefix']
    registry = CLICommand.COMMANDS
    if prefix in registry:
        return registry[prefix].call(self, cmd, inbuf)
    return self.handle_command(inbuf, cmd)
1387
def handle_command(self,
                   inbuf: str,
                   cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                 Tuple[int, str, str]]:
    """
    Called by ceph-mgr to have this module handle one of the commands it
    declared in ``self.COMMANDS``.

    An override returns a status code, an output buffer and an output
    string: the buffer carries data results, the string informative text.

    :param inbuf: content of any "-i <file>" supplied to the ceph CLI
    :type inbuf: str
    :param cmd: the command, from Ceph's cmdmap_t
    :type cmd: dict

    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    """
    # Modules that declare no ``COMMANDS`` should never reach this, so the
    # base class has no default behavior to offer.
    raise NotImplementedError()
1411
def get_mgr_id(self) -> str:
    """
    Name of the manager daemon this module is currently running in (that
    is, the active manager).

    :return: str
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
1420
def get_ceph_conf_path(self) -> str:
    """Path of the Ceph configuration file, as reported by ceph-mgr."""
    conf_path = self._ceph_get_ceph_conf_path()
    return conf_path
1423
def get_mgr_ip(self) -> str:
    """
    First address listed under 'ips' in ``get("mgr_ips")``, or the local
    hostname when no address is reported.
    """
    reported = self.get("mgr_ips").get('ips', [])
    return reported[0] if reported else socket.gethostname()
1429
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Look up a Ceph configuration option (e.g. 'debug_mgr', 'keyring')
    through ceph-mgr.
    """
    value = self._ceph_get_option(key)
    return value
1432
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Look up Ceph configuration option ``key`` for another ``entity``
    (presumably an entity name such as 'osd.0' -- confirm against callers).
    """
    value = self._ceph_get_foreign_option(entity, key)
    return value
1435
1436 def _validate_module_option(self, key: str) -> None:
1437 """
1438 Helper: don't allow get/set config callers to
1439 access config options that they didn't declare
1440 in their schema.
1441 """
1442 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1443 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1444 format(key, self.__class__.__name__))
1445
def _get_module_option(self,
                       key: str,
                       default: OptionValue,
                       localized_prefix: str = "") -> OptionValue:
    """
    Read option ``key`` for this module from ceph-mgr.

    When ceph-mgr returns None, fall back to the declared default in
    ``MODULE_OPTION_DEFAULTS``, and then to ``default``.

    :param localized_prefix: forwarded to the lookup; the localized getters
        pass the mgr id here
    """
    stored = self._ceph_get_module_option(self.module_name, key,
                                          localized_prefix)
    if stored is not None:
        return stored
    return self.MODULE_OPTION_DEFAULTS.get(key, default)
1456
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Read one of this module's persistent configuration settings.

    :param key: option name; must be declared in MODULE_OPTIONS
    :param default: fallback when neither ceph-mgr nor
        MODULE_OPTION_DEFAULTS provides a value
    :raises RuntimeError: if ``key`` is not declared in MODULE_OPTIONS
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
1463
def get_module_option_ex(self, module: str,
                         key: str,
                         default: OptionValue = None) -> OptionValue:
    """
    Read a persistent configuration setting of an arbitrary module.

    :param module: the module name, e.g. 'dashboard' or 'telemetry'
    :param key: the configuration key, e.g. 'server_addr'
    :param default: returned when the stored value is ``None``; defaults to
        ``None``
    """
    if module == self.module_name:
        # Our own options are still checked against the declared schema.
        self._validate_module_option(key)
    value = self._ceph_get_module_option(module, key)
    if value is None:
        return default
    return value
1481
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Return every entry of this module's KV store whose key starts with
    ``key_prefix``.

    :param str key_prefix: key prefix to match
    :return: dict mapping each matching key to its value
    """
    return self._ceph_get_store_prefix(key_prefix)
1491
def _set_localized(self,
                   key: str,
                   val: Optional[str],
                   setter: Callable[[str, Optional[str]], None]) -> None:
    """
    Call ``setter`` with this mgr daemon's localized form of ``key`` (built
    by ``_get_localized_key`` from the mgr id) and ``val``.
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
1497
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Read option ``key`` as configured for this particular ceph-mgr
    instance, localized by its mgr id.

    :raises RuntimeError: if ``key`` is not declared in MODULE_OPTIONS
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
1504
1505 def _set_module_option(self, key: str, val: Any) -> None:
1506 return self._ceph_set_module_option(self.module_name, key,
1507 None if val is None else str(val))
1508
def set_module_option(self, key: str, val: Any) -> None:
    """
    Update one of this module's persistent configuration settings.

    :param str key: option name; must be declared in MODULE_OPTIONS
    :type val: str | None
    :raises RuntimeError: if ``key`` is not declared in MODULE_OPTIONS
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
1518
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting for the specified
    module.

    ``None`` is forwarded unchanged, as ``set_module_option`` does, rather
    than being stored as the literal string ``'None'``.

    :param str module: module name, e.g. 'dashboard'
    :param str key: option name
    :param val: new value; stringified unless it is None
    :raises RuntimeError: if ``module`` is this module and ``key`` is not
        declared in MODULE_OPTIONS
    """
    if module == self.module_name:
        # Options of our own module must be declared in MODULE_OPTIONS.
        self._validate_module_option(key)
    # Mirror _set_module_option: str(None) would persist the text 'None'.
    return self._ceph_set_module_option(module, key,
                                        None if val is None else str(val))
1531
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set option ``key`` for this ceph-mgr instance only (localized by its
    mgr id).

    :param str key: option name; must be declared in MODULE_OPTIONS
    :param str val: new value, or None
    :return: None
    :raises RuntimeError: if ``key`` is not declared in MODULE_OPTIONS
    """
    self._validate_module_option(key)
    return self._set_localized(key, val, self._set_module_option)
1541
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Write ``key`` into this module's persistent key/value store. Passing
    ``val=None`` removes the key instead.
    """
    self._ceph_set_store(key, val)
    return None
1548
def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Read ``key`` from this module's persistent key/value store.

    :return: the stored value, or ``default`` when the store returns None
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1558
def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Read ``key`` from the KV store. The lookup prefers this mgr daemon's
    localized copy, then the plain key, then ``default``.
    """
    for candidate in (_get_localized_key(self.get_mgr_id(), key), key):
        value = self._ceph_get_store(candidate)
        if value is not None:
            return value
    return default
1566
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Store ``val`` under this mgr daemon's localized variant of ``key`` via
    ``set_store`` (``val=None`` removes it).
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
1569
def self_test(self) -> None:
    """
    Hook for an automated self-test of the module. Override it with the
    most thorough check the module can reasonably perform.

    Signal failure by raising an exception. The output need not be
    polished: the goal is to catch regressions during development, not to
    run in the field.

    :return: None, or an advisory string of interest to developers (for
        example a JSON dump of some state).
    """
    return None
1583
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to the latest OSDMap held by ceph-mgr.

    No epoch can be selected: this method takes no arguments.

    :return: OSDMap
    """
    return cast(OSDMap, self._ceph_get_osdmap())
1591
def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
    """
    Latest value of one perf counter, or 0 when no data point is
    available.
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)[counter]
    # Second element of (timestamp, value[, count]).
    return latest[1] if latest else 0
1599
def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
    """
    Latest (value, count) pair of a long-running-average counter, or
    (0, 0) when no data point is available.
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)[counter]
    if not latest:
        return 0, 0
    # get_latest_counter's return type is a union of 2- and 3-tuples;
    # averages are expected to use the (timestamp, value, count) form.
    # https://github.com/python/mypy/issues/1178
    _stamp, value, count = cast(Tuple[float, int, int], latest)
    return value, count
1609
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
                          services: Sequence[str] = ("mds", "mon", "osd",
                                                     "rbd-mirror", "rgw",
                                                     "tcmu-runner")) -> Dict[str, dict]:
    """
    Return the perf counters currently known to this ceph-mgr instance,
    filtered to priority equal to or greater than ``prio_limit``.

    The result maps a daemon name (like "osd.123") to a dict of its
    counters. That dict maps each counter path to the counter's schema
    info plus a "value" member holding the latest value. Long-running
    averages (PERFCOUNTER_LONGRUNAVG) also get a "count" member.

    Daemons whose type is not in ``services`` are skipped. So are daemons
    for which no schema is available, with a warning.

    :param prio_limit: minimum counter priority to include
    :param services: daemon types to collect counters for
    :return: dict of daemon name -> {counter path -> counter info}
    """
    result = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in cast(List[ServiceInfoT], server['services']):
            if service['type'] not in services:
                continue

            svc_type, svc_id = service['type'], service['id']
            # The schema comes back keyed by full daemon name, possibly for
            # several daemons; pick out the one we asked about. A missing
            # entry used to raise KeyError and abort the whole collection;
            # treat it like "no schema" instead.
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schemas = self.get_perf_schema(svc_type, svc_id)
            schema = schemas.get(svc_full_name) if schemas else None
            if schema is None:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            for counter_path, counter_schema in schema.items():
                priority = counter_schema['priority']
                assert isinstance(priority, int)
                if priority < prio_limit:
                    continue

                tp = counter_schema['type']
                assert isinstance(tp, int)
                counter_info = dict(counter_schema)
                if tp & self.PERFCOUNTER_LONGRUNAVG:
                    # Long-running averages also carry their sample count.
                    counter_info['value'], counter_info['count'] = \
                        self.get_latest_avg(svc_type, svc_id, counter_path)
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                result[svc_full_name][counter_path] = counter_info

    # len(result) counts daemons, not individual counters.
    self.log.debug("returning perf counters for {0} daemons".format(len(result)))

    return result
1681
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, call this to publish its address once
    it is available.

    :param uri: the address to publish
    :return: None
    """
    return self._ceph_set_uri(uri)
1690
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """Pass a wear-level reading for device ``devid`` on to ceph-mgr."""
    outcome = self._ceph_set_device_wear_level(devid, wear_level)
    return outcome
1693
def have_mon_connection(self) -> bool:
    """
    Report whether this ceph-mgr daemon has an open connection to a
    monitor.

    Without one, the cluster information held here is likely out of date,
    and/or the monitor cluster may be down.
    """
    connected = self._ceph_have_mon_connection()
    return connected
1703
def update_progress_event(self,
                          evid: str,
                          desc: str,
                          progress: float,
                          add_to_ceph_s: bool) -> None:
    """
    Report progress for event ``evid`` to ceph-mgr.

    :param evid: event id
    :param desc: human-readable description
    :param progress: progress value (presumably a 0.0-1.0 fraction;
        confirm against callers)
    :param add_to_ceph_s: forwarded as-is; presumably controls whether the
        event is shown in ``ceph -s``
    """
    outcome = self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
    return outcome
1710
def complete_progress_event(self, evid: str) -> None:
    """Tell ceph-mgr that progress event ``evid`` has completed."""
    outcome = self._ceph_complete_progress_event(evid)
    return outcome
1713
def clear_all_progress_events(self) -> None:
    """Ask ceph-mgr to clear every progress event."""
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1716
@property
def rados(self) -> rados.Rados:
    """
    A librados instance shared by any classes within this mgr module that
    want one.

    It is created from this module's context and connected on first
    access, and its addresses are then registered with ceph-mgr. If
    ``connect()`` raises, the exception propagates and nothing is cached,
    so the next access retries.
    """
    if self._rados:
        return self._rados

    ctx_capsule = self.get_context()
    client = rados.Rados(context=ctx_capsule)
    # Connect before caching. The handle used to be stored first, so a
    # failed connect() left an unconnected client cached and returned by
    # every later access (and used by shutdown()).
    client.connect()
    self._rados = client
    self._ceph_register_client(client.get_addrs())
    return client
1731
@staticmethod
def can_run() -> Tuple[bool, str]:
    """
    Report whether this module's dependencies are met.

    Override it when, for example, the module needs an optional import:
    wrap that import in try/except at file scope and report the failure
    here.

    This is called in a blocking way from the C++ code, so it must not do
    any I/O that could block.

    :return: a 2-tuple of (can run, explanatory string)
    """
    runnable, reason = True, ""
    return runnable, reason
1747
def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """
    Invoke a method on another mgr module. All arguments, and the value
    returned by the other module, must be serializable.

    Limitation: do not import any modules within the called method;
    under Python 2 that failed with::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: name of the other module. If it isn't loaded, an
        ImportError exception is raised.
    :param method_name: method name. If it does not exist, a NameError
        exception is raised.
    :param args: positional arguments, forwarded as a tuple
    :param kwargs: keyword arguments, forwarded as a dict
    :raises RuntimeError: **Any** error raised within the method is
        converted to a RuntimeError
    :raises ImportError: no such module
    """
    dispatched = self._ceph_dispatch_remote(module_name, method_name,
                                            args, kwargs)
    return dispatched
1771
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query, described by a dict of this form:

    ::

        {
          'key_descriptor': [
            {'type': subkey_type, 'regex': regex_pattern},
            ...
          ],
          'performance_counter_descriptors': [
            list, of, descriptor, types
          ],
          'limit': {'order_by': performance_counter_type, 'max_count': n},
        }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: the query description
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
1801
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister the OSD perf query identified by ``query_id``.

    :param int query_id: id returned by ``add_osd_perf_query``
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
1809
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Fetch the stats collected so far for an OSD perf query.

    :param int query_id: id returned by ``add_osd_perf_query``
    """
    stats = self._ceph_get_osd_perf_counters(query_id)
    return stats
1817
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query, described by a dict of this form:

    ::

        {
          'key_descriptor': [
            {'type': subkey_type, 'regex': regex_pattern},
            ...
          ],
          'performance_counter_descriptors': [
            list, of, descriptor, types
          ],
        }

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Valid subkey types:
       'mds_rank', 'client_id'
    Valid performance counter types:
       'cap_hit_metric'

    :param object query: the query description
    :rtype: int (query id)
    """
    query_id = self._ceph_add_mds_perf_query(query)
    return query_id
1846
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister the MDS perf query identified by ``query_id``.

    :param int query_id: id returned by ``add_mds_perf_query``
    """
    outcome = self._ceph_remove_mds_perf_query(query_id)
    return outcome
1854
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Fetch the stats collected so far for an MDS perf query.

    :param int query_id: id returned by ``add_mds_perf_query``
    """
    stats = self._ceph_get_mds_perf_counters(query_id)
    return stats
1862
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Check whether the current session's caps allow executing the py
    service, or this module, with the given arguments.

    This gives modules a generic way to restrict access with finer-grained
    controls (e.g. per pool).

    :param arguments: dict of key/value arguments to test
    """
    allowed = self._ceph_is_authorized(arguments)
    return allowed
1873
def send_rgwadmin_command(self, args: List[str],
                          stdout_as_json: bool = True,
                          *,
                          timeout: int = 10) -> Tuple[int, Union[str, dict], str]:
    """
    Run ``radosgw-admin`` authenticated as this mgr daemon.

    The ceph.conf path, the keyring and the ``mgr.<id>`` entity name are
    added in front of ``args``.

    :param args: radosgw-admin arguments
    :param stdout_as_json: when True and stdout is non-empty, decode stdout
        as JSON before returning it
    :param timeout: seconds to wait before the child is killed (keyword
        only; defaults to 10)
    :return: (returncode, stdout, stderr). A non-zero returncode is
        returned, not raised.
    :raises subprocess.TimeoutExpired: if radosgw-admin runs longer than
        ``timeout`` seconds
    :raises json.JSONDecodeError: if JSON decoding was requested and stdout
        is not valid JSON
    """
    cmd = [
        'radosgw-admin',
        '-c', str(self.get_ceph_conf_path()),
        '-k', str(self.get_ceph_option('keyring')),
        '-n', f'mgr.{self.get_mgr_id()}',
    ] + args
    self.log.debug('Executing %s', str(cmd))
    try:
        # No check=True: a non-zero exit status is returned to the caller,
        # never raised as CalledProcessError (the old handler for it was
        # unreachable and has been dropped).
        result = subprocess.run(  # pylint: disable=subprocess-run-check
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as ex:
        self.log.error('Timeout (%ss) executing radosgw-admin %s', timeout, str(ex.cmd))
        raise
    stdout = result.stdout.decode('utf-8')
    stderr = result.stderr.decode('utf-8')
    if stdout and stdout_as_json:
        stdout = json.loads(stdout)
    if result.returncode:
        self.log.debug('Error %s executing %s: %s', result.returncode, str(cmd), stderr)
    return result.returncode, stdout, stderr