1 import ceph_module # noqa
2
3 from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
5 if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12 import inspect
13 import logging
14 import errno
15 import functools
16 import json
17 import subprocess
18 import threading
19 from collections import defaultdict
20 from enum import IntEnum, Enum
21 import rados
22 import re
23 import socket
24 import sqlite3
25 import sys
26 import time
27 from ceph_argparse import CephArgtype
28 from mgr_util import profile_method
29
30 if sys.version_info >= (3, 8):
31 from typing import get_args, get_origin
32 else:
33 def get_args(tp: Any) -> Any:
34 if tp is Generic:
35 return tp
36 else:
37 return getattr(tp, '__args__', ())
38
39 def get_origin(tp: Any) -> Any:
40 return getattr(tp, '__origin__', None)
41
42
43 ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
44 ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'
45 # Full list of strings in "osd_types.cc:pg_state_string()"
46 PG_STATES = [
47 "active",
48 "clean",
49 "down",
50 "recovery_unfound",
51 "backfill_unfound",
52 "scrubbing",
53 "degraded",
54 "inconsistent",
55 "peering",
56 "repair",
57 "recovering",
58 "forced_recovery",
59 "backfill_wait",
60 "incomplete",
61 "stale",
62 "remapped",
63 "deep",
64 "backfilling",
65 "forced_backfill",
66 "backfill_toofull",
67 "recovery_wait",
68 "recovery_toofull",
69 "undersized",
70 "activating",
71 "peered",
72 "snaptrim",
73 "snaptrim_wait",
74 "snaptrim_error",
75 "creating",
76 "unknown",
77 "premerge",
78 "failed_repair",
79 "laggy",
80 "wait",
81 ]
82
83 NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
84 NFS_POOL_NAME = '.nfs'
85
86
87 class NotifyType(str, Enum):
88 mon_map = 'mon_map'
89 pg_summary = 'pg_summary'
90 health = 'health'
91 clog = 'clog'
92 osd_map = 'osd_map'
93 fs_map = 'fs_map'
94 command = 'command'
95
96 # these are disabled because there are no users.
97 # see Mgr.cc:
98 # service_map = 'service_map'
99 # mon_status = 'mon_status'
100 # see DaemonServer.cc:
101 # perf_schema_update = 'perf_schema_update'
102
103
104 class CommandResult(object):
105 """
106 Use with MgrModule.send_command
107 """
108
109 def __init__(self, tag: Optional[str] = None):
110 self.ev = threading.Event()
111 self.outs = ""
112 self.outb = ""
113 self.r = 0
114
115 # This is just a convenience for notifications from
116 # C++ land, to avoid passing addresses around in messages.
117 self.tag = tag if tag else ""
118
119 def complete(self, r: int, outb: str, outs: str) -> None:
120 self.r = r
121 self.outb = outb
122 self.outs = outs
123 self.ev.set()
124
125 def wait(self) -> Tuple[int, str, str]:
126 self.ev.wait()
127 return self.r, self.outb, self.outs
128
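# Illustrative sketch (not part of the original file): the typical flow for
# CommandResult together with MgrModule.send_command, mirroring what
# MgrModule.mon_command() does internally; 'self' here is an MgrModule instance.
#
#     result = CommandResult()
#     self.send_command(result, 'mon', '',
#                       json.dumps({'prefix': 'df', 'format': 'json'}), '')
#     retval, outb, outs = result.wait()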
129
130 class HandleCommandResult(NamedTuple):
131 """
132 Tuple containing the result of `handle_command()`
133
134 Only write to stderr if there is an error, or in extraordinary circumstances
135
136 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
137 is critical information to include there.
138
139 Everything programmatically consumable should be put on stdout
140 """
141 retval: int = 0 # return code. E.g. 0 or -errno.EINVAL
142 stdout: str = "" # data of this result.
143 stderr: str = "" # Typically used for error messages.
144
145
146 class MonCommandFailed(RuntimeError): pass
147 class MgrDBNotReady(RuntimeError): pass
148
149
150 class OSDMap(ceph_module.BasePyOSDMap):
151 def get_epoch(self) -> int:
152 return self._get_epoch()
153
154 def get_crush_version(self) -> int:
155 return self._get_crush_version()
156
157 def dump(self) -> Dict[str, Any]:
158 return self._dump()
159
160 def get_pools(self) -> Dict[int, Dict[str, Any]]:
161 # FIXME: efficient implementation
162 d = self._dump()
163 return dict([(p['pool'], p) for p in d['pools']])
164
165 def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
166 # FIXME: efficient implementation
167 d = self._dump()
168 return dict([(p['pool_name'], p) for p in d['pools']])
169
170 def new_incremental(self) -> 'OSDMapIncremental':
171 return self._new_incremental()
172
173 def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
174 return self._apply_incremental(inc)
175
176 def get_crush(self) -> 'CRUSHMap':
177 return self._get_crush()
178
179 def get_pools_by_take(self, take: int) -> List[int]:
180 return self._get_pools_by_take(take).get('pools', [])
181
182 def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
183 max_deviation: int,
184 max_iterations: int = 10,
185 pools: Optional[List[str]] = None) -> int:
186 if pools is None:
187 pools = []
188 return self._calc_pg_upmaps(
189 inc,
190 max_deviation, max_iterations, pools)
191
192 def map_pool_pgs_up(self, poolid: int) -> List[int]:
193 return self._map_pool_pgs_up(poolid)
194
195 def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
196 return self._pg_to_up_acting_osds(pool_id, ps)
197
198 def pool_raw_used_rate(self, pool_id: int) -> float:
199 return self._pool_raw_used_rate(pool_id)
200
201 @classmethod
202 def build_simple(cls, epoch: int = 1, uuid: Optional[str] = None, num_osd: int = -1) -> 'ceph_module.BasePyOSDMap':
203 return cls._build_simple(epoch, uuid, num_osd)
204
205 def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
206 # FIXME: efficient implementation
207 d = self._dump()
208 return d['erasure_code_profiles'].get(name, None)
209
210 def get_require_osd_release(self) -> str:
211 d = self._dump()
212 return d['require_osd_release']
213
214
215 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
216 def get_epoch(self) -> int:
217 return self._get_epoch()
218
219 def dump(self) -> Dict[str, Any]:
220 return self._dump()
221
222 def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
223 """
224 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
225 """
226 return self._set_osd_reweights(weightmap)
227
228 def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
229 """
230 weightmap is a dict, int to float. devices only. e.g.,
231 { 0: 3.4, 1: 3.3, 2: 3.334 }
232 """
233 return self._set_crush_compat_weight_set_weights(weightmap)
234
235
236 class CRUSHMap(ceph_module.BasePyCRUSH):
237 ITEM_NONE = 0x7fffffff
238 DEFAULT_CHOOSE_ARGS = '-1'
239
240 def dump(self) -> Dict[str, Any]:
241 return self._dump()
242
243 def get_item_weight(self, item: int) -> Optional[int]:
244 return self._get_item_weight(item)
245
246 def get_item_name(self, item: int) -> Optional[str]:
247 return self._get_item_name(item)
248
249 def find_takes(self) -> List[int]:
250 return self._find_takes().get('takes', [])
251
252 def find_roots(self) -> List[int]:
253 return self._find_roots().get('roots', [])
254
255 def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
256 uglymap = self._get_take_weight_osd_map(root)
257 return {int(k): v for k, v in uglymap.get('weights', {}).items()}
258
259 @staticmethod
260 def have_default_choose_args(dump: Dict[str, Any]) -> bool:
261 return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
262
263 @staticmethod
264 def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
265 choose_args = dump.get('choose_args')
266 assert isinstance(choose_args, dict)
267 return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
268
269 def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
270 # TODO efficient implementation
271 for rule in self.dump()['rules']:
272 if rule_name == rule['rule_name']:
273 return rule
274
275 return None
276
277 def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
278 for rule in self.dump()['rules']:
279 if rule['rule_id'] == rule_id:
280 return rule
281
282 return None
283
284 def get_rule_root(self, rule_name: str) -> Optional[int]:
285 rule = self.get_rule(rule_name)
286 if rule is None:
287 return None
288
289 try:
290 first_take = next(s for s in rule['steps'] if s.get('op') == 'take')
291 except StopIteration:
292 logging.warning("CRUSH rule '{0}' has no 'take' step".format(
293 rule_name))
294 return None
295 else:
296 return first_take['item']
297
298 def get_osds_under(self, root_id: int) -> List[int]:
299 # TODO don't abuse dump like this
300 d = self.dump()
301 buckets = dict([(b['id'], b) for b in d['buckets']])
302
303 osd_list = []
304
305 def accumulate(b: Dict[str, Any]) -> None:
306 for item in b['items']:
307 if item['id'] >= 0:
308 osd_list.append(item['id'])
309 else:
310 try:
311 accumulate(buckets[item['id']])
312 except KeyError:
313 pass
314
315 accumulate(buckets[root_id])
316
317 return osd_list
318
319 def device_class_counts(self) -> Dict[str, int]:
320 result = defaultdict(int) # type: Dict[str, int]
321 # TODO don't abuse dump like this
322 d = self.dump()
323 for device in d['devices']:
324 cls = device.get('class', None)
325 result[cls] += 1
326
327 return dict(result)
328
329
330 HandlerFuncType = Callable[..., Tuple[int, str, str]]
331
332 def _extract_target_func(
333 f: HandlerFuncType
334 ) -> Tuple[HandlerFuncType, Dict[str, Any]]:
335 """In order to interoperate with other decorated functions,
336 we need to find the original function which will provide
337 the main set of arguments. While we descend through the
338 stack of wrapped functions, gather additional arguments
339 the decorators may want to provide.
340 """
341 # use getattr to keep mypy happy
342 wrapped = getattr(f, "__wrapped__", None)
343 if not wrapped:
344 return f, {}
345 extra_args = {}
346 while wrapped is not None:
347 extra_args.update(getattr(f, "extra_args", {}))
348 f = wrapped
349 wrapped = getattr(f, "__wrapped__", None)
350 return f, extra_args
351
352
353 class CLICommand(object):
354 COMMANDS = {} # type: Dict[str, CLICommand]
355
356 def __init__(self,
357 prefix: str,
358 perm: str = 'rw',
359 poll: bool = False):
360 self.prefix = prefix
361 self.perm = perm
362 self.poll = poll
363 self.func = None # type: Optional[Callable]
364 self.arg_spec = {} # type: Dict[str, Any]
365 self.first_default = -1
366
367 KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
368
369 @classmethod
370 def _load_func_metadata(cls: Any, f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
371 f, extra_args = _extract_target_func(f)
372 desc = (inspect.getdoc(f) or '').replace('\n', ' ')
373 full_argspec = inspect.getfullargspec(f)
374 arg_spec = full_argspec.annotations
375 first_default = len(arg_spec)
376 if full_argspec.defaults:
377 first_default -= len(full_argspec.defaults)
378 args = []
379 positional = True
380 for index, arg in enumerate(full_argspec.args):
381 if arg in cls.KNOWN_ARGS:
382 continue
383 if arg == '_end_positional_':
384 positional = False
385 continue
386 if (
387 arg == 'format'
388 or arg_spec[arg] is Optional[bool]
389 or arg_spec[arg] is bool
390 ):
391 # implicit switch to non-positional on any
392 # Optional[bool] or the --format option
393 positional = False
394 assert arg in arg_spec, \
395 f"'{arg}' is not annotated for {f}: {full_argspec}"
396 has_default = index >= first_default
397 args.append(CephArgtype.to_argdesc(arg_spec[arg],
398 dict(name=arg),
399 has_default,
400 positional))
401 for argname, argtype in extra_args.items():
402 # avoid shadowing args from the function
403 if argname in arg_spec:
404 continue
405 arg_spec[argname] = argtype
406 args.append(CephArgtype.to_argdesc(
407 argtype, dict(name=argname), has_default=True, positional=False
408 ))
409 return desc, arg_spec, first_default, ' '.join(args)
410
411 def store_func_metadata(self, f: HandlerFuncType) -> None:
412 self.desc, self.arg_spec, self.first_default, self.args = \
413 self._load_func_metadata(f)
414
415 def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
416 self.store_func_metadata(func)
417 self.func = func
418 self.COMMANDS[self.prefix] = self
419 return self.func
420
421 def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
422 def start_kwargs() -> bool:
423 if isinstance(val, str) and '=' in val:
424 k, v = val.split('=', 1)
425 if k in self.arg_spec:
426 return True
427 return False
428
429 if not kwargs_switch:
430 kwargs_switch = start_kwargs()
431
432 if kwargs_switch:
433 k, v = val.split('=', 1)
434 else:
435 k, v = key, val
436 return kwargs_switch, k.replace('-', '_'), v
437
438 def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
439 kwargs = {}
440 kwargs_switch = False
441 for index, (name, tp) in enumerate(self.arg_spec.items()):
442 if name in CLICommand.KNOWN_ARGS:
443 continue
444 assert self.first_default >= 0
445 raw_v = cmd_dict.get(name)
446 if index >= self.first_default:
447 if raw_v is None:
448 continue
449 kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
450 name, raw_v)
451 kwargs[k] = CephArgtype.cast_to(tp, v)
452 return kwargs
453
454 def call(self,
455 mgr: Any,
456 cmd_dict: Dict[str, Any],
457 inbuf: Optional[str] = None) -> HandleCommandResult:
458 kwargs = self._collect_args_by_argspec(cmd_dict)
459 if inbuf:
460 kwargs['inbuf'] = inbuf
461 assert self.func
462 return self.func(mgr, **kwargs)
463
464 def dump_cmd(self) -> Dict[str, Union[str, bool]]:
465 return {
466 'cmd': '{} {}'.format(self.prefix, self.args),
467 'desc': self.desc,
468 'perm': self.perm,
469 'poll': self.poll,
470 }
471
472 @classmethod
473 def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
474 return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
475
476
477 def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
478 return CLICommand(prefix, "r", poll)
479
480
481 def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
482 return CLICommand(prefix, "w", poll)
483
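# Illustrative sketch (not part of the original file): how a module typically
# declares a CLI handler with these decorators. Annotated parameters are turned
# into a Ceph argdesc string by _load_func_metadata() above; the prefix
# 'example status' is hypothetical.
#
#     @CLIReadCommand('example status')
#     def status(self, detail: bool = False) -> Tuple[int, str, str]:
#         """Show example module status"""
#         return 0, 'ok (detailed)' if detail else 'ok', ''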
484
485 def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
486 def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
487 @functools.wraps(func)
488 def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
489 if 'inbuf' not in kwargs:
490 return -errno.EINVAL, '', f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
491 f'containing {desc} with "-i" option'
492 if isinstance(kwargs['inbuf'], str):
493 # Delete the trailing newline at EOF (it may have been added by a text editor).
494 kwargs['inbuf'] = kwargs['inbuf'].rstrip('\r\n').rstrip('\n')
495 if not kwargs['inbuf'] or not kwargs['inbuf'].strip():
496 return -errno.EINVAL, '', f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
497 'the file'
498 return func(*args, **kwargs)
499 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
500 return check
501 return CheckFileInput
502
503 def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
504 @functools.wraps(func)
505 def check(self: MgrModule, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
506 if not self.db_ready():
507 return -errno.EAGAIN, "", "mgr db not yet available"
508 return func(self, *args, **kwargs)
509 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
510 return check
511
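# Illustrative sketch (not part of the original file): stacking the wrappers.
# Keeping the CLICommand decorator outermost means the wrapped check functions
# stay in the registered call chain, while _extract_target_func() above still
# recovers the original signature through ``__wrapped__``. The prefix and the
# set_kv key below are hypothetical.
#
#     @CLIWriteCommand('example import')
#     @CLIRequiresDB
#     @CLICheckNonemptyFileInput(desc='the example configuration')
#     def do_import(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
#         self.set_kv('config', inbuf)
#         return 0, '', ''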
512 def _get_localized_key(prefix: str, key: str) -> str:
513 return '{}/{}'.format(prefix, key)
514
515
516 """
517 MODULE_OPTIONS types and Option Class
518 """
519 if TYPE_CHECKING:
520 OptionTypeLabel = Literal[
521 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
522
523
524 # common/options.h: value_t
525 OptionValue = Optional[Union[bool, int, float, str]]
526
527
528 class Option(Dict):
529 """
530 Helper class to declare options for MODULE_OPTIONS list.
531 TODO: Replace with typing.TypedDict when python_version >= 3.8
532 """
533
534 def __init__(
535 self,
536 name: str,
537 default: OptionValue = None,
538 type: 'OptionTypeLabel' = 'str',
539 desc: Optional[str] = None,
540 long_desc: Optional[str] = None,
541 min: OptionValue = None,
542 max: OptionValue = None,
543 enum_allowed: Optional[List[str]] = None,
544 tags: Optional[List[str]] = None,
545 see_also: Optional[List[str]] = None,
546 runtime: bool = False,
547 ):
548 super(Option, self).__init__(
549 (k, v) for k, v in vars().items()
550 if k != 'self' and v is not None)
551
552
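# Illustrative sketch (not part of the original file): declaring module options
# with the Option helper; 'scrape_interval' is a hypothetical option name.
#
#     MODULE_OPTIONS = [
#         Option(name='scrape_interval', type='secs', default=60, min=10,
#                desc='seconds between scrapes', runtime=True),
#     ]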
553 class Command(dict):
554 """
555 Helper class to declare options for COMMANDS list.
556
557 It also allows specifying prefix and args separately, as well as storing a
558 handler callable.
559
560 Usage:
561 >>> def handler(): return 0, "", ""
562 >>> Command(prefix="example",
563 ... handler=handler,
564 ... perm='w')
565 {'perm': 'w', 'poll': False}
566 """
567
568 def __init__(
569 self,
570 prefix: str,
571 handler: HandlerFuncType,
572 perm: str = "rw",
573 poll: bool = False,
574 ):
575 super().__init__(perm=perm,
576 poll=poll)
577 self.prefix = prefix
578 self.handler = handler
579
580 @staticmethod
581 def returns_command_result(instance: Any,
582 f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
583 @functools.wraps(f)
584 def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
585 retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
586 return HandleCommandResult(retval, stdout, stderr)
587 wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
588 return wrapper
589
590 def register(self, instance: bool = False) -> HandlerFuncType:
591 """
592 Register a CLICommand handler. It allows an instance to register bound
593 methods. In that case, the mgr instance is not passed, and it's expected
594 to be available in the class instance.
595 It also uses the HandleCommandResult helper to return a wrapped tuple of 3
596 items.
597 """
598 cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
599 return cmd(self.returns_command_result(instance, self.handler))
600
601
602 class CPlusPlusHandler(logging.Handler):
603 def __init__(self, module_inst: Any):
604 super(CPlusPlusHandler, self).__init__()
605 self._module = module_inst
606 self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
607 .format(module_inst.module_name)))
608
609 def emit(self, record: logging.LogRecord) -> None:
610 if record.levelno >= self.level:
611 self._module._ceph_log(self.format(record))
612
613
614 class ClusterLogHandler(logging.Handler):
615 def __init__(self, module_inst: Any):
616 super().__init__()
617 self._module = module_inst
618 self.setFormatter(logging.Formatter("%(message)s"))
619
620 def emit(self, record: logging.LogRecord) -> None:
621 levelmap = {
622 logging.DEBUG: MgrModule.ClusterLogPrio.DEBUG,
623 logging.INFO: MgrModule.ClusterLogPrio.INFO,
624 logging.WARNING: MgrModule.ClusterLogPrio.WARN,
625 logging.ERROR: MgrModule.ClusterLogPrio.ERROR,
626 logging.CRITICAL: MgrModule.ClusterLogPrio.ERROR,
627 }
628 level = levelmap[record.levelno]
629 if record.levelno >= self.level:
630 self._module.cluster_log(self._module.module_name,
631 level,
632 self.format(record))
633
634
635 class FileHandler(logging.FileHandler):
636 def __init__(self, module_inst: Any):
637 path = module_inst.get_ceph_option("log_file")
638 idx = path.rfind(".log")
639 if idx != -1:
640 self.path = "{}.{}.log".format(path[:idx], module_inst.module_name)
641 else:
642 self.path = "{}.{}".format(path, module_inst.module_name)
643 super(FileHandler, self).__init__(self.path, delay=True)
644 self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
645
646
647 class MgrModuleLoggingMixin(object):
648 def _configure_logging(self,
649 mgr_level: str,
650 module_level: str,
651 cluster_level: str,
652 log_to_file: bool,
653 log_to_cluster: bool) -> None:
654 self._mgr_level: Optional[str] = None
655 self._module_level: Optional[str] = None
656 self._root_logger = logging.getLogger()
657
658 self._unconfigure_logging()
659
660 # the ceph log handler is initialized only once
661 self._mgr_log_handler = CPlusPlusHandler(self)
662 self._cluster_log_handler = ClusterLogHandler(self)
663 self._file_log_handler = FileHandler(self)
664
665 self.log_to_file = log_to_file
666 self.log_to_cluster = log_to_cluster
667
668 self._root_logger.addHandler(self._mgr_log_handler)
669 if log_to_file:
670 self._root_logger.addHandler(self._file_log_handler)
671 if log_to_cluster:
672 self._root_logger.addHandler(self._cluster_log_handler)
673
674 self._root_logger.setLevel(logging.NOTSET)
675 self._set_log_level(mgr_level, module_level, cluster_level)
676
677 def _unconfigure_logging(self) -> None:
678 # remove existing handlers:
679 rm_handlers = [
680 h for h in self._root_logger.handlers
681 if (isinstance(h, CPlusPlusHandler) or
682 isinstance(h, FileHandler) or
683 isinstance(h, ClusterLogHandler))]
684 for h in rm_handlers:
685 self._root_logger.removeHandler(h)
686 self.log_to_file = False
687 self.log_to_cluster = False
688
689 def _set_log_level(self,
690 mgr_level: str,
691 module_level: str,
692 cluster_level: str) -> None:
693 self._cluster_log_handler.setLevel(cluster_level.upper())
694
695 module_level = module_level.upper() if module_level else ''
696 if not self._module_level:
697 # using debug_mgr level
698 if not module_level and self._mgr_level == mgr_level:
699 # no change in module level nor in debug_mgr
700 return
701 else:
702 if self._module_level == module_level:
703 # no change in module level
704 return
705
706 if not self._module_level and not module_level:
707 level = self._ceph_log_level_to_python(mgr_level)
708 self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
709 level, mgr_level)
710 elif self._module_level and not module_level:
711 level = self._ceph_log_level_to_python(mgr_level)
712 self.getLogger().warning("unsetting module log level, falling back to "
713 "debug_mgr level: %s (%s)", level, mgr_level)
714 elif module_level:
715 level = module_level
716 self.getLogger().debug("setting log level: %s", level)
717
718 self._module_level = module_level
719 self._mgr_level = mgr_level
720
721 self._mgr_log_handler.setLevel(level)
722 self._file_log_handler.setLevel(level)
723
724 def _enable_file_log(self) -> None:
725 # enable file log
726 self.getLogger().warning("enabling logging to file")
727 self.log_to_file = True
728 self._root_logger.addHandler(self._file_log_handler)
729
730 def _disable_file_log(self) -> None:
731 # disable file log
732 self.getLogger().warning("disabling logging to file")
733 self.log_to_file = False
734 self._root_logger.removeHandler(self._file_log_handler)
735
736 def _enable_cluster_log(self) -> None:
737 # enable cluster log
738 self.getLogger().warning("enabling logging to cluster")
739 self.log_to_cluster = True
740 self._root_logger.addHandler(self._cluster_log_handler)
741
742 def _disable_cluster_log(self) -> None:
743 # disable cluster log
744 self.getLogger().warning("disabling logging to cluster")
745 self.log_to_cluster = False
746 self._root_logger.removeHandler(self._cluster_log_handler)
747
748 def _ceph_log_level_to_python(self, log_level: str) -> str:
749 if log_level:
750 try:
751 ceph_log_level = int(log_level.split("/", 1)[0])
752 except ValueError:
753 ceph_log_level = 0
754 else:
755 ceph_log_level = 0
756
757 log_level = "DEBUG"
758 if ceph_log_level <= 0:
759 log_level = "CRITICAL"
760 elif ceph_log_level <= 1:
761 log_level = "WARNING"
762 elif ceph_log_level <= 4:
763 log_level = "INFO"
764 return log_level
765
766 def getLogger(self, name: Optional[str] = None) -> logging.Logger:
767 return logging.getLogger(name)
768
769
770 class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
771 """
772 Standby modules only implement the serve and shutdown methods; they
773 are not permitted to implement commands and they do not receive
774 any notifications.
775
776 They only have access to the mgrmap (for accessing service URI info
777 from their active peer), and to configuration settings (read only).
778 """
779
780 MODULE_OPTIONS: List[Option] = []
781 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
782
783 def __init__(self, module_name: str, capsule: Any):
784 super(MgrStandbyModule, self).__init__(capsule)
785 self.module_name = module_name
786
787 # see also MgrModule.__init__()
788 for o in self.MODULE_OPTIONS:
789 if 'default' in o:
790 if 'type' in o:
791 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
792 else:
793 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
794
795 # mock does not return a str
796 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
797 log_level = cast(str, self.get_module_option("log_level"))
798 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
799 self._configure_logging(mgr_level, log_level, cluster_level,
800 False, False)
801
802 # for backwards compatibility
803 self._logger = self.getLogger()
804
805 def __del__(self) -> None:
806 self._cleanup()
807 self._unconfigure_logging()
808
809 def _cleanup(self) -> None:
810 pass
811
812 @classmethod
813 def _register_options(cls, module_name: str) -> None:
814 cls.MODULE_OPTIONS.append(
815 Option(name='log_level', type='str', default="", runtime=True,
816 enum_allowed=['info', 'debug', 'critical', 'error',
817 'warning', '']))
818 cls.MODULE_OPTIONS.append(
819 Option(name='log_to_file', type='bool', default=False, runtime=True))
820 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
821 cls.MODULE_OPTIONS.append(
822 Option(name='log_to_cluster', type='bool', default=False,
823 runtime=True))
824 cls.MODULE_OPTIONS.append(
825 Option(name='log_to_cluster_level', type='str', default='info',
826 runtime=True,
827 enum_allowed=['info', 'debug', 'critical', 'error',
828 'warning', '']))
829
830 @property
831 def log(self) -> logging.Logger:
832 return self._logger
833
834 def serve(self) -> None:
835 """
836 The serve method is mandatory for standby modules.
837 :return:
838 """
839 raise NotImplementedError()
840
841 def get_mgr_id(self) -> str:
842 return self._ceph_get_mgr_id()
843
844 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
845 """
846 Retrieve the value of a persistent configuration setting
847
848 :param default: the default value of the config if it is not found
849 """
850 r = self._ceph_get_module_option(key)
851 if r is None:
852 return self.MODULE_OPTION_DEFAULTS.get(key, default)
853 else:
854 return r
855
856 def get_ceph_option(self, key: str) -> OptionValue:
857 return self._ceph_get_option(key)
858
859 def get_store(self, key: str) -> Optional[str]:
860 """
861 Retrieve the value of a persistent KV store entry
862
863 :param key: String
864 :return: Byte string or None
865 """
866 return self._ceph_get_store(key)
867
868 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
869 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
870 if r is None:
871 r = self._ceph_get_store(key)
872 if r is None:
873 r = default
874 return r
875
876 def get_active_uri(self) -> str:
877 return self._ceph_get_active_uri()
878
879 def get(self, data_name: str) -> Dict[str, Any]:
880 return self._ceph_get(data_name)
881
882 def get_mgr_ip(self) -> str:
883 ips = self.get("mgr_ips").get('ips', [])
884 if not ips:
885 return socket.gethostname()
886 return ips[0]
887
888 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
889 r = self._ceph_get_module_option(key, self.get_mgr_id())
890 if r is None:
891 return self.MODULE_OPTION_DEFAULTS.get(key, default)
892 else:
893 return r
894
895
896 HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
897 # {"type": service_type, "id": service_id}
898 ServiceInfoT = Dict[str, str]
899 # {"hostname": hostname,
900 # "ceph_version": version,
901 # "services": [service_info, ..]}
902 ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
903 PerfCounterT = Dict[str, Any]
904
905
906 class API:
907 def DecoratorFactory(attr: str, default: Any): # type: ignore
908 class DecoratorClass:
909 _ATTR_TOKEN = f'__ATTR_{attr.upper()}__'
910
911 def __init__(self, value: Any=default) -> None:
912 self.value = value
913
914 def __call__(self, func: Callable) -> Any:
915 setattr(func, self._ATTR_TOKEN, self.value)
916 return func
917
918 @classmethod
919 def get(cls, func: Callable) -> Any:
920 return getattr(func, cls._ATTR_TOKEN, default)
921
922 return DecoratorClass
923
924 perm = DecoratorFactory('perm', default='r')
925 expose = DecoratorFactory('expose', default=False)(True)
926
927
928 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
929 MGR_POOL_NAME = ".mgr"
930
931 COMMANDS = [] # type: List[Any]
932 MODULE_OPTIONS: List[Option] = []
933 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
934
935 # Database Schema
936 SCHEMA = None # type: Optional[str]
937 SCHEMA_VERSIONED = None # type: Optional[List[str]]
938
939 # Priority definitions for perf counters
940 PRIO_CRITICAL = 10
941 PRIO_INTERESTING = 8
942 PRIO_USEFUL = 5
943 PRIO_UNINTERESTING = 2
944 PRIO_DEBUGONLY = 0
945
946 # counter value types
947 PERFCOUNTER_TIME = 1
948 PERFCOUNTER_U64 = 2
949
950 # counter types
951 PERFCOUNTER_LONGRUNAVG = 4
952 PERFCOUNTER_COUNTER = 8
953 PERFCOUNTER_HISTOGRAM = 0x10
954 PERFCOUNTER_TYPE_MASK = ~3
955
956 # units supported
957 BYTES = 0
958 NONE = 1
959
960 # Cluster log priorities
961 class ClusterLogPrio(IntEnum):
962 DEBUG = 0
963 INFO = 1
964 SEC = 2
965 WARN = 3
966 ERROR = 4
967
968 def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
969 self.module_name = module_name
970 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
971
972 for o in self.MODULE_OPTIONS:
973 if 'default' in o:
974 if 'type' in o:
975 # we'll assume the declared type matches the
976 # supplied default value's type.
977 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
978 else:
979 # module not declaring its type, so normalize the
980 # default value to be a string for consistent behavior
981 # with default and user-supplied option values.
982 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
983
984 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
985 log_level = cast(str, self.get_module_option("log_level"))
986 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
987 log_to_file = self.get_module_option("log_to_file")
988 assert isinstance(log_to_file, bool)
989 log_to_cluster = self.get_module_option("log_to_cluster")
990 assert isinstance(log_to_cluster, bool)
991 self._configure_logging(mgr_level, log_level, cluster_level,
992 log_to_file, log_to_cluster)
993
994 # for backwards compatibility
995 self._logger = self.getLogger()
996
997 self._db = None # type: Optional[sqlite3.Connection]
998
999 self._version = self._ceph_get_version()
1000
1001 self._perf_schema_cache = None
1002
1003 # Keep a librados instance for those that need it.
1004 self._rados: Optional[rados.Rados] = None
1005
1006 # this does not change over the lifetime of an active mgr
1007 self._mgr_ips: Optional[str] = None
1008
1009 self._db_lock = threading.Lock()
1010
1011 def __del__(self) -> None:
1012 self._unconfigure_logging()
1013
1014 @classmethod
1015 def _register_options(cls, module_name: str) -> None:
1016 cls.MODULE_OPTIONS.append(
1017 Option(name='log_level', type='str', default="", runtime=True,
1018 enum_allowed=['info', 'debug', 'critical', 'error',
1019 'warning', '']))
1020 cls.MODULE_OPTIONS.append(
1021 Option(name='log_to_file', type='bool', default=False, runtime=True))
1022 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
1023 cls.MODULE_OPTIONS.append(
1024 Option(name='log_to_cluster', type='bool', default=False,
1025 runtime=True))
1026 cls.MODULE_OPTIONS.append(
1027 Option(name='log_to_cluster_level', type='str', default='info',
1028 runtime=True,
1029 enum_allowed=['info', 'debug', 'critical', 'error',
1030 'warning', '']))
1031
1032 @classmethod
1033 def _register_commands(cls, module_name: str) -> None:
1034 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
1035
1036 @property
1037 def log(self) -> logging.Logger:
1038 return self._logger
1039
1040 def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
1041 """
1042 :param channel: The log channel. This can be 'cluster', 'audit', ...
1043 :param priority: The log message priority.
1044 :param message: The message to log.
1045 """
1046 self._ceph_cluster_log(channel, priority.value, message)
1047
1048 @property
1049 def version(self) -> str:
1050 return self._version
1051
1052 @API.expose
1053 def pool_exists(self, name: str) -> bool:
1054 pools = [p['pool_name'] for p in self.get('osd_map')['pools']]
1055 return name in pools
1056
1057 @API.expose
1058 def have_enough_osds(self) -> bool:
1059 # wait until we have enough OSDs to allow the pool to be healthy
1060 ready = 0
1061 for osd in self.get("osd_map")["osds"]:
1062 if osd["up"] and osd["in"]:
1063 ready += 1
1064
1065 need = cast(int, self.get_ceph_option("osd_pool_default_size"))
1066 return ready >= need
1067
1068 @API.perm('w')
1069 @API.expose
1070 def rename_pool(self, srcpool: str, destpool: str) -> None:
1071 c = {
1072 'prefix': 'osd pool rename',
1073 'format': 'json',
1074 'srcpool': srcpool,
1075 'destpool': destpool,
1076 }
1077 self.check_mon_command(c)
1078
1079 @API.perm('w')
1080 @API.expose
1081 def create_pool(self, pool: str) -> None:
1082 c = {
1083 'prefix': 'osd pool create',
1084 'format': 'json',
1085 'pool': pool,
1086 'pg_num': 1,
1087 'pg_num_min': 1,
1088 'pg_num_max': 32,
1089 }
1090 self.check_mon_command(c)
1091
1092 @API.perm('w')
1093 @API.expose
1094 def appify_pool(self, pool: str, app: str) -> None:
1095 c = {
1096 'prefix': 'osd pool application enable',
1097 'format': 'json',
1098 'pool': pool,
1099 'app': app,
1100 'yes_i_really_mean_it': True
1101 }
1102 self.check_mon_command(c)
1103
1104 @API.perm('w')
1105 @API.expose
1106 def create_mgr_pool(self) -> None:
1107 self.log.info("creating mgr pool")
1108
1109 ov = self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
1110 devhealth = cast(str, ov)
1111 if devhealth is not None and self.pool_exists(devhealth):
1112 self.log.debug("reusing devicehealth pool")
1113 self.rename_pool(devhealth, self.MGR_POOL_NAME)
1114 self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1115 else:
1116 self.log.debug("creating new mgr pool")
1117 self.create_pool(self.MGR_POOL_NAME)
1118 self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1119
1120 def create_skeleton_schema(self, db: sqlite3.Connection) -> None:
1121 SQL = """
1122 CREATE TABLE IF NOT EXISTS MgrModuleKV (
1123 key TEXT PRIMARY KEY,
1124 value NOT NULL
1125 ) WITHOUT ROWID;
1126 INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
1127 """
1128
1129 db.executescript(SQL)
1130
1131 def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
1132 SQL = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
1133
1134 db.execute(SQL, (version,))
1135
1136 def set_kv(self, key: str, value: Any) -> None:
1137 SQL = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
1138
1139 assert key[:2] != "__"
1140
1141 self.log.debug(f"set_kv('{key}', '{value}')")
1142
1143 with self._db_lock, self.db:
1144 self.db.execute(SQL, (key, value))
1145
1146 @API.expose
1147 def get_kv(self, key: str) -> Any:
1148 SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;"
1149
1150 assert key[:2] != "__"
1151
1152 self.log.debug(f"get_kv('{key}')")
1153
1154 with self._db_lock, self.db:
1155 cur = self.db.execute(SQL, (key,))
1156 row = cur.fetchone()
1157 if row is None:
1158 return None
1159 else:
1160 v = row['value']
1161 self.log.debug(f" = {v}")
1162 return v
1163
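    # Illustrative sketch (not part of the original file): persisting small
    # values in the module's SQLite-backed KV table; 'last_scrape' is a
    # hypothetical key. Requires the mgr db to be available (see db_ready()).
    #
    #     self.set_kv('last_scrape', int(time.time()))
    #     last = self.get_kv('last_scrape')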
1164 def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
1165 if version <= 0:
1166 self.log.info(f"creating main.db for {self.module_name}")
1167 assert self.SCHEMA is not None
1168 cur = db.executescript(self.SCHEMA)
1169 self.update_schema_version(db, 1)
1170 else:
1171 assert self.SCHEMA_VERSIONED is not None
1172 latest = len(self.SCHEMA_VERSIONED)
1173 if latest < version:
1174 raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
1175 for i in range(version, latest):
1176 self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}")
1177 SQL = self.SCHEMA_VERSIONED[i]
1178 db.executescript(SQL)
1179 if version < latest:
1180 self.update_schema_version(db, latest)
1181
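    # Illustrative sketch (not part of the original file): how a module might
    # declare SCHEMA and SCHEMA_VERSIONED so maybe_upgrade() above can create
    # and upgrade its tables; the table name is hypothetical. Per the loop
    # above, SCHEMA_VERSIONED[i] is the script run to reach version i + 1.
    #
    #     SCHEMA = """
    #         CREATE TABLE IF NOT EXISTS Sample (ts INTEGER PRIMARY KEY, value TEXT);
    #     """
    #     SCHEMA_VERSIONED = [
    #         SCHEMA,                                        # version 1
    #         "ALTER TABLE Sample ADD COLUMN source TEXT;",  # version 2
    #     ]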
1182 def load_schema(self, db: sqlite3.Connection) -> None:
1183 SQL = """
1184 SELECT value FROM MgrModuleKV WHERE key = '__version';
1185 """
1186
1187 with db:
1188 self.create_skeleton_schema(db)
1189 cur = db.execute(SQL)
1190 row = cur.fetchone()
1191 self.maybe_upgrade(db, int(row['value']))
1192 assert cur.fetchone() is None
1193 cur.close()
1194
1195 def configure_db(self, db: sqlite3.Connection) -> None:
1196 db.execute('PRAGMA FOREIGN_KEYS = 1')
1197 db.execute('PRAGMA JOURNAL_MODE = PERSIST')
1198 db.execute('PRAGMA PAGE_SIZE = 65536')
1199 db.execute('PRAGMA CACHE_SIZE = 64')
1200 db.row_factory = sqlite3.Row
1201 self.load_schema(db)
1202
1203 def open_db(self) -> Optional[sqlite3.Connection]:
1204 if not self.pool_exists(self.MGR_POOL_NAME):
1205 if not self.have_enough_osds():
1206 return None
1207 self.create_mgr_pool()
1208 uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
1209 self.log.debug(f"using uri {uri}")
1210 db = sqlite3.connect(uri, check_same_thread=False, uri=True)
1211 self.configure_db(db)
1212 return db
1213
1214 @API.expose
1215 def db_ready(self) -> bool:
1216 with self._db_lock:
1217 try:
1218 return self.db is not None
1219 except MgrDBNotReady:
1220 return False
1221
1222 @property
1223 def db(self) -> sqlite3.Connection:
1224 assert self._db_lock.locked()
1225 if self._db is not None:
1226 return self._db
1227 db_allowed = self.get_ceph_option("mgr_pool")
1228 if not db_allowed:
1229 raise MgrDBNotReady();
1230 self._db = self.open_db()
1231 if self._db is None:
1232 raise MgrDBNotReady();
1233 return self._db
1234
1235 @property
1236 def release_name(self) -> str:
1237 """
1238 Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
1239 :return: Returns the release name of the Ceph version in lower case.
1240 :rtype: str
1241 """
1242 return self._ceph_get_release_name()
1243
1244 @API.expose
1245 def lookup_release_name(self, major: int) -> str:
1246 return self._ceph_lookup_release_name(major)
1247
1248 def get_context(self) -> object:
1249 """
1250 :return: a Python capsule containing a C++ CephContext pointer
1251 """
1252 return self._ceph_get_context()
1253
1254 def notify(self, notify_type: NotifyType, notify_id: str) -> None:
1255 """
1256 Called by the ceph-mgr service to notify the Python plugin
1257 that new state is available. This method is *only* called for
1258 notify_types that are listed in the NOTIFY_TYPES string list
1259 member of the module class.
1260
1261 :param notify_type: string indicating what kind of notification,
1262 such as osd_map, mon_map, fs_map, mon_status,
1263 health, pg_summary, command, service_map
1264 :param notify_id: string (may be empty) that optionally specifies
1265 which entity is being notified about. With
1266 "command" notifications this is set to the tag
1267 passed to ``send_command``.
1268 """
1269 pass
1270
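    # Illustrative sketch (not part of the original file): a subclass opting in
    # to osd_map notifications; _refresh_osd_state is a hypothetical helper.
    #
    #     NOTIFY_TYPES = [NotifyType.osd_map]
    #
    #     def notify(self, notify_type: NotifyType, notify_id: str) -> None:
    #         if notify_type == NotifyType.osd_map:
    #             self._refresh_osd_state()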
1271 def _config_notify(self) -> None:
1272 # check logging options for changes
1273 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
1274 module_level = cast(str, self.get_module_option("log_level"))
1275 cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
1276 assert isinstance(cluster_level, str)
1277 log_to_file = self.get_module_option("log_to_file", False)
1278 assert isinstance(log_to_file, bool)
1279 log_to_cluster = self.get_module_option("log_to_cluster", False)
1280 assert isinstance(log_to_cluster, bool)
1281 self._set_log_level(mgr_level, module_level, cluster_level)
1282
1283 if log_to_file != self.log_to_file:
1284 if log_to_file:
1285 self._enable_file_log()
1286 else:
1287 self._disable_file_log()
1288 if log_to_cluster != self.log_to_cluster:
1289 if log_to_cluster:
1290 self._enable_cluster_log()
1291 else:
1292 self._disable_cluster_log()
1293
1294 # call module subclass implementations
1295 self.config_notify()
1296
1297 def config_notify(self) -> None:
1298 """
1299 Called by the ceph-mgr service to notify the Python plugin
1300 that the configuration may have changed. Modules will want to
1301 refresh any configuration values stored in config variables.
1302 """
1303 pass
1304
1305 def serve(self) -> None:
1306 """
1307 Called by the ceph-mgr service to start any server that
1308 is provided by this Python plugin. The implementation
1309 of this function should block until ``shutdown`` is called.
1310
1311 You *must* implement ``shutdown`` if you implement ``serve``
1312 """
1313 pass
1314
1315 def shutdown(self) -> None:
1316 """
1317 Called by the ceph-mgr service to request that this
1318 module drop out of its serve() function. You do not
1319 need to implement this if you do not implement serve()
1320
1321 :return: None
1322 """
1323 if self._rados:
1324 addrs = self._rados.get_addrs()
1325 self._rados.shutdown()
1326 self._ceph_unregister_client(addrs)
1327
1328 @API.expose
1329 def get(self, data_name: str) -> Any:
1330 """
1331 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
1332
1333 :param str data_name: Valid things to fetch are osdmap_crush_map_text,
1334 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
1335 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
1336 health, mon_status, devices, device <devid>, pg_stats,
1337 pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
1338 modified_config_options, service_map, mds_metadata,
1339 have_local_config_map, osd_pool_stats, pg_status.
1340
1341 Note:
1342 All these structures have their own JSON representations: experiment
1343 or look at the C++ ``dump()`` methods to learn about them.
1344 """
1345 obj = self._ceph_get(data_name)
1346 if isinstance(obj, bytes):
1347 obj = json.loads(obj)
1348
1349 return obj
1350
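    # Illustrative sketch (not part of the original file): fetching the OSDMap
    # dump, the same structure pool_exists() above iterates over.
    #
    #     osd_map = self.get('osd_map')
    #     pool_names = [p['pool_name'] for p in osd_map['pools']]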
1351 def _stattype_to_str(self, stattype: int) -> str:
1352
1353 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
1354 if typeonly == 0:
1355 return 'gauge'
1356 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
1357 # this lie matches the DaemonState decoding: only val, no counts
1358 return 'counter'
1359 if typeonly == self.PERFCOUNTER_COUNTER:
1360 return 'counter'
1361 if typeonly == self.PERFCOUNTER_HISTOGRAM:
1362 return 'histogram'
1363
1364 return ''
1365
1366 def _perfpath_to_path_labels(self, daemon: str,
1367 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
1368 if daemon.startswith('rgw.'):
1369 label_name = 'instance_id'
1370 daemon = daemon[len('rgw.'):]
1371 else:
1372 label_name = 'ceph_daemon'
1373
1374 label_names = (label_name,) # type: Tuple[str, ...]
1375 labels = (daemon,) # type: Tuple[str, ...]
1376
1377 if daemon.startswith('rbd-mirror.'):
1378 match = re.match(
1379 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
1380 path
1381 )
1382 if match:
1383 path = 'rbd_mirror_image_' + match.group(4)
1384 pool = match.group(1)
1385 namespace = match.group(2) or ''
1386 image = match.group(3)
1387 label_names += ('pool', 'namespace', 'image')
1388 labels += (pool, namespace, image)
1389
1390 return path, label_names, labels,
1391
1392 def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
1393 if stattype & self.PERFCOUNTER_TIME:
1394 # Convert from ns to seconds
1395 return value / 1000000000.0
1396 else:
1397 return value
1398
1399 def _unit_to_str(self, unit: int) -> str:
1400 if unit == self.NONE:
1401 return "/s"
1402 elif unit == self.BYTES:
1403 return "B/s"
1404 else:
1405 raise ValueError(f'bad unit "{unit}"')
1406
1407 @staticmethod
1408 def to_pretty_iec(n: int) -> str:
1409 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
1410 (20, 'Mi'), (10, 'Ki')]:
1411 if n > 10 << bits:
1412 return str(n >> bits) + ' ' + suffix
1413 return str(n) + ' '
1414
1415 @staticmethod
1416 def get_pretty_row(elems: Sequence[str], width: int) -> str:
1417 """
1418 Takes an array of elements and returns a string with those elements
1419 formatted as a table row. Useful for polling modules.
1420
1421 :param elems: the elements to be printed
1422 :param width: the width of the terminal
1423 """
1424 n = len(elems)
1425 column_width = int(width / n)
1426
1427 ret = '|'
1428 for elem in elems:
1429 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
1430
1431 return ret
1432
1433 def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
1434 """
1435 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
1436
1437 :param elems: the elements to be printed
1438 :param width: the width of the terminal
1439 """
1440 n = len(elems)
1441 column_width = int(width / n)
1442
1443 # dash line
1444 ret = '+'
1445 for i in range(0, n):
1446 ret += '-' * (column_width - 1) + '+'
1447 ret += '\n'
1448
1449 # title
1450 ret += self.get_pretty_row(elems, width)
1451 ret += '\n'
1452
1453 # dash line
1454 ret += '+'
1455 for i in range(0, n):
1456 ret += '-' * (column_width - 1) + '+'
1457 ret += '\n'
1458
1459 return ret
1460
1461 @API.expose
1462 def get_server(self, hostname: str) -> ServerInfoT:
1463 """
1464 Called by the plugin to fetch metadata about a particular hostname from
1465 ceph-mgr.
1466
1467 This is information that ceph-mgr has gleaned from the daemon metadata
1468 reported by daemons running on a particular server.
1469
1470 :param hostname: a hostname
1471 """
1472 return cast(ServerInfoT, self._ceph_get_server(hostname))
1473
1474 @API.expose
1475 def get_perf_schema(self,
1476 svc_type: str,
1477 svc_name: str) -> Dict[str,
1478 Dict[str, Dict[str, Union[str, int]]]]:
1479 """
1480 Called by the plugin to fetch perf counter schema info.
1481 svc_name can be nullptr, as can svc_type, in which case
1482 they are wildcards
1483
1484 :param str svc_type:
1485 :param str svc_name:
1486 :return: list of dicts describing the counters requested
1487 """
1488 return self._ceph_get_perf_schema(svc_type, svc_name)
1489
1490 def get_rocksdb_version(self) -> str:
1491 """
1492 Called by the plugin to fetch the latest RocksDB version number.
1493
1494 :return: str representing the major, minor, and patch RocksDB version numbers
1495 """
1496 return self._ceph_get_rocksdb_version()
1497
1498 @API.expose
1499 def get_counter(self,
1500 svc_type: str,
1501 svc_name: str,
1502 path: str) -> Dict[str, List[Tuple[float, int]]]:
1503 """
1504 Called by the plugin to fetch the latest performance counter data for a
1505 particular counter on a particular service.
1506
1507 :param str svc_type:
1508 :param str svc_name:
1509 :param str path: a period-separated concatenation of the subsystem and the
1510 counter name, for example "mds.inodes".
1511 :return: A dict of counter names to their values. Each value is a list
1512 of two-tuples of (timestamp, value). This may be empty if no data is
1513 available.
1514 """
1515 return self._ceph_get_counter(svc_type, svc_name, path)
1516
1517 @API.expose
1518 def get_latest_counter(self,
1519 svc_type: str,
1520 svc_name: str,
1521 path: str) -> Dict[str, Union[Tuple[float, int],
1522 Tuple[float, int, int]]]:
1523 """
1524 Called by the plugin to fetch only the newest performance counter data
1525 point for a particular counter on a particular service.
1526
1527 :param str svc_type:
1528 :param str svc_name:
1529 :param str path: a period-separated concatenation of the subsystem and the
1530 counter name, for example "mds.inodes".
1531 :return: A dict of counter names to a two-tuple of (timestamp, value) or a three-tuple of
1532 (timestamp, value, count) is returned. This may be empty if no
1533 data is available.
1534 """
1535 return self._ceph_get_latest_counter(svc_type, svc_name, path)
1536
1537 @API.expose
1538 def list_servers(self) -> List[ServerInfoT]:
1539 """
1540 Like ``get_server``, but gives information about all servers (i.e. all
1541 unique hostnames that have been mentioned in daemon metadata)
1542
1543 :return: a list of information about all servers
1544 :rtype: list
1545 """
1546 return cast(List[ServerInfoT], self._ceph_get_server(None))
1547
1548 def get_metadata(self,
1549 svc_type: str,
1550 svc_id: str,
1551 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
1552 """
1553 Fetch the daemon metadata for a particular service.
1554
1555 ceph-mgr fetches metadata asynchronously, so there are windows of time during
1556 addition/removal of services where the metadata is not available to
1557 modules. ``None`` is returned if no metadata is available.
1558
1559 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1560 :param str svc_id: service id. convert OSD integer IDs to strings when
1561 calling this
1562 :rtype: dict, or None if no metadata found
1563 """
1564 metadata = self._ceph_get_metadata(svc_type, svc_id)
1565 if metadata is None:
1566 return default
1567 return metadata
1568
1569 @API.expose
1570 def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
1571 """
1572 Fetch the latest status for a particular service daemon.
1573
1574 This method may return ``None`` if no status information is
1575 available, for example because the daemon hasn't fully started yet.
1576
1577 :param svc_type: string (e.g., 'rgw')
1578 :param svc_id: string
1579 :return: dict, or None if the service is not found
1580 """
1581 return self._ceph_get_daemon_status(svc_type, svc_id)
1582
1583 def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
1584 """
1585 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1586 if ``retval != 0``.
1587 """
1588
1589 r = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
1590 if r.retval:
1591 raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1592 return r
1593
1594 def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1595 """
1596 Helper for modules that do simple, synchronous mon command
1597 execution.
1598
1599 See send_command for general case.
1600
1601 :return: status int, out str, err str
1602 """
1603
1604 t1 = time.time()
1605 result = CommandResult()
1606 self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
1607 r = result.wait()
1608 t2 = time.time()
1609
1610 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1611 cmd_dict['prefix'], r[0], t2 - t1
1612 ))
1613
1614 return r
1615
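    # Illustrative sketch (not part of the original file): a typical
    # mon_command call; check_mon_command() above raises MonCommandFailed
    # instead of returning a non-zero retval.
    #
    #     retval, outb, outs = self.mon_command({'prefix': 'osd pool ls', 'format': 'json'})
    #     if retval == 0:
    #         pools = json.loads(outb)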
1616 def osd_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1617 """
1618 Helper for osd command execution.
1619
1620 See send_command for general case. Also, see osd/OSD.cc for available commands.
1621
1622 :param dict cmd_dict: expects a prefix and an osd id, i.e.:
1623 cmd_dict = {
1624 'prefix': 'perf histogram dump',
1625 'id': '0'
1626 }
1627 :return: status int, out str, err str
1628 """
1629 t1 = time.time()
1630 result = CommandResult()
1631 self.send_command(result, "osd", cmd_dict['id'], json.dumps(cmd_dict), "", inbuf)
1632 r = result.wait()
1633 t2 = time.time()
1634
1635 self.log.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
1636 cmd_dict['prefix'], r[0], t2 - t1
1637 ))
1638
1639 return r
1640
1641 def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1642 """
1643 Helper for `ceph tell` command execution.
1644
1645 See send_command for general case.
1646
1647 :param dict cmd_dict: expects a prefix i.e.:
1648 cmd_dict = {
1649 'prefix': 'heap',
1650 'heapcmd': 'stats',
1651 }
1652 :return: status int, out str, err str
1653 """
1654 t1 = time.time()
1655 result = CommandResult()
1656 self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
1657 r = result.wait()
1658 t2 = time.time()
1659
1660 self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
1661 daemon_type, daemon_id, cmd_dict['prefix'], r[0], t2 - t1
1662 ))
1663
1664 return r
1665
1666 def send_command(
1667 self,
1668 result: CommandResult,
1669 svc_type: str,
1670 svc_id: str,
1671 command: str,
1672 tag: str,
1673 inbuf: Optional[str] = None) -> None:
1674 """
1675 Called by the plugin to send a command to the mon
1676 cluster.
1677
1678 :param CommandResult result: an instance of the ``CommandResult``
1679 class, defined in the same module as MgrModule. This acts as a
1680 completion and stores the output of the command. Use
1681 ``CommandResult.wait()`` if you want to block on completion.
1682 :param str svc_type:
1683 :param str svc_id:
1684 :param str command: a JSON-serialized command. This uses the same
1685 format as the ceph command line, which is a dictionary of command
1686 arguments, with the extra ``prefix`` key containing the command
1687 name itself. Consult MonCommands.h for available commands and
1688 their expected arguments.
1689 :param str tag: used for nonblocking operation: when a command
1690 completes, the ``notify()`` callback on the MgrModule instance is
1691 triggered, with notify_type set to "command", and notify_id set to
1692 the tag of the command.
1693 :param str inbuf: input buffer for sending additional data.
1694 """
1695 self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
1696
1697 def tool_exec(
1698 self,
1699 args: List[str],
1700 timeout: int = 10,
1701 stdin: Optional[bytes] = None
1702 ) -> Tuple[int, str, str]:
1703 try:
1704 tool = args.pop(0)
1705 cmd = [
1706 tool,
1707 '-k', str(self.get_ceph_option('keyring')),
1708 '-n', f'mgr.{self.get_mgr_id()}',
1709 ] + args
1710 self.log.debug('exec: ' + ' '.join(cmd))
1711 p = subprocess.run(
1712 cmd,
1713 input=stdin,
1714 stdout=subprocess.PIPE,
1715 stderr=subprocess.PIPE,
1716 timeout=timeout,
1717 )
1718 except subprocess.TimeoutExpired as ex:
1719 self.log.error(ex)
1720 return -errno.ETIMEDOUT, '', str(ex)
1721 if p.returncode:
1722 self.log.error(f'Non-zero return from {cmd}: {p.stderr.decode()}')
1723 return p.returncode, p.stdout.decode(), p.stderr.decode()
1724
1725 def set_health_checks(self, checks: HealthChecksT) -> None:
1726 """
1727 Set the module's current map of health checks. Argument is a
1728 dict of check names to info, in this form:
1729
1730 ::
1731
1732 {
1733 'CHECK_FOO': {
1734 'severity': 'warning', # or 'error'
1735 'summary': 'summary string',
1736 'count': 4, # quantify badness
1737 'detail': [ 'list', 'of', 'detail', 'strings' ],
1738 },
1739 'CHECK_BAR': {
1740 'severity': 'error',
1741 'summary': 'bars are bad',
1742 'detail': [ 'too hard' ],
1743 },
1744 }
1745
1746 :param checks: dict of health check dicts
1747 """
1748 self._ceph_set_health_checks(checks)
1749
1750 def _handle_command(self,
1751 inbuf: str,
1752 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1753 Tuple[int, str, str]]:
1754 if cmd['prefix'] not in CLICommand.COMMANDS:
1755 return self.handle_command(inbuf, cmd)
1756
1757 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
1758
1759 def handle_command(self,
1760 inbuf: str,
1761 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1762 Tuple[int, str, str]]:
1763 """
1764 Called by ceph-mgr to request the plugin to handle one
1765 of the commands that it declared in self.COMMANDS
1766
1767 Return a status code, an output buffer, and an
1768 output string. The output buffer is for data results,
1769 the output string is for informative text.
1770
1771 :param inbuf: content of any "-i <file>" supplied to ceph cli
1772 :type inbuf: str
1773 :param cmd: from Ceph's cmdmap_t
1774 :type cmd: dict
1775
1776 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1777 """
1778
1779 # Should never get called if they didn't declare
1780 # any ``COMMANDS``
1781 raise NotImplementedError()
1782
1783 def get_mgr_id(self) -> str:
1784 """
1785 Retrieve the name of the manager daemon where this plugin
1786 is currently being executed (i.e. the active manager).
1787
1788 :return: str
1789 """
1790 return self._ceph_get_mgr_id()
1791
1792 @API.expose
1793 def get_ceph_conf_path(self) -> str:
1794 return self._ceph_get_ceph_conf_path()
1795
1796 @API.expose
1797 def get_mgr_ip(self) -> str:
1798 if not self._mgr_ips:
1799 ips = self.get("mgr_ips").get('ips', [])
1800 if not ips:
1801 return socket.gethostname()
1802 self._mgr_ips = ips[0]
1803 assert self._mgr_ips is not None
1804 return self._mgr_ips
1805
1806 @API.expose
1807 def get_ceph_option(self, key: str) -> OptionValue:
1808 return self._ceph_get_option(key)
1809
1810 @API.expose
1811 def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
1812 return self._ceph_get_foreign_option(entity, key)
1813
1814 def _validate_module_option(self, key: str) -> None:
1815 """
1816 Helper: don't allow get/set config callers to
1817 access config options that they didn't declare
1818 in their schema.
1819 """
1820 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1821 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1822 format(key, self.__class__.__name__))
1823
1824 def _get_module_option(self,
1825 key: str,
1826 default: OptionValue,
1827 localized_prefix: str = "") -> OptionValue:
1828 r = self._ceph_get_module_option(self.module_name, key,
1829 localized_prefix)
1830 if r is None:
1831 return self.MODULE_OPTION_DEFAULTS.get(key, default)
1832 else:
1833 return r
1834
1835 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
1836 """
1837 Retrieve the value of a persistent configuration setting
1838 """
1839 self._validate_module_option(key)
1840 return self._get_module_option(key, default)
1841
1842 def get_module_option_ex(self, module: str,
1843 key: str,
1844 default: OptionValue = None) -> OptionValue:
1845 """
1846 Retrieve the value of a persistent configuration setting
1847 for the specified module.
1848
1849 :param module: The name of the module, e.g. 'dashboard'
1850 or 'telemetry'.
1851 :param key: The configuration key, e.g. 'server_addr'.
1852 :param default: The default value to use when the
1853 returned value is ``None``. Defaults to ``None``.
1854 """
1855 if module == self.module_name:
1856 self._validate_module_option(key)
1857 r = self._ceph_get_module_option(module, key)
1858 return default if r is None else r
1859
1860 @API.expose
1861 def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
1862 """
1863 Retrieve a dict of KV store keys to values, where the keys
1864 have the given prefix
1865
1866 :param str key_prefix: the prefix to match against stored keys
1867 :return: dict of matching keys to their values
1868 """
1869 return self._ceph_get_store_prefix(key_prefix)
1870
1871 def _set_localized(self,
1872 key: str,
1873 val: Optional[str],
1874 setter: Callable[[str, Optional[str]], None]) -> None:
1875 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1876
1877 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
1878 """
1879 Retrieve localized configuration for this ceph-mgr instance
1880 """
1881 self._validate_module_option(key)
1882 return self._get_module_option(key, default, self.get_mgr_id())
1883
1884 def _set_module_option(self, key: str, val: Any) -> None:
1885 return self._ceph_set_module_option(self.module_name, key,
1886 None if val is None else str(val))
1887
1888 def set_module_option(self, key: str, val: Any) -> None:
1889 """
1890 Set the value of a persistent configuration setting
1891
1892 :param str key: option name
1893 :param val: new value; converted to ``str``, or ``None``
1894 :raises ValueError: if `val` cannot be parsed or it is out of the specified range
1895 """
1896 self._validate_module_option(key)
1897 return self._set_module_option(key, val)
1898
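# Example (illustrative sketch): reading and updating a module option; the
# option name 'scan_interval' is hypothetical and must be declared in the
# module's MODULE_OPTIONS for _validate_module_option() to accept it.
#
#     interval = self.get_module_option('scan_interval', 60)
#     self.set_module_option('scan_interval', 120)
#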
1899 def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
1900 """
1901 Set the value of a persistent configuration setting
1902 for the specified module.
1903
1904 :param str module:
1905 :param str key:
1906 :param str val:
1907 """
1908 if module == self.module_name:
1909 self._validate_module_option(key)
1910 return self._ceph_set_module_option(module, key, str(val))
1911
1912 @API.perm('w')
1913 @API.expose
1914 def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
1915 """
1916 Set localized configuration for this ceph-mgr instance
1917 :param str key:
1918 :param str val:
1919 :return: None
1920 """
1921 self._validate_module_option(key)
1922 return self._set_localized(key, val, self._set_module_option)
1923
1924 @API.perm('w')
1925 @API.expose
1926 def set_store(self, key: str, val: Optional[str]) -> None:
1927 """
1928 Set a value in this module's persistent key value store.
1929 If val is None, remove key from store
1930 """
1931 self._ceph_set_store(key, val)
1932
1933 @API.expose
1934 def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
1935 """
1936 Get a value from this module's persistent key value store
1937 """
1938 r = self._ceph_get_store(key)
1939 if r is None:
1940 return default
1941 else:
1942 return r
1943
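# Example (illustrative sketch): persisting a small JSON document in the
# module's KV store; the key name is hypothetical.
#
#     self.set_store('mymodule/state', json.dumps({'last_run': time.time()}))
#     raw = self.get_store('mymodule/state')
#     state = json.loads(raw) if raw is not None else {}
#     self.set_store('mymodule/state', None)   # None removes the key
#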
1944 @API.expose
1945 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
1946 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
1947 if r is None:
1948 r = self._ceph_get_store(key)
1949 if r is None:
1950 r = default
1951 return r
1952
1953 @API.perm('w')
1954 @API.expose
1955 def set_localized_store(self, key: str, val: Optional[str]) -> None:
1956 return self._set_localized(key, val, self.set_store)
1957
1958 def self_test(self) -> Optional[str]:
1959 """
1960 Run a self-test on the module. Override this function and implement
1961 as thorough a self-test as possible for (automated) testing of the module.
1962 
1963 Indicate any failures by raising an exception. This does not have
1964 to be pretty; it is mainly for picking up regressions during
1965 development, rather than for use in the field.
1966
1967 :return: None, or an advisory string for developer interest, such
1968 as a json dump of some state.
1969 """
1970 pass
1971
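# Example (illustrative sketch): a self_test() override that exercises the
# module's KV-store plumbing and raises on any mismatch.
#
#     def self_test(self):
#         self.set_store('self_test', 'ok')
#         if self.get_store('self_test') != 'ok':
#             raise RuntimeError('KV store round-trip failed')
#         return json.dumps({'self_test': 'passed'})
#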
1972 def get_osdmap(self) -> OSDMap:
1973 """
1974 Get a handle to the latest OSDMap known to this ceph-mgr instance.
1976 :return: OSDMap
1977 """
1978 return cast(OSDMap, self._ceph_get_osdmap())
1979
1980 @API.expose
1981 def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
1982 data = self.get_latest_counter(
1983 daemon_type, daemon_name, counter)[counter]
1984 if data:
1985 return data[1]
1986 else:
1987 return 0
1988
1989 @API.expose
1990 def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
1991 data = self.get_latest_counter(
1992 daemon_type, daemon_name, counter)[counter]
1993 if data:
1994 # https://github.com/python/mypy/issues/1178
1995 _, value, count = cast(Tuple[float, int, int], data)
1996 return value, count
1997 else:
1998 return 0, 0
1999
2000 @API.expose
2001 @profile_method()
2002 def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
2003 services: Sequence[str] = ("mds", "mon", "osd",
2004 "rbd-mirror", "rgw",
2005 "tcmu-runner")) -> Dict[str, dict]:
2006 """
2007 Return the perf counters currently known to this ceph-mgr
2008 instance, filtered by priority equal to or greater than `prio_limit`.
2009
2010 The result is a map of string to dict, associating services
2011 (like "osd.123") with their counters. The counter
2012 dict for each service maps counter paths to a counter
2013 info structure, which is the information from
2014 the schema, plus an additional "value" member with the latest
2015 value.
2016 """
2017
2018 result = defaultdict(dict) # type: Dict[str, dict]
2019
2020 for server in self.list_servers():
2021 for service in cast(List[ServiceInfoT], server['services']):
2022 if service['type'] not in services:
2023 continue
2024
2025 schemas = self.get_perf_schema(service['type'], service['id'])
2026 if not schemas:
2027 self.log.warning("No perf counter schema for {0}.{1}".format(
2028 service['type'], service['id']
2029 ))
2030 continue
2031
2032 # Value is returned in a potentially-multi-service format,
2033 # get just the service we're asking about
2034 svc_full_name = "{0}.{1}".format(
2035 service['type'], service['id'])
2036 schema = schemas[svc_full_name]
2037
2038 # Populate latest values
2039 for counter_path, counter_schema in schema.items():
2040 # self.log.debug("{0}: {1}".format(
2041 # counter_path, json.dumps(counter_schema)
2042 # ))
2043 priority = counter_schema['priority']
2044 assert isinstance(priority, int)
2045 if priority < prio_limit:
2046 continue
2047
2048 tp = counter_schema['type']
2049 assert isinstance(tp, int)
2050 counter_info = dict(counter_schema)
2051 # Also populate count for the long running avgs
2052 if tp & self.PERFCOUNTER_LONGRUNAVG:
2053 v, c = self.get_latest_avg(
2054 service['type'],
2055 service['id'],
2056 counter_path
2057 )
2058 counter_info['value'], counter_info['count'] = v, c
2059 result[svc_full_name][counter_path] = counter_info
2060 else:
2061 counter_info['value'] = self.get_latest(
2062 service['type'],
2063 service['id'],
2064 counter_path
2065 )
2066
2067 result[svc_full_name][counter_path] = counter_info
2068
2069 self.log.debug("returning counters for {0} services".format(len(result)))
2070
2071 return result
2072
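# Example (illustrative sketch): logging the latest value of one counter for
# every OSD known to this mgr; 'osd.op_latency' is a standard OSD counter,
# used here purely for illustration.
#
#     for svc, counters in self.get_all_perf_counters(services=('osd',)).items():
#         info = counters.get('osd.op_latency')
#         if info:
#             self.log.debug('%s op_latency value=%s', svc, info['value'])
#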
2073 @API.expose
2074 def set_uri(self, uri: str) -> None:
2075 """
2076 If the module exposes a service, then call this to publish the
2077 address once it is available.
2078
2079 :return: None
2080 """
2081 return self._ceph_set_uri(uri)
2082
2083 @API.perm('w')
2084 @API.expose
2085 def set_device_wear_level(self, devid: str, wear_level: float) -> None:
2086 return self._ceph_set_device_wear_level(devid, wear_level)
2087
2088 @API.expose
2089 def have_mon_connection(self) -> bool:
2090 """
2091 Check whether this ceph-mgr daemon has an open connection
2092 to a monitor. If it doesn't, then it's likely that the
2093 information we have about the cluster is out of date,
2094 and/or the monitor cluster is down.
2095 """
2096
2097 return self._ceph_have_mon_connection()
2098
2099 def update_progress_event(self,
2100 evid: str,
2101 desc: str,
2102 progress: float,
2103 add_to_ceph_s: bool) -> None:
2104 return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
2105
2106 @API.perm('w')
2107 @API.expose
2108 def complete_progress_event(self, evid: str) -> None:
2109 return self._ceph_complete_progress_event(evid)
2110
2111 @API.perm('w')
2112 @API.expose
2113 def clear_all_progress_events(self) -> None:
2114 return self._ceph_clear_all_progress_events()
2115
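# Example (illustrative sketch): driving a progress bar in "ceph -s" for a
# long-running module task; the event id and description are hypothetical.
#
#     ev_id = 'mymodule-rebuild'
#     self.update_progress_event(ev_id, 'Rebuilding widget index', 0.0, True)
#     # ... do some work ...
#     self.update_progress_event(ev_id, 'Rebuilding widget index', 0.5, True)
#     # ... finish ...
#     self.complete_progress_event(ev_id)
#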
2116 @property
2117 def rados(self) -> rados.Rados:
2118 """
2119 A librados instance to be shared by any classes within
2120 this mgr module that want one.
2121 """
2122 if self._rados:
2123 return self._rados
2124
2125 ctx_capsule = self.get_context()
2126 self._rados = rados.Rados(context=ctx_capsule)
2127 self._rados.connect()
2128 self._ceph_register_client(self._rados.get_addrs())
2129 return self._rados
2130
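# Example (illustrative sketch): using the shared librados handle to read an
# object; the pool and object names are hypothetical.
#
#     ioctx = self.rados.open_ioctx('mymodule_pool')
#     try:
#         data = ioctx.read('some_object')
#     finally:
#         ioctx.close()
#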
2131 @staticmethod
2132 def can_run() -> Tuple[bool, str]:
2133 """
2134 Implement this function to report whether the module's dependencies
2135 are met. For example, if the module needs to import a particular
2136 dependency to work, then use a try/except around the import at
2137 file scope, and then report here if the import failed.
2138
2139 This will be called in a blocking way from the C++ code, so do not
2140 do any I/O that could block in this function.
2141
2142 :return: a 2-tuple consisting of a boolean and an explanatory string
2143 """
2144
2145 return True, ""
2146
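# Example (illustrative sketch): a can_run() override for a module depending
# on an optional third-party package; 'somepkg' is a hypothetical name and the
# import guard would live at file scope in the module.
#
#     try:
#         import somepkg
#     except ImportError:
#         somepkg = None
#
#     @staticmethod
#     def can_run():
#         if somepkg is None:
#             return False, "python package 'somepkg' is not installed"
#         return True, ""
#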
2147 @API.expose
2148 def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
2149 """
2150 Invoke a method on another module. All arguments, and the return
2151 value from the other module must be serializable.
2152
2153 Limitation: Do not import any modules within the called method.
2154 Otherwise you will get an error in Python 2::
2155
2156 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
2157 
2160 :param module_name: Name of other module. If module isn't loaded,
2161 an ImportError exception is raised.
2162 :param method_name: Method name. If it does not exist, a NameError
2163 exception is raised.
2164 :param args: Argument tuple
2165 :param kwargs: Keyword argument dict
2166 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
2167 :raises ImportError: No such module
2168 """
2169 return self._ceph_dispatch_remote(module_name, method_name,
2170 args, kwargs)
2171
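# Example (illustrative sketch): calling into another mgr module; the module
# and method names are hypothetical, and all values exchanged must be
# serializable.
#
#     try:
#         status = self.remote('othermodule', 'get_status', verbose=True)
#     except ImportError:
#         self.log.warning('othermodule is not enabled')
#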
2172 def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
2173 """
2174 Register an OSD perf query. Argument is a
2175 dict of the query parameters, in this form:
2176
2177 ::
2178
2179 {
2180 'key_descriptor': [
2181 {'type': subkey_type, 'regex': regex_pattern},
2182 ...
2183 ],
2184 'performance_counter_descriptors': [
2185 list, of, descriptor, types
2186 ],
2187 'limit': {'order_by': performance_counter_type, 'max_count': n},
2188 }
2189
2190 Valid subkey types:
2191 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
2192 'pg_id', 'object_name', 'snap_id'
2193 Valid performance counter types:
2194 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
2195 'latency', 'write_latency', 'read_latency'
2196
2197 :param dict query: query specification, as above
2198 :return: query id, or None if the query could not be registered
2199 """
2200 return self._ceph_add_osd_perf_query(query)
2201
2202 @API.perm('w')
2203 @API.expose
2204 def remove_osd_perf_query(self, query_id: int) -> None:
2205 """
2206 Unregister an OSD perf query.
2207
2208 :param int query_id: query ID
2209 """
2210 return self._ceph_remove_osd_perf_query(query_id)
2211
2212 @API.expose
2213 def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
2214 """
2215 Get stats collected for an OSD perf query.
2216
2217 :param int query_id: query ID
2218 """
2219 return self._ceph_get_osd_perf_counters(query_id)
2220
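# Example (illustrative sketch): registering a per-client OSD perf query,
# polling it once, and removing it again; the descriptors follow the format
# documented in add_osd_perf_query() above.
#
#     query_id = self.add_osd_perf_query({
#         'key_descriptor': [
#             {'type': 'client_id', 'regex': '.*'},
#         ],
#         'performance_counter_descriptors': ['write_ops', 'write_bytes'],
#     })
#     if query_id is not None:
#         counters = self.get_osd_perf_counters(query_id)
#         self.remove_osd_perf_query(query_id)
#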
2221 def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
2222 """
2223 Register an MDS perf query. Argument is a
2224 dict of the query parameters, in this form:
2225
2226 ::
2227
2228 {
2229 'key_descriptor': [
2230 {'type': subkey_type, 'regex': regex_pattern},
2231 ...
2232 ],
2233 'performance_counter_descriptors': [
2234 list, of, descriptor, types
2235 ],
2236 }
2237
2238 NOTE: 'limit' and 'order_by' are not supported (yet).
2239
2240 Valid subkey types:
2241 'mds_rank', 'client_id'
2242 Valid performance counter types:
2243 'cap_hit_metric'
2244
2245 :param dict query: query specification, as above
2246 :return: query id, or None if the query could not be registered
2247 """
2248 return self._ceph_add_mds_perf_query(query)
2249
2250 @API.perm('w')
2251 @API.expose
2252 def remove_mds_perf_query(self, query_id: int) -> None:
2253 """
2254 Unregister an MDS perf query.
2255
2256 :param int query_id: query ID
2257 """
2258 return self._ceph_remove_mds_perf_query(query_id)
2259
2260 @API.expose
2262 def reregister_mds_perf_queries(self) -> None:
2263 """
2264 Re-register MDS perf queries.
2265 """
2266 return self._ceph_reregister_mds_perf_queries()
2267
2268 def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
2269 """
2270 Get stats collected for an MDS perf query.
2271
2272 :param int query_id: query ID
2273 """
2274 return self._ceph_get_mds_perf_counters(query_id)
2275
2276 def is_authorized(self, arguments: Dict[str, str]) -> bool:
2277 """
2278 Verifies that the current session caps permit executing the py service
2279 or current module with the provided arguments. This provides a generic
2280 way to allow modules to restrict by more fine-grained controls (e.g.
2281 pools).
2282
2283 :param arguments: dict of key/value arguments to test
2284 """
2285 return self._ceph_is_authorized(arguments)
2286
2287 @API.expose
2288 def send_rgwadmin_command(self, args: List[str],
2289 stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]:
2290 try:
2291 cmd = [
2292 'radosgw-admin',
2293 '-c', str(self.get_ceph_conf_path()),
2294 '-k', str(self.get_ceph_option('keyring')),
2295 '-n', f'mgr.{self.get_mgr_id()}',
2296 ] + args
2297 self.log.debug('Executing %s', str(cmd))
2298 result = subprocess.run( # pylint: disable=subprocess-run-check
2299 cmd,
2300 stdout=subprocess.PIPE,
2301 stderr=subprocess.PIPE,
2302 timeout=10,
2303 )
2304 stdout = result.stdout.decode('utf-8')
2305 stderr = result.stderr.decode('utf-8')
2306 if stdout and stdout_as_json:
2307 stdout = json.loads(stdout)
2308 if result.returncode:
2309 self.log.debug('Error %s executing %s: %s', result.returncode, str(cmd), stderr)
2310 return result.returncode, stdout, stderr
2311 except subprocess.CalledProcessError as ex:
2312 self.log.exception('Error executing radosgw-admin %s: %s', str(ex.cmd), str(ex.output))
2313 raise
2314 except subprocess.TimeoutExpired as ex:
2315 self.log.error('Timeout (10s) executing radosgw-admin %s', str(ex.cmd))
2316 raise
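
# Example (illustrative sketch): listing RGW users through the helper; this
# assumes radosgw-admin is installed on the active mgr host.
#
#     rc, out, err = self.send_rgwadmin_command(['user', 'list'])
#     if rc == 0:
#         users = out   # already parsed JSON when stdout_as_json=True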