1 import ceph_module # noqa
2
3 from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, Set, TYPE_CHECKING
5 if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12 import inspect
13 import logging
14 import errno
15 import functools
16 import json
17 import subprocess
18 import threading
19 from collections import defaultdict
20 from enum import IntEnum, Enum
21 import rados
22 import re
23 import socket
24 import sqlite3
25 import sys
26 import time
27 from ceph_argparse import CephArgtype
28 from mgr_util import profile_method
29
30 if sys.version_info >= (3, 8):
31 from typing import get_args, get_origin
32 else:
33 def get_args(tp: Any) -> Any:
34 if tp is Generic:
35 return tp
36 else:
37 return getattr(tp, '__args__', ())
38
39 def get_origin(tp: Any) -> Any:
40 return getattr(tp, '__origin__', None)
41
42
43 ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
44 ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'
45 # Full list of strings in "osd_types.cc:pg_state_string()"
46 PG_STATES = [
47 "active",
48 "clean",
49 "down",
50 "recovery_unfound",
51 "backfill_unfound",
52 "scrubbing",
53 "degraded",
54 "inconsistent",
55 "peering",
56 "repair",
57 "recovering",
58 "forced_recovery",
59 "backfill_wait",
60 "incomplete",
61 "stale",
62 "remapped",
63 "deep",
64 "backfilling",
65 "forced_backfill",
66 "backfill_toofull",
67 "recovery_wait",
68 "recovery_toofull",
69 "undersized",
70 "activating",
71 "peered",
72 "snaptrim",
73 "snaptrim_wait",
74 "snaptrim_error",
75 "creating",
76 "unknown",
77 "premerge",
78 "failed_repair",
79 "laggy",
80 "wait",
81 ]
82
83 NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
84 NFS_POOL_NAME = '.nfs'
85
86
87 class NotifyType(str, Enum):
88 mon_map = 'mon_map'
89 pg_summary = 'pg_summary'
90 health = 'health'
91 clog = 'clog'
92 osd_map = 'osd_map'
93 fs_map = 'fs_map'
94 command = 'command'
95
96 # these are disabled because there are no users.
97 # see Mgr.cc:
98 # service_map = 'service_map'
99 # mon_status = 'mon_status'
100 # see DaemonServer.cc:
101 # perf_schema_update = 'perf_schema_update'
102
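# Illustrative sketch (not part of the upstream file): a module subscribes to
# notifications by listing NotifyType members in its NOTIFY_TYPES attribute
# and overriding notify(); the class name and body below are hypothetical.
#
#   class MyModule(MgrModule):
#       NOTIFY_TYPES = [NotifyType.osd_map, NotifyType.health]
#
#       def notify(self, notify_type: NotifyType, notify_id: str) -> None:
#           if notify_type == NotifyType.osd_map:
#               self.log.debug("osd_map changed")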
103
104 class CommandResult(object):
105 """
106 Use with MgrModule.send_command
107 """
108
109 def __init__(self, tag: Optional[str] = None):
110 self.ev = threading.Event()
111 self.outs = ""
112 self.outb = ""
113 self.r = 0
114
115 # This is just a convenience for notifications from
116 # C++ land, to avoid passing addresses around in messages.
117 self.tag = tag if tag else ""
118
119 def complete(self, r: int, outb: str, outs: str) -> None:
120 self.r = r
121 self.outb = outb
122 self.outs = outs
123 self.ev.set()
124
125 def wait(self) -> Tuple[int, str, str]:
126 self.ev.wait()
127 return self.r, self.outb, self.outs
128
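# Illustrative sketch (assumption, not upstream code): issuing a command from
# inside a module with CommandResult and MgrModule.send_command; the tag and
# command shown are hypothetical. Either block on wait() or handle the
# 'command' notification carrying the same tag.
#
#   result = CommandResult(tag='df-1')
#   self.send_command(result, 'mon', '',
#                     json.dumps({'prefix': 'df', 'format': 'json'}), 'df-1')
#   r, outb, outs = result.wait()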
129
130 class HandleCommandResult(NamedTuple):
131 """
132 Tuple containing the result of `handle_command()`
133
134 Only write to stderr if there is an error, or in extraordinary circumstances
135
136 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
137 is critical information to include there.
138
139 Everything programmatically consumable should be put on stdout
140 """
141 retval: int = 0 # return code. E.g. 0 or -errno.EINVAL
142 stdout: str = "" # data of this result.
143 stderr: str = "" # Typically used for error messages.
144
145
146 class MonCommandFailed(RuntimeError): pass
147 class MgrDBNotReady(RuntimeError): pass
148
149
150 class OSDMap(ceph_module.BasePyOSDMap):
151 def get_epoch(self) -> int:
152 return self._get_epoch()
153
154 def get_crush_version(self) -> int:
155 return self._get_crush_version()
156
157 def dump(self) -> Dict[str, Any]:
158 return self._dump()
159
160 def get_pools(self) -> Dict[int, Dict[str, Any]]:
161 # FIXME: efficient implementation
162 d = self._dump()
163 return dict([(p['pool'], p) for p in d['pools']])
164
165 def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
166 # FIXME: efficient implementation
167 d = self._dump()
168 return dict([(p['pool_name'], p) for p in d['pools']])
169
170 def new_incremental(self) -> 'OSDMapIncremental':
171 return self._new_incremental()
172
173 def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
174 return self._apply_incremental(inc)
175
176 def get_crush(self) -> 'CRUSHMap':
177 return self._get_crush()
178
179 def get_pools_by_take(self, take: int) -> List[int]:
180 return self._get_pools_by_take(take).get('pools', [])
181
182 def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
183 max_deviation: int,
184 max_iterations: int = 10,
185 pools: Optional[List[str]] = None) -> int:
186 if pools is None:
187 pools = []
188 return self._calc_pg_upmaps(
189 inc,
190 max_deviation, max_iterations, pools)
191
192 def map_pool_pgs_up(self, poolid: int) -> List[int]:
193 return self._map_pool_pgs_up(poolid)
194
195 def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
196 return self._pg_to_up_acting_osds(pool_id, ps)
197
198 def pool_raw_used_rate(self, pool_id: int) -> float:
199 return self._pool_raw_used_rate(pool_id)
200
201 @classmethod
202 def build_simple(cls, epoch: int = 1, uuid: Optional[str] = None, num_osd: int = -1) -> 'ceph_module.BasePyOSDMap':
203 return cls._build_simple(epoch, uuid, num_osd)
204
205 def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
206 # FIXME: efficient implementation
207 d = self._dump()
208 return d['erasure_code_profiles'].get(name, None)
209
210 def get_require_osd_release(self) -> str:
211 d = self._dump()
212 return d['require_osd_release']
213
214
215 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
216 def get_epoch(self) -> int:
217 return self._get_epoch()
218
219 def dump(self) -> Dict[str, Any]:
220 return self._dump()
221
222 def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
223 """
224 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
225 """
226 return self._set_osd_reweights(weightmap)
227
228 def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
229 """
230 weightmap is a dict, str to float. devices only. e.g.,
231 { '0': 3.4, '1': 3.3, '2': 3.334 }
232 """
233 return self._set_crush_compat_weight_set_weights(weightmap)
234
235
236 class CRUSHMap(ceph_module.BasePyCRUSH):
237 ITEM_NONE = 0x7fffffff
238 DEFAULT_CHOOSE_ARGS = '-1'
239
240 def dump(self) -> Dict[str, Any]:
241 return self._dump()
242
243 def get_item_weight(self, item: int) -> Optional[int]:
244 return self._get_item_weight(item)
245
246 def get_item_name(self, item: int) -> Optional[str]:
247 return self._get_item_name(item)
248
249 def find_takes(self) -> List[int]:
250 return self._find_takes().get('takes', [])
251
252 def find_roots(self) -> List[int]:
253 return self._find_roots().get('roots', [])
254
255 def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
256 uglymap = self._get_take_weight_osd_map(root)
257 return {int(k): v for k, v in uglymap.get('weights', {}).items()}
258
259 @staticmethod
260 def have_default_choose_args(dump: Dict[str, Any]) -> bool:
261 return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
262
263 @staticmethod
264 def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
265 choose_args = dump.get('choose_args')
266 assert isinstance(choose_args, dict)
267 return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
268
269 def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
270 # TODO efficient implementation
271 for rule in self.dump()['rules']:
272 if rule_name == rule['rule_name']:
273 return rule
274
275 return None
276
277 def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
278 for rule in self.dump()['rules']:
279 if rule['rule_id'] == rule_id:
280 return rule
281
282 return None
283
284 def get_rule_root(self, rule_name: str) -> Optional[int]:
285 rule = self.get_rule(rule_name)
286 if rule is None:
287 return None
288
289 try:
290 first_take = next(s for s in rule['steps'] if s.get('op') == 'take')
291 except StopIteration:
292 logging.warning("CRUSH rule '{0}' has no 'take' step".format(
293 rule_name))
294 return None
295 else:
296 return first_take['item']
297
298 def get_osds_under(self, root_id: int) -> List[int]:
299 # TODO don't abuse dump like this
300 d = self.dump()
301 buckets = dict([(b['id'], b) for b in d['buckets']])
302
303 osd_list = []
304
305 def accumulate(b: Dict[str, Any]) -> None:
306 for item in b['items']:
307 if item['id'] >= 0:
308 osd_list.append(item['id'])
309 else:
310 try:
311 accumulate(buckets[item['id']])
312 except KeyError:
313 pass
314
315 accumulate(buckets[root_id])
316
317 return osd_list
318
319 def device_class_counts(self) -> Dict[str, int]:
320 result = defaultdict(int) # type: Dict[str, int]
321 # TODO don't abuse dump like this
322 d = self.dump()
323 for device in d['devices']:
324 cls = device.get('class', None)
325 result[cls] += 1
326
327 return dict(result)
328
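# Illustrative sketch (assumption, not upstream code): walking from a CRUSH
# rule to the OSDs under its root; ``osdmap`` is assumed to be an OSDMap the
# module obtained elsewhere, and the rule name is hypothetical.
#
#   crush = osdmap.get_crush()
#   root_id = crush.get_rule_root('replicated_rule')
#   if root_id is not None:
#       osds = crush.get_osds_under(root_id)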
329
330 HandlerFuncType = Callable[..., Tuple[int, str, str]]
331
332 def _extract_target_func(
333 f: HandlerFuncType
334 ) -> Tuple[HandlerFuncType, Dict[str, Any]]:
335 """In order to interoperate with other decorated functions,
336 we need to find the original function which will provide
337 the main set of arguments. While we descend through the
338 stack of wrapped functions, gather additional arguments
339 the decorators may want to provide.
340 """
341 # use getattr to keep mypy happy
342 wrapped = getattr(f, "__wrapped__", None)
343 if not wrapped:
344 return f, {}
345 extra_args: Dict[str, Any] = {}
346 while wrapped is not None:
347 extra_args.update(getattr(f, "extra_args", {}))
348 f = wrapped
349 wrapped = getattr(f, "__wrapped__", None)
350 return f, extra_args
351
352
353 class CLICommand(object):
354 COMMANDS = {} # type: Dict[str, CLICommand]
355
356 def __init__(self,
357 prefix: str,
358 perm: str = 'rw',
359 poll: bool = False):
360 self.prefix = prefix
361 self.perm = perm
362 self.poll = poll
363 self.func = None # type: Optional[Callable]
364 self.arg_spec = {} # type: Dict[str, Any]
365 self.first_default = -1
366
367 KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
368
369 @classmethod
370 def _load_func_metadata(cls: Any, f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
371 f, extra_args = _extract_target_func(f)
372 desc = (inspect.getdoc(f) or '').replace('\n', ' ')
373 full_argspec = inspect.getfullargspec(f)
374 arg_spec = full_argspec.annotations
375 first_default = len(arg_spec)
376 if full_argspec.defaults:
377 first_default -= len(full_argspec.defaults)
378 args = []
379 positional = True
380 for index, arg in enumerate(full_argspec.args):
381 if arg in cls.KNOWN_ARGS:
382 # record that this function takes an inbuf if it is present
383 # in the full_argspec and not already in the arg_spec
384 if arg == 'inbuf' and 'inbuf' not in arg_spec:
385 arg_spec['inbuf'] = 'str'
386 continue
387 if arg == '_end_positional_':
388 positional = False
389 continue
390 if (
391 arg == 'format'
392 or arg_spec[arg] is Optional[bool]
393 or arg_spec[arg] is bool
394 ):
395 # implicit switch to non-positional on any
396 # Optional[bool] or the --format option
397 positional = False
398 assert arg in arg_spec, \
399 f"'{arg}' is not annotated for {f}: {full_argspec}"
400 has_default = index >= first_default
401 args.append(CephArgtype.to_argdesc(arg_spec[arg],
402 dict(name=arg),
403 has_default,
404 positional))
405 for argname, argtype in extra_args.items():
406 # avoid shadowing args from the function
407 if argname in arg_spec:
408 continue
409 arg_spec[argname] = argtype
410 args.append(CephArgtype.to_argdesc(
411 argtype, dict(name=argname), has_default=True, positional=False
412 ))
413 return desc, arg_spec, first_default, ' '.join(args)
414
415 def store_func_metadata(self, f: HandlerFuncType) -> None:
416 self.desc, self.arg_spec, self.first_default, self.args = \
417 self._load_func_metadata(f)
418
419 def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
420 self.store_func_metadata(func)
421 self.func = func
422 self.COMMANDS[self.prefix] = self
423 return self.func
424
425 def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
426 def start_kwargs() -> bool:
427 if isinstance(val, str) and '=' in val:
428 k, v = val.split('=', 1)
429 if k in self.arg_spec:
430 return True
431 return False
432
433 if not kwargs_switch:
434 kwargs_switch = start_kwargs()
435
436 if kwargs_switch:
437 k, v = val.split('=', 1)
438 else:
439 k, v = key, val
440 return kwargs_switch, k.replace('-', '_'), v
441
442 def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Tuple[Dict[str, Any], Set[str]]:
443 kwargs = {}
444 special_args = set()
445 kwargs_switch = False
446 for index, (name, tp) in enumerate(self.arg_spec.items()):
447 if name in CLICommand.KNOWN_ARGS:
448 special_args.add(name)
449 continue
450 assert self.first_default >= 0
451 raw_v = cmd_dict.get(name)
452 if index >= self.first_default:
453 if raw_v is None:
454 continue
455 kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
456 name, raw_v)
457 kwargs[k] = CephArgtype.cast_to(tp, v)
458 return kwargs, special_args
459
460 def call(self,
461 mgr: Any,
462 cmd_dict: Dict[str, Any],
463 inbuf: Optional[str] = None) -> HandleCommandResult:
464 kwargs, specials = self._collect_args_by_argspec(cmd_dict)
465 if inbuf:
466 if 'inbuf' not in specials:
467 return HandleCommandResult(
468 -errno.EINVAL,
469 '',
470 'Invalid command: Input file data (-i) not supported',
471 )
472 kwargs['inbuf'] = inbuf
473 assert self.func
474 return self.func(mgr, **kwargs)
475
476 def dump_cmd(self) -> Dict[str, Union[str, bool]]:
477 return {
478 'cmd': '{} {}'.format(self.prefix, self.args),
479 'desc': self.desc,
480 'perm': self.perm,
481 'poll': self.poll,
482 }
483
484 @classmethod
485 def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
486 return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
487
488
489 def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
490 return CLICommand(prefix, "r", poll)
491
492
493 def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
494 return CLICommand(prefix, "w", poll)
495
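# Illustrative sketch (assumption, not upstream code): declaring a module CLI
# command with CLIReadCommand; the prefix and argument are hypothetical.
# Annotated parameters become command arguments and the docstring becomes the
# help text.
#
#   @CLIReadCommand('example status')
#   def example_status(self, detail: bool = False) -> Tuple[int, str, str]:
#       """Show example status"""
#       return 0, json.dumps({'detail': detail}), ''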
496
497 def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
498 def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
499 @functools.wraps(func)
500 def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
501 if 'inbuf' not in kwargs:
502 return -errno.EINVAL, '', f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
503 f'containing {desc} with "-i" option'
504 if isinstance(kwargs['inbuf'], str):
505 # Delete new line separator at EOF (it may have been added by a text editor).
506 kwargs['inbuf'] = kwargs['inbuf'].rstrip('\r\n').rstrip('\n')
507 if not kwargs['inbuf'] or not kwargs['inbuf'].strip():
508 return -errno.EINVAL, '', f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
509 'the file'
510 return func(*args, **kwargs)
511 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
512 return check
513 return CheckFileInput
514
515 def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
516 @functools.wraps(func)
517 def check(self: MgrModule, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
518 if not self.db_ready():
519 return -errno.EAGAIN, "", "mgr db not yet available"
520 return func(self, *args, **kwargs)
521 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
522 return check
523
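# Illustrative sketch (assumption, not upstream code): stacking the helpers
# above on a write command that takes "-i <file>" input; the names are
# hypothetical. CLIRequiresDB fails with EAGAIN until the mgr db is ready and
# CLICheckNonemptyFileInput rejects a missing or empty input file.
#
#   @CLIWriteCommand('example import')
#   @CLIRequiresDB
#   @CLICheckNonemptyFileInput(desc='the data to import')
#   def example_import(self, inbuf: str) -> Tuple[int, str, str]:
#       return 0, '', ''
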
524 def _get_localized_key(prefix: str, key: str) -> str:
525 return '{}/{}'.format(prefix, key)
526
527
528 """
529 MODULE_OPTIONS types and Option Class
530 """
531 if TYPE_CHECKING:
532 OptionTypeLabel = Literal[
533 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
534
535
536 # common/options.h: value_t
537 OptionValue = Optional[Union[bool, int, float, str]]
538
539
540 class Option(Dict):
541 """
542 Helper class to declare options for MODULE_OPTIONS list.
543 TODO: Replace with typing.TypedDict when in python_version >= 3.8
544 """
545
546 def __init__(
547 self,
548 name: str,
549 default: OptionValue = None,
550 type: 'OptionTypeLabel' = 'str',
551 desc: Optional[str] = None,
552 long_desc: Optional[str] = None,
553 min: OptionValue = None,
554 max: OptionValue = None,
555 enum_allowed: Optional[List[str]] = None,
556 tags: Optional[List[str]] = None,
557 see_also: Optional[List[str]] = None,
558 runtime: bool = False,
559 ):
560 super(Option, self).__init__(
561 (k, v) for k, v in vars().items()
562 if k != 'self' and v is not None)
563
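# Illustrative sketch (assumption, not upstream code): declaring module
# options in a subclass; the option names shown are hypothetical.
#
#   class MyModule(MgrModule):
#       MODULE_OPTIONS = [
#           Option(name='interval', type='secs', default=60,
#                  desc='polling interval', runtime=True),
#           Option(name='enable_feature', type='bool', default=False),
#       ]
#
#       def config_notify(self) -> None:
#           self.interval = self.get_module_option('interval')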
564
565 class Command(dict):
566 """
567 Helper class to declare options for COMMANDS list.
568
569 It also allows to specify prefix and args separately, as well as storing a
570 handler callable.
571
572 Usage:
573 >>> def handler(): return 0, "", ""
574 >>> Command(prefix="example",
575 ... handler=handler,
576 ... perm='w')
577 {'perm': 'w', 'poll': False}
578 """
579
580 def __init__(
581 self,
582 prefix: str,
583 handler: HandlerFuncType,
584 perm: str = "rw",
585 poll: bool = False,
586 ):
587 super().__init__(perm=perm,
588 poll=poll)
589 self.prefix = prefix
590 self.handler = handler
591
592 @staticmethod
593 def returns_command_result(instance: Any,
594 f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
595 @functools.wraps(f)
596 def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
597 retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
598 return HandleCommandResult(retval, stdout, stderr)
599 wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
600 return wrapper
601
602 def register(self, instance: bool = False) -> HandlerFuncType:
603 """
604 Register a CLICommand handler. It allows an instance to register bound
605 methods. In that case, the mgr instance is not passed, and it's expected
606 to be available in the class instance.
607 It also uses the HandleCommandResult helper to return a wrapped tuple
608 of 3 items.
609 """
610 cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
611 return cmd(self.returns_command_result(instance, self.handler))
612
613
614 class CPlusPlusHandler(logging.Handler):
615 def __init__(self, module_inst: Any):
616 super(CPlusPlusHandler, self).__init__()
617 self._module = module_inst
618 self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
619 .format(module_inst.module_name)))
620
621 def emit(self, record: logging.LogRecord) -> None:
622 if record.levelno >= self.level:
623 self._module._ceph_log(self.format(record))
624
625
626 class ClusterLogHandler(logging.Handler):
627 def __init__(self, module_inst: Any):
628 super().__init__()
629 self._module = module_inst
630 self.setFormatter(logging.Formatter("%(message)s"))
631
632 def emit(self, record: logging.LogRecord) -> None:
633 levelmap = {
634 logging.DEBUG: MgrModule.ClusterLogPrio.DEBUG,
635 logging.INFO: MgrModule.ClusterLogPrio.INFO,
636 logging.WARNING: MgrModule.ClusterLogPrio.WARN,
637 logging.ERROR: MgrModule.ClusterLogPrio.ERROR,
638 logging.CRITICAL: MgrModule.ClusterLogPrio.ERROR,
639 }
640 level = levelmap[record.levelno]
641 if record.levelno >= self.level:
642 self._module.cluster_log(self._module.module_name,
643 level,
644 self.format(record))
645
646
647 class FileHandler(logging.FileHandler):
648 def __init__(self, module_inst: Any):
649 path = module_inst.get_ceph_option("log_file")
650 idx = path.rfind(".log")
651 if idx != -1:
652 self.path = "{}.{}.log".format(path[:idx], module_inst.module_name)
653 else:
654 self.path = "{}.{}".format(path, module_inst.module_name)
655 super(FileHandler, self).__init__(self.path, delay=True)
656 self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
657
658
659 class MgrModuleLoggingMixin(object):
660 def _configure_logging(self,
661 mgr_level: str,
662 module_level: str,
663 cluster_level: str,
664 log_to_file: bool,
665 log_to_cluster: bool) -> None:
666 self._mgr_level: Optional[str] = None
667 self._module_level: Optional[str] = None
668 self._root_logger = logging.getLogger()
669
670 self._unconfigure_logging()
671
672 # the ceph log handler is initialized only once
673 self._mgr_log_handler = CPlusPlusHandler(self)
674 self._cluster_log_handler = ClusterLogHandler(self)
675 self._file_log_handler = FileHandler(self)
676
677 self.log_to_file = log_to_file
678 self.log_to_cluster = log_to_cluster
679
680 self._root_logger.addHandler(self._mgr_log_handler)
681 if log_to_file:
682 self._root_logger.addHandler(self._file_log_handler)
683 if log_to_cluster:
684 self._root_logger.addHandler(self._cluster_log_handler)
685
686 self._root_logger.setLevel(logging.NOTSET)
687 self._set_log_level(mgr_level, module_level, cluster_level)
688
689 def _unconfigure_logging(self) -> None:
690 # remove existing handlers:
691 rm_handlers = [
692 h for h in self._root_logger.handlers
693 if (isinstance(h, CPlusPlusHandler) or
694 isinstance(h, FileHandler) or
695 isinstance(h, ClusterLogHandler))]
696 for h in rm_handlers:
697 self._root_logger.removeHandler(h)
698 self.log_to_file = False
699 self.log_to_cluster = False
700
701 def _set_log_level(self,
702 mgr_level: str,
703 module_level: str,
704 cluster_level: str) -> None:
705 self._cluster_log_handler.setLevel(cluster_level.upper())
706
707 module_level = module_level.upper() if module_level else ''
708 if not self._module_level:
709 # using debug_mgr level
710 if not module_level and self._mgr_level == mgr_level:
711 # no change in module level nor in debug_mgr
712 return
713 else:
714 if self._module_level == module_level:
715 # no change in module level
716 return
717
718 if not self._module_level and not module_level:
719 level = self._ceph_log_level_to_python(mgr_level)
720 self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
721 level, mgr_level)
722 elif self._module_level and not module_level:
723 level = self._ceph_log_level_to_python(mgr_level)
724 self.getLogger().warning("unsetting module log level, falling back to "
725 "debug_mgr level: %s (%s)", level, mgr_level)
726 elif module_level:
727 level = module_level
728 self.getLogger().debug("setting log level: %s", level)
729
730 self._module_level = module_level
731 self._mgr_level = mgr_level
732
733 self._mgr_log_handler.setLevel(level)
734 self._file_log_handler.setLevel(level)
735
736 def _enable_file_log(self) -> None:
737 # enable file log
738 self.getLogger().warning("enabling logging to file")
739 self.log_to_file = True
740 self._root_logger.addHandler(self._file_log_handler)
741
742 def _disable_file_log(self) -> None:
743 # disable file log
744 self.getLogger().warning("disabling logging to file")
745 self.log_to_file = False
746 self._root_logger.removeHandler(self._file_log_handler)
747
748 def _enable_cluster_log(self) -> None:
749 # enable cluster log
750 self.getLogger().warning("enabling logging to cluster")
751 self.log_to_cluster = True
752 self._root_logger.addHandler(self._cluster_log_handler)
753
754 def _disable_cluster_log(self) -> None:
755 # disable cluster log
756 self.getLogger().warning("disabling logging to cluster")
757 self.log_to_cluster = False
758 self._root_logger.removeHandler(self._cluster_log_handler)
759
760 def _ceph_log_level_to_python(self, log_level: str) -> str:
761 if log_level:
762 try:
763 ceph_log_level = int(log_level.split("/", 1)[0])
764 except ValueError:
765 ceph_log_level = 0
766 else:
767 ceph_log_level = 0
768
769 log_level = "DEBUG"
770 if ceph_log_level <= 0:
771 log_level = "CRITICAL"
772 elif ceph_log_level <= 1:
773 log_level = "WARNING"
774 elif ceph_log_level <= 4:
775 log_level = "INFO"
776 return log_level
777
778 def getLogger(self, name: Optional[str] = None) -> logging.Logger:
779 return logging.getLogger(name)
780
781
782 class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
783 """
784 Standby modules only implement the serve and shutdown methods; they
785 are not permitted to implement commands and they do not receive
786 any notifications.
787
788 They only have access to the mgrmap (for accessing service URI info
789 from their active peer), and to configuration settings (read only).
790 """
791
792 MODULE_OPTIONS: List[Option] = []
793 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
794
795 def __init__(self, module_name: str, capsule: Any):
796 super(MgrStandbyModule, self).__init__(capsule)
797 self.module_name = module_name
798
799 # see also MgrModule.__init__()
800 for o in self.MODULE_OPTIONS:
801 if 'default' in o:
802 if 'type' in o:
803 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
804 else:
805 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
806
807 # mock does not return a str
808 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
809 log_level = cast(str, self.get_module_option("log_level"))
810 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
811 self._configure_logging(mgr_level, log_level, cluster_level,
812 False, False)
813
814 # for backwards compatibility
815 self._logger = self.getLogger()
816
817 def __del__(self) -> None:
818 self._cleanup()
819 self._unconfigure_logging()
820
821 def _cleanup(self) -> None:
822 pass
823
824 @classmethod
825 def _register_options(cls, module_name: str) -> None:
826 cls.MODULE_OPTIONS.append(
827 Option(name='log_level', type='str', default="", runtime=True,
828 enum_allowed=['info', 'debug', 'critical', 'error',
829 'warning', '']))
830 cls.MODULE_OPTIONS.append(
831 Option(name='log_to_file', type='bool', default=False, runtime=True))
832 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
833 cls.MODULE_OPTIONS.append(
834 Option(name='log_to_cluster', type='bool', default=False,
835 runtime=True))
836 cls.MODULE_OPTIONS.append(
837 Option(name='log_to_cluster_level', type='str', default='info',
838 runtime=True,
839 enum_allowed=['info', 'debug', 'critical', 'error',
840 'warning', '']))
841
842 @property
843 def log(self) -> logging.Logger:
844 return self._logger
845
846 def serve(self) -> None:
847 """
848 The serve method is mandatory for standby modules.
849 :return:
850 """
851 raise NotImplementedError()
852
853 def get_mgr_id(self) -> str:
854 return self._ceph_get_mgr_id()
855
856 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
857 """
858 Retrieve the value of a persistent configuration setting
859
860 :param default: the default value of the config if it is not found
861 """
862 r = self._ceph_get_module_option(key)
863 if r is None:
864 return self.MODULE_OPTION_DEFAULTS.get(key, default)
865 else:
866 return r
867
868 def get_ceph_option(self, key: str) -> OptionValue:
869 return self._ceph_get_option(key)
870
871 def get_store(self, key: str) -> Optional[str]:
872 """
873 Retrieve the value of a persistent KV store entry
874
875 :param key: String
876 :return: Byte string or None
877 """
878 return self._ceph_get_store(key)
879
880 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
881 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
882 if r is None:
883 r = self._ceph_get_store(key)
884 if r is None:
885 r = default
886 return r
887
888 def get_active_uri(self) -> str:
889 return self._ceph_get_active_uri()
890
891 def get(self, data_name: str) -> Dict[str, Any]:
892 return self._ceph_get(data_name)
893
894 def get_mgr_ip(self) -> str:
895 ips = self.get("mgr_ips").get('ips', [])
896 if not ips:
897 return socket.gethostname()
898 return ips[0]
899
900 def get_hostname(self) -> str:
901 return socket.gethostname()
902
903 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
904 r = self._ceph_get_module_option(key, self.get_mgr_id())
905 if r is None:
906 return self.MODULE_OPTION_DEFAULTS.get(key, default)
907 else:
908 return r
909
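# Illustrative sketch (assumption, not upstream code): a minimal standby
# module; the class body is hypothetical. serve() blocks until shutdown().
#
#   class StandbyModule(MgrStandbyModule):
#       def __init__(self, *args: Any, **kwargs: Any) -> None:
#           super().__init__(*args, **kwargs)
#           self.shutdown_event = threading.Event()
#
#       def serve(self) -> None:
#           self.log.info("active mgr is at %s", self.get_active_uri())
#           self.shutdown_event.wait()
#
#       def shutdown(self) -> None:
#           self.shutdown_event.set()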
910
911 HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
912 # {"type": service_type, "id": service_id}
913 ServiceInfoT = Dict[str, str]
914 # {"hostname": hostname,
915 # "ceph_version": version,
916 # "services": [service_info, ..]}
917 ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
918 PerfCounterT = Dict[str, Any]
919
920
921 class API:
922 def DecoratorFactory(attr: str, default: Any): # type: ignore
923 class DecoratorClass:
924 _ATTR_TOKEN = f'__ATTR_{attr.upper()}__'
925
926 def __init__(self, value: Any=default) -> None:
927 self.value = value
928
929 def __call__(self, func: Callable) -> Any:
930 setattr(func, self._ATTR_TOKEN, self.value)
931 return func
932
933 @classmethod
934 def get(cls, func: Callable) -> Any:
935 return getattr(func, cls._ATTR_TOKEN, default)
936
937 return DecoratorClass
938
939 perm = DecoratorFactory('perm', default='r')
940 expose = DecoratorFactory('expose', default=False)(True)
941
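# Illustrative sketch (assumption, not upstream code): tagging module methods
# with the API decorators above; the method is hypothetical. API.expose marks
# a method as externally visible and API.perm records the access it needs;
# both can be read back with .get().
#
#   class MyModule(MgrModule):
#       @API.perm('w')
#       @API.expose
#       def reset_state(self) -> None:
#           ...
#
#   API.perm.get(MyModule.reset_state)    # -> 'w'
#   API.expose.get(MyModule.reset_state)  # -> True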
942
943 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
944 MGR_POOL_NAME = ".mgr"
945
946 COMMANDS = [] # type: List[Any]
947 MODULE_OPTIONS: List[Option] = []
948 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
949
950 # Database Schema
951 SCHEMA = None # type: Optional[str]
952 SCHEMA_VERSIONED = None # type: Optional[List[str]]
953
954 # Priority definitions for perf counters
955 PRIO_CRITICAL = 10
956 PRIO_INTERESTING = 8
957 PRIO_USEFUL = 5
958 PRIO_UNINTERESTING = 2
959 PRIO_DEBUGONLY = 0
960
961 # counter value types
962 PERFCOUNTER_TIME = 1
963 PERFCOUNTER_U64 = 2
964
965 # counter types
966 PERFCOUNTER_LONGRUNAVG = 4
967 PERFCOUNTER_COUNTER = 8
968 PERFCOUNTER_HISTOGRAM = 0x10
969 PERFCOUNTER_TYPE_MASK = ~3
970
971 # units supported
972 BYTES = 0
973 NONE = 1
974
975 # Cluster log priorities
976 class ClusterLogPrio(IntEnum):
977 DEBUG = 0
978 INFO = 1
979 SEC = 2
980 WARN = 3
981 ERROR = 4
982
983 def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
984 self.module_name = module_name
985 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
986
987 for o in self.MODULE_OPTIONS:
988 if 'default' in o:
989 if 'type' in o:
990 # we'll assume the declared type matches the
991 # supplied default value's type.
992 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
993 else:
994 # module not declaring its type, so normalize the
995 # default value to be a string for consistent behavior
996 # with default and user-supplied option values.
997 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
998
999 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
1000 log_level = cast(str, self.get_module_option("log_level"))
1001 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
1002 log_to_file = self.get_module_option("log_to_file")
1003 assert isinstance(log_to_file, bool)
1004 log_to_cluster = self.get_module_option("log_to_cluster")
1005 assert isinstance(log_to_cluster, bool)
1006 self._configure_logging(mgr_level, log_level, cluster_level,
1007 log_to_file, log_to_cluster)
1008
1009 # for backwards compatibility
1010 self._logger = self.getLogger()
1011
1012 self._db = None # type: Optional[sqlite3.Connection]
1013
1014 self._version = self._ceph_get_version()
1015
1016 self._perf_schema_cache = None
1017
1018 # Keep a librados instance for those that need it.
1019 self._rados: Optional[rados.Rados] = None
1020
1021 # this does not change over the lifetime of an active mgr
1022 self._mgr_ips: Optional[str] = None
1023
1024 self._db_lock = threading.Lock()
1025
1026 def __del__(self) -> None:
1027 self._unconfigure_logging()
1028
1029 @classmethod
1030 def _register_options(cls, module_name: str) -> None:
1031 cls.MODULE_OPTIONS.append(
1032 Option(name='log_level', type='str', default="", runtime=True,
1033 enum_allowed=['info', 'debug', 'critical', 'error',
1034 'warning', '']))
1035 cls.MODULE_OPTIONS.append(
1036 Option(name='log_to_file', type='bool', default=False, runtime=True))
1037 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
1038 cls.MODULE_OPTIONS.append(
1039 Option(name='log_to_cluster', type='bool', default=False,
1040 runtime=True))
1041 cls.MODULE_OPTIONS.append(
1042 Option(name='log_to_cluster_level', type='str', default='info',
1043 runtime=True,
1044 enum_allowed=['info', 'debug', 'critical', 'error',
1045 'warning', '']))
1046
1047 @classmethod
1048 def _register_commands(cls, module_name: str) -> None:
1049 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
1050
1051 @property
1052 def log(self) -> logging.Logger:
1053 return self._logger
1054
1055 def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
1056 """
1057 :param channel: The log channel. This can be 'cluster', 'audit', ...
1058 :param priority: The log message priority.
1059 :param message: The message to log.
1060 """
1061 self._ceph_cluster_log(channel, priority.value, message)
1062
1063 @property
1064 def version(self) -> str:
1065 return self._version
1066
1067 @API.expose
1068 def pool_exists(self, name: str) -> bool:
1069 pools = [p['pool_name'] for p in self.get('osd_map')['pools']]
1070 return name in pools
1071
1072 @API.expose
1073 def have_enough_osds(self) -> bool:
1074 # wait until we have enough OSDs to allow the pool to be healthy
1075 ready = 0
1076 for osd in self.get("osd_map")["osds"]:
1077 if osd["up"] and osd["in"]:
1078 ready += 1
1079
1080 need = cast(int, self.get_ceph_option("osd_pool_default_size"))
1081 return ready >= need
1082
1083 @API.perm('w')
1084 @API.expose
1085 def rename_pool(self, srcpool: str, destpool: str) -> None:
1086 c = {
1087 'prefix': 'osd pool rename',
1088 'format': 'json',
1089 'srcpool': srcpool,
1090 'destpool': destpool,
1091 'yes_i_really_mean_it': True
1092 }
1093 self.check_mon_command(c)
1094
1095 @API.perm('w')
1096 @API.expose
1097 def create_pool(self, pool: str) -> None:
1098 c = {
1099 'prefix': 'osd pool create',
1100 'format': 'json',
1101 'pool': pool,
1102 'pg_num': 1,
1103 'pg_num_min': 1,
1104 'pg_num_max': 32,
1105 'yes_i_really_mean_it': True
1106 }
1107 self.check_mon_command(c)
1108
1109 @API.perm('w')
1110 @API.expose
1111 def appify_pool(self, pool: str, app: str) -> None:
1112 c = {
1113 'prefix': 'osd pool application enable',
1114 'format': 'json',
1115 'pool': pool,
1116 'app': app,
1117 'yes_i_really_mean_it': True
1118 }
1119 self.check_mon_command(c)
1120
1121 @API.perm('w')
1122 @API.expose
1123 def create_mgr_pool(self) -> None:
1124 self.log.info("creating mgr pool")
1125
1126 ov = self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
1127 devhealth = cast(str, ov)
1128 if devhealth is not None and self.pool_exists(devhealth):
1129 self.log.debug("reusing devicehealth pool")
1130 self.rename_pool(devhealth, self.MGR_POOL_NAME)
1131 self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1132 else:
1133 self.log.debug("creating new mgr pool")
1134 self.create_pool(self.MGR_POOL_NAME)
1135 self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1136
1137 def create_skeleton_schema(self, db: sqlite3.Connection) -> None:
1138 SQL = """
1139 CREATE TABLE IF NOT EXISTS MgrModuleKV (
1140 key TEXT PRIMARY KEY,
1141 value NOT NULL
1142 ) WITHOUT ROWID;
1143 INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
1144 """
1145
1146 db.executescript(SQL)
1147
1148 def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
1149 SQL = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
1150
1151 db.execute(SQL, (version,))
1152
1153 def set_kv(self, key: str, value: Any) -> None:
1154 SQL = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
1155
1156 assert key[:2] != "__"
1157
1158 self.log.debug(f"set_kv('{key}', '{value}')")
1159
1160 with self._db_lock, self.db:
1161 self.db.execute(SQL, (key, value))
1162
1163 @API.expose
1164 def get_kv(self, key: str) -> Any:
1165 SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;"
1166
1167 assert key[:2] != "__"
1168
1169 self.log.debug(f"get_kv('{key}')")
1170
1171 with self._db_lock, self.db:
1172 cur = self.db.execute(SQL, (key,))
1173 row = cur.fetchone()
1174 if row is None:
1175 return None
1176 else:
1177 v = row['value']
1178 self.log.debug(f" = {v}")
1179 return v
1180
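# Illustrative sketch (assumption, not upstream code): a module persisting
# state in its per-module SQLite database; the table and key are
# hypothetical. SCHEMA/SCHEMA_VERSIONED are applied by maybe_upgrade() on
# first use, and set_kv()/get_kv() use the MgrModuleKV table created above.
#
#   class MyModule(MgrModule):
#       SCHEMA = "CREATE TABLE IF NOT EXISTS Sample (ts INTEGER PRIMARY KEY, value REAL);"
#       SCHEMA_VERSIONED = [SCHEMA]
#
#       def record(self, value: float) -> None:
#           if self.db_ready():
#               self.set_kv('last_value', value)
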
1181 def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
1182 if version <= 0:
1183 self.log.info(f"creating main.db for {self.module_name}")
1184 assert self.SCHEMA is not None
1185 cur = db.executescript(self.SCHEMA)
1186 self.update_schema_version(db, 1)
1187 else:
1188 assert self.SCHEMA_VERSIONED is not None
1189 latest = len(self.SCHEMA_VERSIONED)
1190 if latest < version:
1191 raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
1192 for i in range(version, latest):
1193 self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}")
1194 SQL = self.SCHEMA_VERSIONED[i]
1195 db.executescript(SQL)
1196 if version < latest:
1197 self.update_schema_version(db, latest)
1198
1199 def load_schema(self, db: sqlite3.Connection) -> None:
1200 SQL = """
1201 SELECT value FROM MgrModuleKV WHERE key = '__version';
1202 """
1203
1204 with db:
1205 self.create_skeleton_schema(db)
1206 cur = db.execute(SQL)
1207 row = cur.fetchone()
1208 self.maybe_upgrade(db, int(row['value']))
1209 assert cur.fetchone() is None
1210 cur.close()
1211
1212 def configure_db(self, db: sqlite3.Connection) -> None:
1213 db.execute('PRAGMA FOREIGN_KEYS = 1')
1214 db.execute('PRAGMA JOURNAL_MODE = PERSIST')
1215 db.execute('PRAGMA PAGE_SIZE = 65536')
1216 db.execute('PRAGMA CACHE_SIZE = 64')
1217 db.execute('PRAGMA TEMP_STORE = memory')
1218 db.row_factory = sqlite3.Row
1219 self.load_schema(db)
1220
1221 def open_db(self) -> Optional[sqlite3.Connection]:
1222 if not self.pool_exists(self.MGR_POOL_NAME):
1223 if not self.have_enough_osds():
1224 return None
1225 self.create_mgr_pool()
1226 uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
1227 self.log.debug(f"using uri {uri}")
1228 db = sqlite3.connect(uri, check_same_thread=False, uri=True)
1229 self.configure_db(db)
1230 return db
1231
1232 @API.expose
1233 def db_ready(self) -> bool:
1234 with self._db_lock:
1235 try:
1236 return self.db is not None
1237 except MgrDBNotReady:
1238 return False
1239
1240 @property
1241 def db(self) -> sqlite3.Connection:
1242 assert self._db_lock.locked()
1243 if self._db is not None:
1244 return self._db
1245 db_allowed = self.get_ceph_option("mgr_pool")
1246 if not db_allowed:
1247 raise MgrDBNotReady();
1248 self._db = self.open_db()
1249 if self._db is None:
1250 raise MgrDBNotReady();
1251 return self._db
1252
1253 @property
1254 def release_name(self) -> str:
1255 """
1256 Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
1257 :return: Returns the release name of the Ceph version in lower case.
1258 :rtype: str
1259 """
1260 return self._ceph_get_release_name()
1261
1262 @API.expose
1263 def lookup_release_name(self, major: int) -> str:
1264 return self._ceph_lookup_release_name(major)
1265
1266 def get_context(self) -> object:
1267 """
1268 :return: a Python capsule containing a C++ CephContext pointer
1269 """
1270 return self._ceph_get_context()
1271
1272 def notify(self, notify_type: NotifyType, notify_id: str) -> None:
1273 """
1274 Called by the ceph-mgr service to notify the Python plugin
1275 that new state is available. This method is *only* called for
1276 notify_types that are listed in the NOTIFY_TYPES string list
1277 member of the module class.
1278
1279 :param notify_type: string indicating what kind of notification,
1280 such as osd_map, mon_map, fs_map, mon_status,
1281 health, pg_summary, command, service_map
1282 :param notify_id: string (may be empty) that optionally specifies
1283 which entity is being notified about. With
1284 "command" notifications this is set to the tag
1285 from ``send_command``.
1286 """
1287 pass
1288
1289 def _config_notify(self) -> None:
1290 # check logging options for changes
1291 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
1292 module_level = cast(str, self.get_module_option("log_level"))
1293 cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
1294 assert isinstance(cluster_level, str)
1295 log_to_file = self.get_module_option("log_to_file", False)
1296 assert isinstance(log_to_file, bool)
1297 log_to_cluster = self.get_module_option("log_to_cluster", False)
1298 assert isinstance(log_to_cluster, bool)
1299 self._set_log_level(mgr_level, module_level, cluster_level)
1300
1301 if log_to_file != self.log_to_file:
1302 if log_to_file:
1303 self._enable_file_log()
1304 else:
1305 self._disable_file_log()
1306 if log_to_cluster != self.log_to_cluster:
1307 if log_to_cluster:
1308 self._enable_cluster_log()
1309 else:
1310 self._disable_cluster_log()
1311
1312 # call module subclass implementations
1313 self.config_notify()
1314
1315 def config_notify(self) -> None:
1316 """
1317 Called by the ceph-mgr service to notify the Python plugin
1318 that the configuration may have changed. Modules will want to
1319 refresh any configuration values stored in config variables.
1320 """
1321 pass
1322
1323 def serve(self) -> None:
1324 """
1325 Called by the ceph-mgr service to start any server that
1326 is provided by this Python plugin. The implementation
1327 of this function should block until ``shutdown`` is called.
1328
1329 You *must* implement ``shutdown`` if you implement ``serve``
1330 """
1331 pass
1332
1333 def shutdown(self) -> None:
1334 """
1335 Called by the ceph-mgr service to request that this
1336 module drop out of its serve() function. You do not
1337 need to implement this if you do not implement serve()
1338
1339 :return: None
1340 """
1341 if self._rados:
1342 addrs = self._rados.get_addrs()
1343 self._rados.shutdown()
1344 self._ceph_unregister_client(addrs)
1345 self._rados = None
1346
1347 @API.expose
1348 def get(self, data_name: str) -> Any:
1349 """
1350 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
1351
1352 :param str data_name: Valid things to fetch are osdmap_crush_map_text,
1353 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
1354 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
1355 health, mon_status, devices, device <devid>, pg_stats,
1356 pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
1357 modified_config_options, service_map, mds_metadata,
1358 have_local_config_map, osd_pool_stats, pg_status.
1359
1360 Note:
1361 All these structures have their own JSON representations: experiment
1362 or look at the C++ ``dump()`` methods to learn about them.
1363 """
1364 obj = self._ceph_get(data_name)
1365 if isinstance(obj, bytes):
1366 obj = json.loads(obj)
1367
1368 return obj
1369
1370 def _stattype_to_str(self, stattype: int) -> str:
1371
1372 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
1373 if typeonly == 0:
1374 return 'gauge'
1375 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
1376 # this lie matches the DaemonState decoding: only val, no counts
1377 return 'counter'
1378 if typeonly == self.PERFCOUNTER_COUNTER:
1379 return 'counter'
1380 if typeonly == self.PERFCOUNTER_HISTOGRAM:
1381 return 'histogram'
1382
1383 return ''
1384
1385 def _perfpath_to_path_labels(self, daemon: str,
1386 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
1387 if daemon.startswith('rgw.'):
1388 label_name = 'instance_id'
1389 daemon = daemon[len('rgw.'):]
1390 else:
1391 label_name = 'ceph_daemon'
1392
1393 label_names = (label_name,) # type: Tuple[str, ...]
1394 labels = (daemon,) # type: Tuple[str, ...]
1395
1396 if daemon.startswith('rbd-mirror.'):
1397 match = re.match(
1398 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
1399 path
1400 )
1401 if match:
1402 path = 'rbd_mirror_image_' + match.group(4)
1403 pool = match.group(1)
1404 namespace = match.group(2) or ''
1405 image = match.group(3)
1406 label_names += ('pool', 'namespace', 'image')
1407 labels += (pool, namespace, image)
1408
1409 return path, label_names, labels,
1410
1411 def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
1412 if stattype & self.PERFCOUNTER_TIME:
1413 # Convert from ns to seconds
1414 return value / 1000000000.0
1415 else:
1416 return value
1417
1418 def _unit_to_str(self, unit: int) -> str:
1419 if unit == self.NONE:
1420 return "/s"
1421 elif unit == self.BYTES:
1422 return "B/s"
1423 else:
1424 raise ValueError(f'bad unit "{unit}"')
1425
1426 @staticmethod
1427 def to_pretty_iec(n: int) -> str:
1428 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
1429 (20, 'Mi'), (10, 'Ki')]:
1430 if n > 10 << bits:
1431 return str(n >> bits) + ' ' + suffix
1432 return str(n) + ' '
1433
1434 @staticmethod
1435 def get_pretty_row(elems: Sequence[str], width: int) -> str:
1436 """
1437 Takes an array of elements and returns a string with those elements
1438 formatted as a table row. Useful for polling modules.
1439
1440 :param elems: the elements to be printed
1441 :param width: the width of the terminal
1442 """
1443 n = len(elems)
1444 column_width = int(width / n)
1445
1446 ret = '|'
1447 for elem in elems:
1448 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
1449
1450 return ret
1451
1452 def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
1453 """
1454 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
1455
1456 :param elems: the elements to be printed
1457 :param width: the width of the terminal
1458 """
1459 n = len(elems)
1460 column_width = int(width / n)
1461
1462 # dash line
1463 ret = '+'
1464 for i in range(0, n):
1465 ret += '-' * (column_width - 1) + '+'
1466 ret += '\n'
1467
1468 # title
1469 ret += self.get_pretty_row(elems, width)
1470 ret += '\n'
1471
1472 # dash line
1473 ret += '+'
1474 for i in range(0, n):
1475 ret += '-' * (column_width - 1) + '+'
1476 ret += '\n'
1477
1478 return ret
1479
1480 @API.expose
1481 def get_server(self, hostname: str) -> ServerInfoT:
1482 """
1483 Called by the plugin to fetch metadata about a particular hostname from
1484 ceph-mgr.
1485
1486 This is information that ceph-mgr has gleaned from the daemon metadata
1487 reported by daemons running on a particular server.
1488
1489 :param hostname: a hostname
1490 """
1491 return cast(ServerInfoT, self._ceph_get_server(hostname))
1492
1493 @API.expose
1494 def get_perf_schema(self,
1495 svc_type: str,
1496 svc_name: str) -> Dict[str,
1497 Dict[str, Dict[str, Union[str, int]]]]:
1498 """
1499 Called by the plugin to fetch perf counter schema info.
1500 svc_name can be nullptr, as can svc_type, in which case
1501 they are wildcards
1502
1503 :param str svc_type:
1504 :param str svc_name:
1505 :return: list of dicts describing the counters requested
1506 """
1507 return self._ceph_get_perf_schema(svc_type, svc_name)
1508
1509 def get_rocksdb_version(self) -> str:
1510 """
1511 Called by the plugin to fetch the latest RocksDB version number.
1512
1513 :return: str representing the major, minor, and patch RocksDB version numbers
1514 """
1515 return self._ceph_get_rocksdb_version()
1516
1517 @API.expose
1518 def get_counter(self,
1519 svc_type: str,
1520 svc_name: str,
1521 path: str) -> Dict[str, List[Tuple[float, int]]]:
1522 """
1523 Called by the plugin to fetch the latest performance counter data for a
1524 particular counter on a particular service.
1525
1526 :param str svc_type:
1527 :param str svc_name:
1528 :param str path: a period-separated concatenation of the subsystem and the
1529 counter name, for example "mds.inodes".
1530 :return: A dict of counter names to their values. Each value is a list
1531 of two-tuples of (timestamp, value). This may be empty if no data is
1532 available.
1533 """
1534 return self._ceph_get_counter(svc_type, svc_name, path)
1535
1536 @API.expose
1537 def get_latest_counter(self,
1538 svc_type: str,
1539 svc_name: str,
1540 path: str) -> Dict[str, Union[Tuple[float, int],
1541 Tuple[float, int, int]]]:
1542 """
1543 Called by the plugin to fetch only the newest performance counter data
1544 point for a particular counter on a particular service.
1545
1546 :param str svc_type:
1547 :param str svc_name:
1548 :param str path: a period-separated concatenation of the subsystem and the
1549 counter name, for example "mds.inodes".
1550 :return: A dict of counter names to a two-tuple of (timestamp, value)
1551 or a three-tuple of (timestamp, value, count). This may be empty if no
1552 data is available.
1553 """
1554 return self._ceph_get_latest_counter(svc_type, svc_name, path)
1555
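# Illustrative sketch (assumption, not upstream code): reading the newest
# sample of an OSD perf counter; the daemon id and counter path are
# hypothetical.
#
#   data = self.get_latest_counter('osd', '0', 'osd.op_w')
#   for counter_path, sample in data.items():
#       ts, value = sample[:2]   # (timestamp, value) or (timestamp, value, count)
#       self.log.debug('%s = %s at %s', counter_path, value, ts)
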
1556 @API.expose
1557 def list_servers(self) -> List[ServerInfoT]:
1558 """
1559 Like ``get_server``, but gives information about all servers (i.e. all
1560 unique hostnames that have been mentioned in daemon metadata)
1561
1562 :return: a list of information about all servers
1563 :rtype: list
1564 """
1565 return cast(List[ServerInfoT], self._ceph_get_server(None))
1566
1567 def get_metadata(self,
1568 svc_type: str,
1569 svc_id: str,
1570 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
1571 """
1572 Fetch the daemon metadata for a particular service.
1573
1574 ceph-mgr fetches metadata asynchronously, so there are windows of time during
1575 addition/removal of services where the metadata is not available to
1576 modules. ``None`` is returned if no metadata is available.
1577
1578 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1579 :param str svc_id: service id. convert OSD integer IDs to strings when
1580 calling this
1581 :rtype: dict, or None if no metadata found
1582 """
1583 metadata = self._ceph_get_metadata(svc_type, svc_id)
1584 if not metadata:
1585 return default
1586 return metadata
1587
1588 @API.expose
1589 def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
1590 """
1591 Fetch the latest status for a particular service daemon.
1592
1593 This method may return ``None`` if no status information is
1594 available, for example because the daemon hasn't fully started yet.
1595
1596 :param svc_type: string (e.g., 'rgw')
1597 :param svc_id: string
1598 :return: dict, or None if the service is not found
1599 """
1600 return self._ceph_get_daemon_status(svc_type, svc_id)
1601
1602 def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
1603 """
1604 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1605 if ``retval != 0``.
1606 """
1607
1608 r = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
1609 if r.retval:
1610 raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1611 return r
1612
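# Illustrative sketch (assumption, not upstream code): issuing a synchronous
# mon command and letting failures raise MonCommandFailed; the command shown
# is hypothetical.
#
#   ret = self.check_mon_command({
#       'prefix': 'config set',
#       'who': 'mon',
#       'name': 'mon_allow_pool_delete',
#       'value': 'true',
#   })
#   self.log.debug('config set -> %d', ret.retval)
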
1613 def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1614 """
1615 Helper for modules that do simple, synchronous mon command
1616 execution.
1617
1618 See send_command for general case.
1619
1620 :return: status int, out str, err str
1621 """
1622
1623 t1 = time.time()
1624 result = CommandResult()
1625 self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
1626 r = result.wait()
1627 t2 = time.time()
1628
1629 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1630 cmd_dict['prefix'], r[0], t2 - t1
1631 ))
1632
1633 return r
1634
1635 def osd_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1636 """
1637 Helper for osd command execution.
1638
1639 See send_command for general case. Also, see osd/OSD.cc for available commands.
1640
1641 :param dict cmd_dict: expects a prefix and an osd id, i.e.:
1642 cmd_dict = {
1643 'prefix': 'perf histogram dump',
1644 'id': '0'
1645 }
1646 :return: status int, out str, err str
1647 """
1648 t1 = time.time()
1649 result = CommandResult()
1650 self.send_command(result, "osd", cmd_dict['id'], json.dumps(cmd_dict), "", inbuf)
1651 r = result.wait()
1652 t2 = time.time()
1653
1654 self.log.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
1655 cmd_dict['prefix'], r[0], t2 - t1
1656 ))
1657
1658 return r
1659
1660 def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1661 """
1662 Helper for `ceph tell` command execution.
1663
1664 See send_command for general case.
1665
1666 :param dict cmd_dict: expects a prefix i.e.:
1667 cmd_dict = {
1668 'prefix': 'heap',
1669 'heapcmd': 'stats',
1670 }
1671 :return: status int, out str, err str
1672 """
1673 t1 = time.time()
1674 result = CommandResult()
1675 self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
1676 r = result.wait()
1677 t2 = time.time()
1678
1679 self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
1680 daemon_type, daemon_id, cmd_dict['prefix'], r[0], t2 - t1
1681 ))
1682
1683 return r
1684
1685 def send_command(
1686 self,
1687 result: CommandResult,
1688 svc_type: str,
1689 svc_id: str,
1690 command: str,
1691 tag: str,
1692 inbuf: Optional[str] = None) -> None:
1693 """
1694 Called by the plugin to send a command to the mon
1695 cluster.
1696
1697 :param CommandResult result: an instance of the ``CommandResult``
1698 class, defined in the same module as MgrModule. This acts as a
1699 completion and stores the output of the command. Use
1700 ``CommandResult.wait()`` if you want to block on completion.
1701 :param str svc_type:
1702 :param str svc_id:
1703 :param str command: a JSON-serialized command. This uses the same
1704 format as the ceph command line, which is a dictionary of command
1705 arguments, with the extra ``prefix`` key containing the command
1706 name itself. Consult MonCommands.h for available commands and
1707 their expected arguments.
1708 :param str tag: used for nonblocking operation: when a command
1709 completes, the ``notify()`` callback on the MgrModule instance is
1710 triggered, with notify_type set to "command", and notify_id set to
1711 the tag of the command.
1712 :param str inbuf: input buffer for sending additional data.
1713 """
1714 self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
1715
1716 def tool_exec(
1717 self,
1718 args: List[str],
1719 timeout: int = 10,
1720 stdin: Optional[bytes] = None
1721 ) -> Tuple[int, str, str]:
1722 try:
1723 tool = args.pop(0)
1724 cmd = [
1725 tool,
1726 '-k', str(self.get_ceph_option('keyring')),
1727 '-n', f'mgr.{self.get_mgr_id()}',
1728 ] + args
1729 self.log.debug('exec: ' + ' '.join(cmd))
1730 p = subprocess.run(
1731 cmd,
1732 input=stdin,
1733 stdout=subprocess.PIPE,
1734 stderr=subprocess.PIPE,
1735 timeout=timeout,
1736 )
1737 except subprocess.TimeoutExpired as ex:
1738 self.log.error(ex)
1739 return -errno.ETIMEDOUT, '', str(ex)
1740 if p.returncode:
1741 self.log.error(f'Non-zero return from {cmd}: {p.stderr.decode()}')
1742 return p.returncode, p.stdout.decode(), p.stderr.decode()
1743
1744 def set_health_checks(self, checks: HealthChecksT) -> None:
1745 """
1746 Set the module's current map of health checks. Argument is a
1747 dict of check names to info, in this form:
1748
1749 ::
1750
1751 {
1752 'CHECK_FOO': {
1753 'severity': 'warning', # or 'error'
1754 'summary': 'summary string',
1755 'count': 4, # quantify badness
1756 'detail': [ 'list', 'of', 'detail', 'strings' ],
1757 },
1758 'CHECK_BAR': {
1759 'severity': 'error',
1760 'summary': 'bars are bad',
1761 'detail': [ 'too hard' ],
1762 },
1763 }
1764
1765 :param checks: dict of health check dicts
1766 """
1767 self._ceph_set_health_checks(checks)
1768
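# Illustrative sketch (assumption, not upstream code): raising and later
# clearing a module health check; the check name and texts are hypothetical.
# Omitting a previously reported key clears it.
#
#   self.set_health_checks({
#       'MGR_EXAMPLE_STALE': {
#           'severity': 'warning',
#           'summary': 'example data is stale',
#           'count': 1,
#           'detail': ['no samples received in the last 5 minutes'],
#       },
#   })
#   ...
#   self.set_health_checks({})   # clear all checks owned by this module
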
1769 def _handle_command(self,
1770 inbuf: str,
1771 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1772 Tuple[int, str, str]]:
1773 if cmd['prefix'] not in CLICommand.COMMANDS:
1774 return self.handle_command(inbuf, cmd)
1775
1776 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
1777
1778 def handle_command(self,
1779 inbuf: str,
1780 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1781 Tuple[int, str, str]]:
1782 """
1783 Called by ceph-mgr to request the plugin to handle one
1784 of the commands that it declared in self.COMMANDS
1785
1786 Return a status code, an output buffer, and an
1787 output string. The output buffer is for data results,
1788 the output string is for informative text.
1789
1790 :param inbuf: content of any "-i <file>" supplied to ceph cli
1791 :type inbuf: str
1792 :param cmd: from Ceph's cmdmap_t
1793 :type cmd: dict
1794
1795 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1796 """
1797
1798 # Should never get called if they didn't declare
1799 # any ``COMMANDS``
1800 raise NotImplementedError()
1801
1802 def get_mgr_id(self) -> str:
1803 """
1804 Retrieve the name of the manager daemon where this plugin
1805 is currently being executed (i.e. the active manager).
1806
1807 :return: str
1808 """
1809 return self._ceph_get_mgr_id()
1810
1811 @API.expose
1812 def get_ceph_conf_path(self) -> str:
1813 return self._ceph_get_ceph_conf_path()
1814
1815 @API.expose
1816 def get_mgr_ip(self) -> str:
1817 if not self._mgr_ips:
1818 ips = self.get("mgr_ips").get('ips', [])
1819 if not ips:
1820 return socket.gethostname()
1821 self._mgr_ips = ips[0]
1822 assert self._mgr_ips is not None
1823 return self._mgr_ips
1824
1825 @API.expose
1826 def get_hostname(self) -> str:
1827 return socket.gethostname()
1828
1829 @API.expose
1830 def get_ceph_option(self, key: str) -> OptionValue:
1831 return self._ceph_get_option(key)
1832
1833 @API.expose
1834 def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
1835 return self._ceph_get_foreign_option(entity, key)
1836
1837 def _validate_module_option(self, key: str) -> None:
1838 """
1839 Helper: don't allow get/set config callers to
1840 access config options that they didn't declare
1841 in their schema.
1842 """
1843 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1844 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1845 format(key, self.__class__.__name__))
1846
1847 def _get_module_option(self,
1848 key: str,
1849 default: OptionValue,
1850 localized_prefix: str = "") -> OptionValue:
1851 r = self._ceph_get_module_option(self.module_name, key,
1852 localized_prefix)
1853 if r is None:
1854 return self.MODULE_OPTION_DEFAULTS.get(key, default)
1855 else:
1856 return r
1857
1858 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
1859 """
1860 Retrieve the value of a persistent configuration setting
1861 """
1862 self._validate_module_option(key)
1863 return self._get_module_option(key, default)
1864
1865 def get_module_option_ex(self, module: str,
1866 key: str,
1867 default: OptionValue = None) -> OptionValue:
1868 """
1869 Retrieve the value of a persistent configuration setting
1870 for the specified module.
1871
1872 :param module: The name of the module, e.g. 'dashboard'
1873 or 'telemetry'.
1874 :param key: The configuration key, e.g. 'server_addr'.
1875 :param default: The default value to use when the
1876 returned value is ``None``. Defaults to ``None``.
1877 """
1878 if module == self.module_name:
1879 self._validate_module_option(key)
1880 r = self._ceph_get_module_option(module, key)
1881 return default if r is None else r
1882
1883 @API.expose
1884 def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
1885 """
1886 Retrieve a dict of KV store keys to values, where the keys
1887 have the given prefix
1888
1889 :param str key_prefix:
1890         :return: dict of keys to values
1891 """
1892 return self._ceph_get_store_prefix(key_prefix)
1893
1894 def _set_localized(self,
1895 key: str,
1896 val: Optional[str],
1897 setter: Callable[[str, Optional[str]], None]) -> None:
1898 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1899
1900 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
1901 """
1902 Retrieve localized configuration for this ceph-mgr instance
1903 """
1904 self._validate_module_option(key)
1905 return self._get_module_option(key, default, self.get_mgr_id())
1906
1907 def _set_module_option(self, key: str, val: Any) -> None:
1908 return self._ceph_set_module_option(self.module_name, key,
1909 None if val is None else str(val))
1910
1911 def set_module_option(self, key: str, val: Any) -> None:
1912 """
1913 Set the value of a persistent configuration setting
1914
1915 :param str key:
1916 :type val: str | None
1917 :raises ValueError: if `val` cannot be parsed or it is out of the specified range
1918 """
1919 self._validate_module_option(key)
1920 return self._set_module_option(key, val)
1921
1922 def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
1923 """
1924 Set the value of a persistent configuration setting
1925 for the specified module.
1926
1927 :param str module:
1928 :param str key:
1929 :param str val:
1930 """
1931 if module == self.module_name:
1932 self._validate_module_option(key)
1933 return self._ceph_set_module_option(module, key, str(val))
1934
1935 @API.perm('w')
1936 @API.expose
1937 def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
1938 """
1939 Set localized configuration for this ceph-mgr instance
1940 :param str key:
1941 :param str val:
1942         :return: None
1943 """
1944 self._validate_module_option(key)
1945 return self._set_localized(key, val, self._set_module_option)
1946
1947 @API.perm('w')
1948 @API.expose
1949 def set_store(self, key: str, val: Optional[str]) -> None:
1950 """
1951 Set a value in this module's persistent key value store.
1952 If val is None, remove key from store
1953 """
1954 self._ceph_set_store(key, val)
1955
1956 @API.expose
1957 def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
1958 """
1959 Get a value from this module's persistent key value store
1960 """
1961 r = self._ceph_get_store(key)
1962 if r is None:
1963 return default
1964 else:
1965 return r
1966
1967 @API.expose
1968 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
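        """
        Get a value from this module's persistent key value store, preferring
        a key localized to this mgr instance and falling back first to the
        non-localized key and then to ``default``.
        """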
1969 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
1970 if r is None:
1971 r = self._ceph_get_store(key)
1972 if r is None:
1973 r = default
1974 return r
1975
1976 @API.perm('w')
1977 @API.expose
1978 def set_localized_store(self, key: str, val: Optional[str]) -> None:
1979 return self._set_localized(key, val, self.set_store)
1980
1981 def self_test(self) -> Optional[str]:
1982 """
1983         Run a self-test on the module. Override this function and implement
1984         the best self-test possible for (automated) testing of the module.
1985
1986         Indicate any failures by raising an exception. This does not have
1987         to be pretty; it's mainly for picking up regressions during
1988         development, rather than for use in the field.
1989
1990 :return: None, or an advisory string for developer interest, such
1991 as a json dump of some state.
1992 """
1993 pass
1994
1995 def get_osdmap(self) -> OSDMap:
1996 """
1997         Get a handle to the latest OSDMap seen by this ceph-mgr instance.
1998
1999         :return: OSDMap
2000 """
2001 return cast(OSDMap, self._ceph_get_osdmap())
2002
2003 @API.expose
2004 def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
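        """
        Return the latest value of the given perf counter for a daemon,
        or 0 if no sample is available.
        """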
2005 data = self.get_latest_counter(
2006 daemon_type, daemon_name, counter)[counter]
2007 if data:
2008 return data[1]
2009 else:
2010 return 0
2011
2012 @API.expose
2013 def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
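        """
        Return the latest ``(value, count)`` pair of a long-running average
        perf counter for a daemon, or ``(0, 0)`` if no sample is available.
        """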
2014 data = self.get_latest_counter(
2015 daemon_type, daemon_name, counter)[counter]
2016 if data:
2017 # https://github.com/python/mypy/issues/1178
2018 _, value, count = cast(Tuple[float, int, int], data)
2019 return value, count
2020 else:
2021 return 0, 0
2022
2023 @API.expose
2024 @profile_method()
2025 def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
2026 services: Sequence[str] = ("mds", "mon", "osd",
2027 "rbd-mirror", "rgw",
2028 "tcmu-runner")) -> Dict[str, dict]:
2029 """
2030 Return the perf counters currently known to this ceph-mgr
2031 instance, filtered by priority equal to or greater than `prio_limit`.
2032
2033 The result is a map of string to dict, associating services
2034 (like "osd.123") with their counters. The counter
2035 dict for each service maps counter paths to a counter
2036 info structure, which is the information from
2037 the schema, plus an additional "value" member with the latest
2038 value.
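
        For illustration, the result has roughly this shape (service name,
        counter path and value are made up)::

            {
                'osd.123': {
                    'osd.op_r': {
                        # ... fields from the perf counter schema ...
                        'value': 42,
                    },
                },
            }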
2039 """
2040
2041 result = defaultdict(dict) # type: Dict[str, dict]
2042
2043 for server in self.list_servers():
2044 for service in cast(List[ServiceInfoT], server['services']):
2045 if service['type'] not in services:
2046 continue
2047
2048 schemas = self.get_perf_schema(service['type'], service['id'])
2049 if not schemas:
2050 self.log.warning("No perf counter schema for {0}.{1}".format(
2051 service['type'], service['id']
2052 ))
2053 continue
2054
2055 # Value is returned in a potentially-multi-service format,
2056 # get just the service we're asking about
2057 svc_full_name = "{0}.{1}".format(
2058 service['type'], service['id'])
2059 schema = schemas[svc_full_name]
2060
2061 # Populate latest values
2062 for counter_path, counter_schema in schema.items():
2063 # self.log.debug("{0}: {1}".format(
2064 # counter_path, json.dumps(counter_schema)
2065 # ))
2066 priority = counter_schema['priority']
2067 assert isinstance(priority, int)
2068 if priority < prio_limit:
2069 continue
2070
2071 tp = counter_schema['type']
2072 assert isinstance(tp, int)
2073 counter_info = dict(counter_schema)
2074 # Also populate count for the long running avgs
2075 if tp & self.PERFCOUNTER_LONGRUNAVG:
2076 v, c = self.get_latest_avg(
2077 service['type'],
2078 service['id'],
2079 counter_path
2080 )
2081 counter_info['value'], counter_info['count'] = v, c
2082 result[svc_full_name][counter_path] = counter_info
2083 else:
2084 counter_info['value'] = self.get_latest(
2085 service['type'],
2086 service['id'],
2087 counter_path
2088 )
2089
2090 result[svc_full_name][counter_path] = counter_info
2091
2092         self.log.debug("returning counters for {0} services".format(len(result)))
2093
2094 return result
2095
2096 @API.expose
2097 def set_uri(self, uri: str) -> None:
2098 """
2099 If the module exposes a service, then call this to publish the
2100 address once it is available.
2101
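        For example (the URL is purely illustrative)::

            self.set_uri('http://192.0.2.10:8080/')
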
2102         :return: None
2103 """
2104 return self._ceph_set_uri(uri)
2105
2106 @API.perm('w')
2107 @API.expose
2108 def set_device_wear_level(self, devid: str, wear_level: float) -> None:
2109 return self._ceph_set_device_wear_level(devid, wear_level)
2110
2111 @API.expose
2112 def have_mon_connection(self) -> bool:
2113 """
2114 Check whether this ceph-mgr daemon has an open connection
2115 to a monitor. If it doesn't, then it's likely that the
2116 information we have about the cluster is out of date,
2117 and/or the monitor cluster is down.
2118 """
2119
2120 return self._ceph_have_mon_connection()
2121
2122 def update_progress_event(self,
2123 evid: str,
2124 desc: str,
2125 progress: float,
2126 add_to_ceph_s: bool) -> None:
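        """
        Create or update a progress event.

        :param evid: unique identifier of the event
        :param desc: human-readable description of the event
        :param progress: completion fraction, assumed to be in [0.0, 1.0]
        :param add_to_ceph_s: whether to include the event in ``ceph -s``
            output (as the name suggests)
        """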
2127 return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
2128
2129 @API.perm('w')
2130 @API.expose
2131 def complete_progress_event(self, evid: str) -> None:
2132 return self._ceph_complete_progress_event(evid)
2133
2134 @API.perm('w')
2135 @API.expose
2136 def clear_all_progress_events(self) -> None:
2137 return self._ceph_clear_all_progress_events()
2138
2139 @property
2140 def rados(self) -> rados.Rados:
2141 """
2142 A librados instance to be shared by any classes within
2143 this mgr module that want one.
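
        A typical use, assuming a pool named ``mypool`` exists::

            with self.rados.open_ioctx('mypool') as ioctx:
                ioctx.write_full('greeting', b'hello')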
2144 """
2145 if self._rados:
2146 return self._rados
2147
2148 ctx_capsule = self.get_context()
2149 self._rados = rados.Rados(context=ctx_capsule)
2150 self._rados.connect()
2151 self._ceph_register_client(self._rados.get_addrs())
2152 return self._rados
2153
2154 @staticmethod
2155 def can_run() -> Tuple[bool, str]:
2156 """
2157 Implement this function to report whether the module's dependencies
2158 are met. For example, if the module needs to import a particular
2159 dependency to work, then use a try/except around the import at
2160 file scope, and then report here if the import failed.
2161
2162 This will be called in a blocking way from the C++ code, so do not
2163 do any I/O that could block in this function.
2164
2165         :return: a 2-tuple consisting of a boolean and an explanatory string
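
        A minimal sketch, assuming the module guards an import of a
        hypothetical ``some_dependency`` package at file scope::

            try:
                import some_dependency
            except ImportError:
                some_dependency = None

            class Module(MgrModule):
                @staticmethod
                def can_run():
                    if some_dependency is None:
                        return False, "some_dependency is not installed"
                    return True, ""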
2166 """
2167
2168 return True, ""
2169
2170 @API.expose
2171 def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
2172 """
2173 Invoke a method on another module. All arguments, and the return
2174 value from the other module must be serializable.
2175
2176 Limitation: Do not import any modules within the called method.
2177 Otherwise you will get an error in Python 2::
2178
2179 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
2180
2183 :param module_name: Name of other module. If module isn't loaded,
2184 an ImportError exception is raised.
2185 :param method_name: Method name. If it does not exist, a NameError
2186 exception is raised.
2187 :param args: Argument tuple
2188 :param kwargs: Keyword argument dict
2189 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
2190 :raises ImportError: No such module
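
        A hypothetical call (module and method names are illustrative)::

            status = self.remote('othermodule', 'get_status', verbose=True)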
2191 """
2192 return self._ceph_dispatch_remote(module_name, method_name,
2193 args, kwargs)
2194
2195 def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
2196 """
2197 Register an OSD perf query. Argument is a
2198 dict of the query parameters, in this form:
2199
2200 ::
2201
2202 {
2203 'key_descriptor': [
2204 {'type': subkey_type, 'regex': regex_pattern},
2205 ...
2206 ],
2207 'performance_counter_descriptors': [
2208 list, of, descriptor, types
2209 ],
2210 'limit': {'order_by': performance_counter_type, 'max_count': n},
2211 }
2212
2213 Valid subkey types:
2214 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
2215 'pg_id', 'object_name', 'snap_id'
2216 Valid performance counter types:
2217 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
2218 'latency', 'write_latency', 'read_latency'
2219
2220 :param object query: query
2221 :rtype: int (query id)
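
        For example, a query tracking the ten busiest clients per pool
        (all values chosen purely for illustration)::

            query_id = self.add_osd_perf_query({
                'key_descriptor': [
                    {'type': 'pool_id', 'regex': '.*'},
                    {'type': 'client_id', 'regex': '.*'},
                ],
                'performance_counter_descriptors': ['ops', 'bytes'],
                'limit': {'order_by': 'ops', 'max_count': 10},
            })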
2222 """
2223 return self._ceph_add_osd_perf_query(query)
2224
2225 @API.perm('w')
2226 @API.expose
2227 def remove_osd_perf_query(self, query_id: int) -> None:
2228 """
2229 Unregister an OSD perf query.
2230
2231 :param int query_id: query ID
2232 """
2233 return self._ceph_remove_osd_perf_query(query_id)
2234
2235 @API.expose
2236 def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
2237 """
2238 Get stats collected for an OSD perf query.
2239
2240 :param int query_id: query ID
2241 """
2242 return self._ceph_get_osd_perf_counters(query_id)
2243
2244 def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
2245 """
2246 Register an MDS perf query. Argument is a
2247 dict of the query parameters, in this form:
2248
2249 ::
2250
2251 {
2252 'key_descriptor': [
2253 {'type': subkey_type, 'regex': regex_pattern},
2254 ...
2255 ],
2256 'performance_counter_descriptors': [
2257 list, of, descriptor, types
2258 ],
2259 }
2260
2261 NOTE: 'limit' and 'order_by' are not supported (yet).
2262
2263 Valid subkey types:
2264 'mds_rank', 'client_id'
2265 Valid performance counter types:
2266 'cap_hit_metric'
2267
2268 :param object query: query
2269 :rtype: int (query id)
2270 """
2271 return self._ceph_add_mds_perf_query(query)
2272
2273 @API.perm('w')
2274 @API.expose
2275 def remove_mds_perf_query(self, query_id: int) -> None:
2276 """
2277 Unregister an MDS perf query.
2278
2279 :param int query_id: query ID
2280 """
2281 return self._ceph_remove_mds_perf_query(query_id)
2282
2283     @API.expose
2284     def reregister_mds_perf_queries(self) -> None:
2286 """
2287 Re-register MDS perf queries.
2288 """
2289 return self._ceph_reregister_mds_perf_queries()
2290
2291 def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
2292 """
2293 Get stats collected for an MDS perf query.
2294
2295 :param int query_id: query ID
2296 """
2297 return self._ceph_get_mds_perf_counters(query_id)
2298
2299 def get_daemon_health_metrics(self) -> Dict[str, List[Dict[str, Any]]]:
2300 """
2301 Get the list of health metrics per daemon. This includes SLOW_OPS health metrics
2302 in MON and OSD daemons, and PENDING_CREATING_PGS health metrics for OSDs.
2303 """
2304 return self._ceph_get_daemon_health_metrics()
2305
2306 def is_authorized(self, arguments: Dict[str, str]) -> bool:
2307 """
2308         Verify that the current session's caps permit executing the py service
2309         or the current module with the provided arguments. This provides a
2310         generic way for modules to restrict access with more fine-grained
2311         controls (e.g. pools).
2312
2313 :param arguments: dict of key/value arguments to test
2314 """
2315 return self._ceph_is_authorized(arguments)
2316
2317 @API.expose
2318 def send_rgwadmin_command(self, args: List[str],
2319 stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]:
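        """
        Run ``radosgw-admin`` with this cluster's conf file, keyring and mgr
        name, with a 10 second timeout. Returns a ``(returncode, stdout,
        stderr)`` tuple; when ``stdout_as_json`` is true and output was
        produced, stdout is parsed as JSON. ``subprocess.TimeoutExpired`` and
        ``subprocess.CalledProcessError`` are logged and re-raised.
        """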
2320 try:
2321 cmd = [
2322 'radosgw-admin',
2323 '-c', str(self.get_ceph_conf_path()),
2324 '-k', str(self.get_ceph_option('keyring')),
2325 '-n', f'mgr.{self.get_mgr_id()}',
2326 ] + args
2327 self.log.debug('Executing %s', str(cmd))
2328 result = subprocess.run( # pylint: disable=subprocess-run-check
2329 cmd,
2330 stdout=subprocess.PIPE,
2331 stderr=subprocess.PIPE,
2332 timeout=10,
2333 )
2334 stdout = result.stdout.decode('utf-8')
2335 stderr = result.stderr.decode('utf-8')
2336 if stdout and stdout_as_json:
2337 stdout = json.loads(stdout)
2338 if result.returncode:
2339 self.log.debug('Error %s executing %s: %s', result.returncode, str(cmd), stderr)
2340 return result.returncode, stdout, stderr
2341 except subprocess.CalledProcessError as ex:
2342 self.log.exception('Error executing radosgw-admin %s: %s', str(ex.cmd), str(ex.output))
2343 raise
2344 except subprocess.TimeoutExpired as ex:
2345 self.log.error('Timeout (10s) executing radosgw-admin %s', str(ex.cmd))
2346 raise