1 import ceph_module # noqa
2
3 from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
4 Mapping, NamedTuple, Sequence, Union, Set, TYPE_CHECKING
5 if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12 import inspect
13 import logging
14 import errno
15 import functools
16 import json
17 import subprocess
18 import threading
19 from collections import defaultdict
20 from enum import IntEnum, Enum
21 import rados
22 import re
23 import socket
24 import sqlite3
25 import sys
26 import time
27 from ceph_argparse import CephArgtype
28 from mgr_util import profile_method
29
30 if sys.version_info >= (3, 8):
31 from typing import get_args, get_origin
32 else:
33 def get_args(tp: Any) -> Any:
34 if tp is Generic:
35 return tp
36 else:
37 return getattr(tp, '__args__', ())
38
39 def get_origin(tp: Any) -> Any:
40 return getattr(tp, '__origin__', None)
41
42
43 ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
44 ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'
45 # Full list of strings in "osd_types.cc:pg_state_string()"
46 PG_STATES = [
47 "active",
48 "clean",
49 "down",
50 "recovery_unfound",
51 "backfill_unfound",
52 "scrubbing",
53 "degraded",
54 "inconsistent",
55 "peering",
56 "repair",
57 "recovering",
58 "forced_recovery",
59 "backfill_wait",
60 "incomplete",
61 "stale",
62 "remapped",
63 "deep",
64 "backfilling",
65 "forced_backfill",
66 "backfill_toofull",
67 "recovery_wait",
68 "recovery_toofull",
69 "undersized",
70 "activating",
71 "peered",
72 "snaptrim",
73 "snaptrim_wait",
74 "snaptrim_error",
75 "creating",
76 "unknown",
77 "premerge",
78 "failed_repair",
79 "laggy",
80 "wait",
81 ]
82
83 NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
84 NFS_POOL_NAME = '.nfs'
85
86
87 class NotifyType(str, Enum):
88 mon_map = 'mon_map'
89 pg_summary = 'pg_summary'
90 health = 'health'
91 clog = 'clog'
92 osd_map = 'osd_map'
93 fs_map = 'fs_map'
94 command = 'command'
95
96 # these are disabled because there are no users.
97 # see Mgr.cc:
98 # service_map = 'service_map'
99 # mon_status = 'mon_status'
100 # see DaemonServer.cc:
101 # perf_schema_update = 'perf_schema_update'
102
103
104 class CommandResult(object):
105 """
106 Use with MgrModule.send_command
107 """
108
109 def __init__(self, tag: Optional[str] = None):
110 self.ev = threading.Event()
111 self.outs = ""
112 self.outb = ""
113 self.r = 0
114
115 # This is just a convenience for notifications from
116 # C++ land, to avoid passing addresses around in messages.
117 self.tag = tag if tag else ""
118
119 def complete(self, r: int, outb: str, outs: str) -> None:
120 self.r = r
121 self.outb = outb
122 self.outs = outs
123 self.ev.set()
124
125 def wait(self) -> Tuple[int, str, str]:
126 self.ev.wait()
127 return self.r, self.outb, self.outs
128
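# Example (illustrative sketch, not part of this module): a caller typically
# creates a CommandResult, hands it to MgrModule.send_command() and blocks on
# wait() for the (retval, outb, outs) triple.  The command shown is hypothetical.
#
#   result = CommandResult()
#   self.send_command(result, 'mon', '', json.dumps({'prefix': 'df', 'format': 'json'}), '')
#   retval, outb, outs = result.wait()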
129
130 class HandleCommandResult(NamedTuple):
131 """
132 Tuple containing the result of `handle_command()`
133
134 Only write to stderr if there is an error, or in extraordinary circumstances
135
136 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
137 is critical information to include there.
138
139 Everything programmatically consumable should be put on stdout
140 """
141 retval: int = 0 # return code. E.g. 0 or -errno.EINVAL
142 stdout: str = "" # data of this result.
143 stderr: str = "" # Typically used for error messages.
144
145
146 class MonCommandFailed(RuntimeError): pass
147 class MgrDBNotReady(RuntimeError): pass
148
149
150 class OSDMap(ceph_module.BasePyOSDMap):
151 def get_epoch(self) -> int:
152 return self._get_epoch()
153
154 def get_crush_version(self) -> int:
155 return self._get_crush_version()
156
157 def dump(self) -> Dict[str, Any]:
158 return self._dump()
159
160 def get_pools(self) -> Dict[int, Dict[str, Any]]:
161 # FIXME: efficient implementation
162 d = self._dump()
163 return dict([(p['pool'], p) for p in d['pools']])
164
165 def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
166 # FIXME: efficient implementation
167 d = self._dump()
168 return dict([(p['pool_name'], p) for p in d['pools']])
169
170 def new_incremental(self) -> 'OSDMapIncremental':
171 return self._new_incremental()
172
173 def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
174 return self._apply_incremental(inc)
175
176 def get_crush(self) -> 'CRUSHMap':
177 return self._get_crush()
178
179 def get_pools_by_take(self, take: int) -> List[int]:
180 return self._get_pools_by_take(take).get('pools', [])
181
182 def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
183 max_deviation: int,
184 max_iterations: int = 10,
185 pools: Optional[List[str]] = None) -> int:
186 if pools is None:
187 pools = []
188 return self._calc_pg_upmaps(
189 inc,
190 max_deviation, max_iterations, pools)
191
192 def map_pool_pgs_up(self, poolid: int) -> List[int]:
193 return self._map_pool_pgs_up(poolid)
194
195 def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
196 return self._pg_to_up_acting_osds(pool_id, ps)
197
198 def pool_raw_used_rate(self, pool_id: int) -> float:
199 return self._pool_raw_used_rate(pool_id)
200
201 @classmethod
202 def build_simple(cls, epoch: int = 1, uuid: Optional[str] = None, num_osd: int = -1) -> 'ceph_module.BasePyOSDMap':
203 return cls._build_simple(epoch, uuid, num_osd)
204
205 def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
206 # FIXME: efficient implementation
207 d = self._dump()
208 return d['erasure_code_profiles'].get(name, None)
209
210 def get_require_osd_release(self) -> str:
211 d = self._dump()
212 return d['require_osd_release']
213
214
215 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
216 def get_epoch(self) -> int:
217 return self._get_epoch()
218
219 def dump(self) -> Dict[str, Any]:
220 return self._dump()
221
222 def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
223 """
224 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
225 """
226 return self._set_osd_reweights(weightmap)
227
228 def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
229 """
230         weightmap is a dict, str to float. devices only. e.g.,
231         { '0': 3.4, '1': 3.3, '2': 3.334 }
232 """
233 return self._set_crush_compat_weight_set_weights(weightmap)
234
235
236 class CRUSHMap(ceph_module.BasePyCRUSH):
237 ITEM_NONE = 0x7fffffff
238 DEFAULT_CHOOSE_ARGS = '-1'
239
240 def dump(self) -> Dict[str, Any]:
241 return self._dump()
242
243 def get_item_weight(self, item: int) -> Optional[int]:
244 return self._get_item_weight(item)
245
246 def get_item_name(self, item: int) -> Optional[str]:
247 return self._get_item_name(item)
248
249 def find_takes(self) -> List[int]:
250 return self._find_takes().get('takes', [])
251
252 def find_roots(self) -> List[int]:
253 return self._find_roots().get('roots', [])
254
255 def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
256 uglymap = self._get_take_weight_osd_map(root)
257 return {int(k): v for k, v in uglymap.get('weights', {}).items()}
258
259 @staticmethod
260 def have_default_choose_args(dump: Dict[str, Any]) -> bool:
261 return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
262
263 @staticmethod
264 def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
265 choose_args = dump.get('choose_args')
266 assert isinstance(choose_args, dict)
267 return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
268
269 def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
270 # TODO efficient implementation
271 for rule in self.dump()['rules']:
272 if rule_name == rule['rule_name']:
273 return rule
274
275 return None
276
277 def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
278 for rule in self.dump()['rules']:
279 if rule['rule_id'] == rule_id:
280 return rule
281
282 return None
283
284 def get_rule_root(self, rule_name: str) -> Optional[int]:
285 rule = self.get_rule(rule_name)
286 if rule is None:
287 return None
288
289 try:
290 first_take = next(s for s in rule['steps'] if s.get('op') == 'take')
291 except StopIteration:
292 logging.warning("CRUSH rule '{0}' has no 'take' step".format(
293 rule_name))
294 return None
295 else:
296 return first_take['item']
297
298 def get_osds_under(self, root_id: int) -> List[int]:
299 # TODO don't abuse dump like this
300 d = self.dump()
301 buckets = dict([(b['id'], b) for b in d['buckets']])
302
303 osd_list = []
304
305 def accumulate(b: Dict[str, Any]) -> None:
306 for item in b['items']:
307 if item['id'] >= 0:
308 osd_list.append(item['id'])
309 else:
310 try:
311 accumulate(buckets[item['id']])
312 except KeyError:
313 pass
314
315 accumulate(buckets[root_id])
316
317 return osd_list
318
319 def device_class_counts(self) -> Dict[str, int]:
320 result = defaultdict(int) # type: Dict[str, int]
321 # TODO don't abuse dump like this
322 d = self.dump()
323 for device in d['devices']:
324 cls = device.get('class', None)
325 result[cls] += 1
326
327 return dict(result)
328
329
330 HandlerFuncType = Callable[..., Tuple[int, str, str]]
331
332 def _extract_target_func(
333 f: HandlerFuncType
334 ) -> Tuple[HandlerFuncType, Dict[str, Any]]:
335 """In order to interoperate with other decorated functions,
336 we need to find the original function which will provide
337 the main set of arguments. While we descend through the
338 stack of wrapped functions, gather additional arguments
339 the decorators may want to provide.
340 """
341 # use getattr to keep mypy happy
342 wrapped = getattr(f, "__wrapped__", None)
343 if not wrapped:
344 return f, {}
345 extra_args: Dict[str, Any] = {}
346 while wrapped is not None:
347 extra_args.update(getattr(f, "extra_args", {}))
348 f = wrapped
349 wrapped = getattr(f, "__wrapped__", None)
350 return f, extra_args
351
352
353 class CLICommand(object):
354 COMMANDS = {} # type: Dict[str, CLICommand]
355
356 def __init__(self,
357 prefix: str,
358 perm: str = 'rw',
359 poll: bool = False):
360 self.prefix = prefix
361 self.perm = perm
362 self.poll = poll
363 self.func = None # type: Optional[Callable]
364 self.arg_spec = {} # type: Dict[str, Any]
365 self.first_default = -1
366
367 KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
368
369 @classmethod
370 def _load_func_metadata(cls: Any, f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
371 f, extra_args = _extract_target_func(f)
372 desc = (inspect.getdoc(f) or '').replace('\n', ' ')
373 full_argspec = inspect.getfullargspec(f)
374 arg_spec = full_argspec.annotations
375 first_default = len(arg_spec)
376 if full_argspec.defaults:
377 first_default -= len(full_argspec.defaults)
378 args = []
379 positional = True
380 for index, arg in enumerate(full_argspec.args):
381 if arg in cls.KNOWN_ARGS:
382 # record that this function takes an inbuf if it is present
383 # in the full_argspec and not already in the arg_spec
384 if arg == 'inbuf' and 'inbuf' not in arg_spec:
385 arg_spec['inbuf'] = 'str'
386 continue
387 if arg == '_end_positional_':
388 positional = False
389 continue
390 if (
391 arg == 'format'
392 or arg_spec[arg] is Optional[bool]
393 or arg_spec[arg] is bool
394 ):
395 # implicit switch to non-positional on any
396 # Optional[bool] or the --format option
397 positional = False
398 assert arg in arg_spec, \
399 f"'{arg}' is not annotated for {f}: {full_argspec}"
400 has_default = index >= first_default
401 args.append(CephArgtype.to_argdesc(arg_spec[arg],
402 dict(name=arg),
403 has_default,
404 positional))
405 for argname, argtype in extra_args.items():
406 # avoid shadowing args from the function
407 if argname in arg_spec:
408 continue
409 arg_spec[argname] = argtype
410 args.append(CephArgtype.to_argdesc(
411 argtype, dict(name=argname), has_default=True, positional=False
412 ))
413 return desc, arg_spec, first_default, ' '.join(args)
414
415 def store_func_metadata(self, f: HandlerFuncType) -> None:
416 self.desc, self.arg_spec, self.first_default, self.args = \
417 self._load_func_metadata(f)
418
419 def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
420 self.store_func_metadata(func)
421 self.func = func
422 self.COMMANDS[self.prefix] = self
423 return self.func
424
425 def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
426 def start_kwargs() -> bool:
427 if isinstance(val, str) and '=' in val:
428 k, v = val.split('=', 1)
429 if k in self.arg_spec:
430 return True
431 return False
432
433 if not kwargs_switch:
434 kwargs_switch = start_kwargs()
435
436 if kwargs_switch:
437 k, v = val.split('=', 1)
438 else:
439 k, v = key, val
440 return kwargs_switch, k.replace('-', '_'), v
441
442 def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Tuple[Dict[str, Any], Set[str]]:
443 kwargs = {}
444 special_args = set()
445 kwargs_switch = False
446 for index, (name, tp) in enumerate(self.arg_spec.items()):
447 if name in CLICommand.KNOWN_ARGS:
448 special_args.add(name)
449 continue
450 assert self.first_default >= 0
451 raw_v = cmd_dict.get(name)
452 if index >= self.first_default:
453 if raw_v is None:
454 continue
455 kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
456 name, raw_v)
457 kwargs[k] = CephArgtype.cast_to(tp, v)
458 return kwargs, special_args
459
460 def call(self,
461 mgr: Any,
462 cmd_dict: Dict[str, Any],
463 inbuf: Optional[str] = None) -> HandleCommandResult:
464 kwargs, specials = self._collect_args_by_argspec(cmd_dict)
465 if inbuf:
466 if 'inbuf' not in specials:
467 return HandleCommandResult(
468 -errno.EINVAL,
469 '',
470 'Invalid command: Input file data (-i) not supported',
471 )
472 kwargs['inbuf'] = inbuf
473 assert self.func
474 return self.func(mgr, **kwargs)
475
476 def dump_cmd(self) -> Dict[str, Union[str, bool]]:
477 return {
478 'cmd': '{} {}'.format(self.prefix, self.args),
479 'desc': self.desc,
480 'perm': self.perm,
481 'poll': self.poll,
482 }
483
484 @classmethod
485 def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
486 return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
487
488
489 def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
490 return CLICommand(prefix, "r", poll)
491
492
493 def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
494 return CLICommand(prefix, "w", poll)
495
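# Illustrative sketch (assumed, not taken from this file): a module method
# decorated with CLIReadCommand.  Argument annotations and defaults are turned
# into the command's argdesc by CLICommand._load_func_metadata(); the names
# used here ('example status', 'detail') are hypothetical.
#
#   @CLIReadCommand('example status')
#   def example_status(self, detail: bool = False) -> Tuple[int, str, str]:
#       '''Show example status'''
#       return 0, json.dumps({'detail': detail}), ''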
496
497 def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
498 def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
499 @functools.wraps(func)
500 def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
501 if 'inbuf' not in kwargs:
502 return -errno.EINVAL, '', f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
503 f'containing {desc} with "-i" option'
504 if isinstance(kwargs['inbuf'], str):
505 # Delete new line separator at EOF (it may have been added by a text editor).
506 kwargs['inbuf'] = kwargs['inbuf'].rstrip('\r\n').rstrip('\n')
507 if not kwargs['inbuf'] or not kwargs['inbuf'].strip():
508 return -errno.EINVAL, '', f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
509 'the file'
510 return func(*args, **kwargs)
511 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
512 return check
513 return CheckFileInput
514
515 # If the mgr loses its lock on the database because e.g. the pgs were
516 # transiently down, then close it and allow it to be reopened.
517 MAX_DBCLEANUP_RETRIES = 3
518 def MgrModuleRecoverDB(func: Callable) -> Callable:
519 @functools.wraps(func)
520 def check(self: MgrModule, *args: Any, **kwargs: Any) -> Any:
521 retries = 0
522 while True:
523 try:
524 return func(self, *args, **kwargs)
525 except sqlite3.DatabaseError as e:
526 self.log.error(f"Caught fatal database error: {e}")
527 retries = retries+1
528 if retries > MAX_DBCLEANUP_RETRIES:
529 raise
530                 self.log.debug("attempting reopen of database")
531                 self.close_db()
532                 self.open_db()
533 # allow retry of func(...)
534 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
535 return check
536
537 def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
538 @functools.wraps(func)
539 def check(self: MgrModule, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
540 if not self.db_ready():
541 return -errno.EAGAIN, "", "mgr db not yet available"
542 return func(self, *args, **kwargs)
543 check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
544 return check
545
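# Illustrative sketch of stacking the decorators above on one handler
# (hypothetical command and option names).  CLIRequiresDB rejects the call
# with EAGAIN until the mgr database is available, and CLICheckNonemptyFileInput
# turns a missing or empty "-i <file>" input into an EINVAL error before the
# handler body runs.
#
#   @CLIWriteCommand('example import')
#   @CLIRequiresDB
#   @CLICheckNonemptyFileInput(desc='a JSON config blob')
#   def example_import(self, inbuf: str) -> Tuple[int, str, str]:
#       self.set_kv('config', inbuf)
#       return 0, '', ''
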
546 def _get_localized_key(prefix: str, key: str) -> str:
547 return '{}/{}'.format(prefix, key)
548
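# For reference, a localized key simply prefixes the mgr id, e.g.
# _get_localized_key('x', 'server_port') -> 'x/server_port'; see
# get_localized_store() and get_localized_module_option() below.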
549
550 """
551 MODULE_OPTIONS types and Option Class
552 """
553 if TYPE_CHECKING:
554 OptionTypeLabel = Literal[
555 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
556
557
558 # common/options.h: value_t
559 OptionValue = Optional[Union[bool, int, float, str]]
560
561
562 class Option(Dict):
563 """
564 Helper class to declare options for MODULE_OPTIONS list.
565 TODO: Replace with typing.TypedDict when in python_version >= 3.8
566 """
567
568 def __init__(
569 self,
570 name: str,
571 default: OptionValue = None,
572 type: 'OptionTypeLabel' = 'str',
573 desc: Optional[str] = None,
574 long_desc: Optional[str] = None,
575 min: OptionValue = None,
576 max: OptionValue = None,
577 enum_allowed: Optional[List[str]] = None,
578 tags: Optional[List[str]] = None,
579 see_also: Optional[List[str]] = None,
580 runtime: bool = False,
581 ):
582 super(Option, self).__init__(
583 (k, v) for k, v in vars().items()
584 if k != 'self' and v is not None)
585
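# Illustrative sketch (hypothetical option names): how a module declares its
# options with the Option helper above.
#
#   MODULE_OPTIONS = [
#       Option(name='scrape_interval', type='secs', default=60,
#              desc='how often to scrape', runtime=True),
#       Option(name='enable_foo', type='bool', default=False),
#   ]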
586
587 class Command(dict):
588 """
589 Helper class to declare options for COMMANDS list.
590
591     It also allows specifying prefix and args separately, as well as storing a
592 handler callable.
593
594 Usage:
595 >>> def handler(): return 0, "", ""
596 >>> Command(prefix="example",
597 ... handler=handler,
598 ... perm='w')
599 {'perm': 'w', 'poll': False}
600 """
601
602 def __init__(
603 self,
604 prefix: str,
605 handler: HandlerFuncType,
606 perm: str = "rw",
607 poll: bool = False,
608 ):
609 super().__init__(perm=perm,
610 poll=poll)
611 self.prefix = prefix
612 self.handler = handler
613
614 @staticmethod
615 def returns_command_result(instance: Any,
616 f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
617 @functools.wraps(f)
618 def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
619 retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
620 return HandleCommandResult(retval, stdout, stderr)
621 wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
622 return wrapper
623
624 def register(self, instance: bool = False) -> HandlerFuncType:
625 """
626 Register a CLICommand handler. It allows an instance to register bound
627 methods. In that case, the mgr instance is not passed, and it's expected
628 to be available in the class instance.
629         It also uses the HandleCommandResult helper to return a wrapped tuple of 3
630 items.
631 """
632 cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
633 return cmd(self.returns_command_result(instance, self.handler))
634
635
636 class CPlusPlusHandler(logging.Handler):
637 def __init__(self, module_inst: Any):
638 super(CPlusPlusHandler, self).__init__()
639 self._module = module_inst
640 self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
641 .format(module_inst.module_name)))
642
643 def emit(self, record: logging.LogRecord) -> None:
644 if record.levelno >= self.level:
645 self._module._ceph_log(self.format(record))
646
647
648 class ClusterLogHandler(logging.Handler):
649 def __init__(self, module_inst: Any):
650 super().__init__()
651 self._module = module_inst
652 self.setFormatter(logging.Formatter("%(message)s"))
653
654 def emit(self, record: logging.LogRecord) -> None:
655 levelmap = {
656 logging.DEBUG: MgrModule.ClusterLogPrio.DEBUG,
657 logging.INFO: MgrModule.ClusterLogPrio.INFO,
658 logging.WARNING: MgrModule.ClusterLogPrio.WARN,
659 logging.ERROR: MgrModule.ClusterLogPrio.ERROR,
660 logging.CRITICAL: MgrModule.ClusterLogPrio.ERROR,
661 }
662 level = levelmap[record.levelno]
663 if record.levelno >= self.level:
664 self._module.cluster_log(self._module.module_name,
665 level,
666 self.format(record))
667
668
669 class FileHandler(logging.FileHandler):
670 def __init__(self, module_inst: Any):
671 path = module_inst.get_ceph_option("log_file")
672 idx = path.rfind(".log")
673 if idx != -1:
674 self.path = "{}.{}.log".format(path[:idx], module_inst.module_name)
675 else:
676 self.path = "{}.{}".format(path, module_inst.module_name)
677 super(FileHandler, self).__init__(self.path, delay=True)
678 self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
679
680
681 class MgrModuleLoggingMixin(object):
682 def _configure_logging(self,
683 mgr_level: str,
684 module_level: str,
685 cluster_level: str,
686 log_to_file: bool,
687 log_to_cluster: bool) -> None:
688 self._mgr_level: Optional[str] = None
689 self._module_level: Optional[str] = None
690 self._root_logger = logging.getLogger()
691
692 self._unconfigure_logging()
693
694 # the ceph log handler is initialized only once
695 self._mgr_log_handler = CPlusPlusHandler(self)
696 self._cluster_log_handler = ClusterLogHandler(self)
697 self._file_log_handler = FileHandler(self)
698
699 self.log_to_file = log_to_file
700 self.log_to_cluster = log_to_cluster
701
702 self._root_logger.addHandler(self._mgr_log_handler)
703 if log_to_file:
704 self._root_logger.addHandler(self._file_log_handler)
705 if log_to_cluster:
706 self._root_logger.addHandler(self._cluster_log_handler)
707
708 self._root_logger.setLevel(logging.NOTSET)
709 self._set_log_level(mgr_level, module_level, cluster_level)
710
711 def _unconfigure_logging(self) -> None:
712 # remove existing handlers:
713 rm_handlers = [
714 h for h in self._root_logger.handlers
715 if (isinstance(h, CPlusPlusHandler) or
716 isinstance(h, FileHandler) or
717 isinstance(h, ClusterLogHandler))]
718 for h in rm_handlers:
719 self._root_logger.removeHandler(h)
720 self.log_to_file = False
721 self.log_to_cluster = False
722
723 def _set_log_level(self,
724 mgr_level: str,
725 module_level: str,
726 cluster_level: str) -> None:
727 self._cluster_log_handler.setLevel(cluster_level.upper())
728
729 module_level = module_level.upper() if module_level else ''
730 if not self._module_level:
731 # using debug_mgr level
732 if not module_level and self._mgr_level == mgr_level:
733                 # no change in module level nor in debug_mgr
734 return
735 else:
736 if self._module_level == module_level:
737 # no change in module level
738 return
739
740 if not self._module_level and not module_level:
741 level = self._ceph_log_level_to_python(mgr_level)
742 self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
743 level, mgr_level)
744 elif self._module_level and not module_level:
745 level = self._ceph_log_level_to_python(mgr_level)
746 self.getLogger().warning("unsetting module log level, falling back to "
747 "debug_mgr level: %s (%s)", level, mgr_level)
748 elif module_level:
749 level = module_level
750 self.getLogger().debug("setting log level: %s", level)
751
752 self._module_level = module_level
753 self._mgr_level = mgr_level
754
755 self._mgr_log_handler.setLevel(level)
756 self._file_log_handler.setLevel(level)
757
758 def _enable_file_log(self) -> None:
759 # enable file log
760 self.getLogger().warning("enabling logging to file")
761 self.log_to_file = True
762 self._root_logger.addHandler(self._file_log_handler)
763
764 def _disable_file_log(self) -> None:
765 # disable file log
766 self.getLogger().warning("disabling logging to file")
767 self.log_to_file = False
768 self._root_logger.removeHandler(self._file_log_handler)
769
770 def _enable_cluster_log(self) -> None:
771 # enable cluster log
772 self.getLogger().warning("enabling logging to cluster")
773 self.log_to_cluster = True
774 self._root_logger.addHandler(self._cluster_log_handler)
775
776 def _disable_cluster_log(self) -> None:
777 # disable cluster log
778 self.getLogger().warning("disabling logging to cluster")
779 self.log_to_cluster = False
780 self._root_logger.removeHandler(self._cluster_log_handler)
781
782 def _ceph_log_level_to_python(self, log_level: str) -> str:
783 if log_level:
784 try:
785 ceph_log_level = int(log_level.split("/", 1)[0])
786 except ValueError:
787 ceph_log_level = 0
788 else:
789 ceph_log_level = 0
790
791 log_level = "DEBUG"
792 if ceph_log_level <= 0:
793 log_level = "CRITICAL"
794 elif ceph_log_level <= 1:
795 log_level = "WARNING"
796 elif ceph_log_level <= 4:
797 log_level = "INFO"
798 return log_level
799
800 def getLogger(self, name: Optional[str] = None) -> logging.Logger:
801 return logging.getLogger(name)
802
803
804 class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
805 """
806     Standby modules only implement a serve and shutdown method; they
807 are not permitted to implement commands and they do not receive
808 any notifications.
809
810 They only have access to the mgrmap (for accessing service URI info
811 from their active peer), and to configuration settings (read only).
812 """
813
814 MODULE_OPTIONS: List[Option] = []
815 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
816
817 def __init__(self, module_name: str, capsule: Any):
818 super(MgrStandbyModule, self).__init__(capsule)
819 self.module_name = module_name
820
821 # see also MgrModule.__init__()
822 for o in self.MODULE_OPTIONS:
823 if 'default' in o:
824 if 'type' in o:
825 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
826 else:
827 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
828
829 # mock does not return a str
830 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
831 log_level = cast(str, self.get_module_option("log_level"))
832 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
833 self._configure_logging(mgr_level, log_level, cluster_level,
834 False, False)
835
836 # for backwards compatibility
837 self._logger = self.getLogger()
838
839 def __del__(self) -> None:
840 self._cleanup()
841 self._unconfigure_logging()
842
843 def _cleanup(self) -> None:
844 pass
845
846 @classmethod
847 def _register_options(cls, module_name: str) -> None:
848 cls.MODULE_OPTIONS.append(
849 Option(name='log_level', type='str', default="", runtime=True,
850 enum_allowed=['info', 'debug', 'critical', 'error',
851 'warning', '']))
852 cls.MODULE_OPTIONS.append(
853 Option(name='log_to_file', type='bool', default=False, runtime=True))
854 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
855 cls.MODULE_OPTIONS.append(
856 Option(name='log_to_cluster', type='bool', default=False,
857 runtime=True))
858 cls.MODULE_OPTIONS.append(
859 Option(name='log_to_cluster_level', type='str', default='info',
860 runtime=True,
861 enum_allowed=['info', 'debug', 'critical', 'error',
862 'warning', '']))
863
864 @property
865 def log(self) -> logging.Logger:
866 return self._logger
867
868 def serve(self) -> None:
869 """
870 The serve method is mandatory for standby modules.
871 :return:
872 """
873 raise NotImplementedError()
874
875 def get_mgr_id(self) -> str:
876 return self._ceph_get_mgr_id()
877
878 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
879 """
880 Retrieve the value of a persistent configuration setting
881
882 :param default: the default value of the config if it is not found
883 """
884 r = self._ceph_get_module_option(key)
885 if r is None:
886 return self.MODULE_OPTION_DEFAULTS.get(key, default)
887 else:
888 return r
889
890 def get_ceph_option(self, key: str) -> OptionValue:
891 return self._ceph_get_option(key)
892
893 def get_store(self, key: str) -> Optional[str]:
894 """
895 Retrieve the value of a persistent KV store entry
896
897 :param key: String
898 :return: Byte string or None
899 """
900 return self._ceph_get_store(key)
901
902 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
903 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
904 if r is None:
905 r = self._ceph_get_store(key)
906 if r is None:
907 r = default
908 return r
909
910 def get_active_uri(self) -> str:
911 return self._ceph_get_active_uri()
912
913 def get(self, data_name: str) -> Dict[str, Any]:
914 return self._ceph_get(data_name)
915
916 def get_mgr_ip(self) -> str:
917 ips = self.get("mgr_ips").get('ips', [])
918 if not ips:
919 return socket.gethostname()
920 return ips[0]
921
922 def get_hostname(self) -> str:
923 return socket.gethostname()
924
925 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
926 r = self._ceph_get_module_option(key, self.get_mgr_id())
927 if r is None:
928 return self.MODULE_OPTION_DEFAULTS.get(key, default)
929 else:
930 return r
931
932
933 HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
934 # {"type": service_type, "id": service_id}
935 ServiceInfoT = Dict[str, str]
936 # {"hostname": hostname,
937 # "ceph_version": version,
938 # "services": [service_info, ..]}
939 ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
940 PerfCounterT = Dict[str, Any]
941
942
943 class API:
944 def DecoratorFactory(attr: str, default: Any): # type: ignore
945 class DecoratorClass:
946 _ATTR_TOKEN = f'__ATTR_{attr.upper()}__'
947
948 def __init__(self, value: Any=default) -> None:
949 self.value = value
950
951 def __call__(self, func: Callable) -> Any:
952 setattr(func, self._ATTR_TOKEN, self.value)
953 return func
954
955 @classmethod
956 def get(cls, func: Callable) -> Any:
957 return getattr(func, cls._ATTR_TOKEN, default)
958
959 return DecoratorClass
960
961 perm = DecoratorFactory('perm', default='r')
962 expose = DecoratorFactory('expose', default=False)(True)
963
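# Illustrative sketch of the decorators defined above (hypothetical method):
# @API.expose marks a method as exposed and @API.perm records its permission;
# both can be read back via the generated .get() classmethods.
#
#   class MyModule(MgrModule):
#       @API.perm('w')
#       @API.expose
#       def do_thing(self) -> None:
#           ...
#
#   API.expose.get(MyModule.do_thing)  # -> True
#   API.perm.get(MyModule.do_thing)    # -> 'w'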
964
965 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
966 MGR_POOL_NAME = ".mgr"
967
968 COMMANDS = [] # type: List[Any]
969 MODULE_OPTIONS: List[Option] = []
970 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
971
972 # Database Schema
973 SCHEMA = None # type: Optional[str]
974 SCHEMA_VERSIONED = None # type: Optional[List[str]]
975
976 # Priority definitions for perf counters
977 PRIO_CRITICAL = 10
978 PRIO_INTERESTING = 8
979 PRIO_USEFUL = 5
980 PRIO_UNINTERESTING = 2
981 PRIO_DEBUGONLY = 0
982
983 # counter value types
984 PERFCOUNTER_TIME = 1
985 PERFCOUNTER_U64 = 2
986
987 # counter types
988 PERFCOUNTER_LONGRUNAVG = 4
989 PERFCOUNTER_COUNTER = 8
990 PERFCOUNTER_HISTOGRAM = 0x10
991 PERFCOUNTER_TYPE_MASK = ~3
992
993 # units supported
994 BYTES = 0
995 NONE = 1
996
997 # Cluster log priorities
998 class ClusterLogPrio(IntEnum):
999 DEBUG = 0
1000 INFO = 1
1001 SEC = 2
1002 WARN = 3
1003 ERROR = 4
1004
1005 def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
1006 self.module_name = module_name
1007 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
1008
1009 for o in self.MODULE_OPTIONS:
1010 if 'default' in o:
1011 if 'type' in o:
1012 # we'll assume the declared type matches the
1013 # supplied default value's type.
1014 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
1015 else:
1016                     # module not declaring its type, so normalize the
1017 # default value to be a string for consistent behavior
1018 # with default and user-supplied option values.
1019 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
1020
1021 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
1022 log_level = cast(str, self.get_module_option("log_level"))
1023 cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
1024 log_to_file = self.get_module_option("log_to_file")
1025 assert isinstance(log_to_file, bool)
1026 log_to_cluster = self.get_module_option("log_to_cluster")
1027 assert isinstance(log_to_cluster, bool)
1028 self._configure_logging(mgr_level, log_level, cluster_level,
1029 log_to_file, log_to_cluster)
1030
1031 # for backwards compatibility
1032 self._logger = self.getLogger()
1033
1034 self._db = None # type: Optional[sqlite3.Connection]
1035
1036 self._version = self._ceph_get_version()
1037
1038 self._perf_schema_cache = None
1039
1040 # Keep a librados instance for those that need it.
1041 self._rados: Optional[rados.Rados] = None
1042
1043 # this does not change over the lifetime of an active mgr
1044 self._mgr_ips: Optional[str] = None
1045
1046 self._db_lock = threading.Lock()
1047
1048 def __del__(self) -> None:
1049 self._unconfigure_logging()
1050
1051 @classmethod
1052 def _register_options(cls, module_name: str) -> None:
1053 cls.MODULE_OPTIONS.append(
1054 Option(name='log_level', type='str', default="", runtime=True,
1055 enum_allowed=['info', 'debug', 'critical', 'error',
1056 'warning', '']))
1057 cls.MODULE_OPTIONS.append(
1058 Option(name='log_to_file', type='bool', default=False, runtime=True))
1059 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
1060 cls.MODULE_OPTIONS.append(
1061 Option(name='log_to_cluster', type='bool', default=False,
1062 runtime=True))
1063 cls.MODULE_OPTIONS.append(
1064 Option(name='log_to_cluster_level', type='str', default='info',
1065 runtime=True,
1066 enum_allowed=['info', 'debug', 'critical', 'error',
1067 'warning', '']))
1068
1069 @classmethod
1070 def _register_commands(cls, module_name: str) -> None:
1071 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
1072
1073 @property
1074 def log(self) -> logging.Logger:
1075 return self._logger
1076
1077 def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
1078 """
1079 :param channel: The log channel. This can be 'cluster', 'audit', ...
1080 :param priority: The log message priority.
1081 :param message: The message to log.
1082 """
1083 self._ceph_cluster_log(channel, priority.value, message)
1084
1085 @property
1086 def version(self) -> str:
1087 return self._version
1088
1089 @API.expose
1090 def pool_exists(self, name: str) -> bool:
1091 pools = [p['pool_name'] for p in self.get('osd_map')['pools']]
1092 return name in pools
1093
1094 @API.expose
1095 def have_enough_osds(self) -> bool:
1096 # wait until we have enough OSDs to allow the pool to be healthy
1097 ready = 0
1098 for osd in self.get("osd_map")["osds"]:
1099 if osd["up"] and osd["in"]:
1100 ready += 1
1101
1102 need = cast(int, self.get_ceph_option("osd_pool_default_size"))
1103 return ready >= need
1104
1105 @API.perm('w')
1106 @API.expose
1107 def rename_pool(self, srcpool: str, destpool: str) -> None:
1108 c = {
1109 'prefix': 'osd pool rename',
1110 'format': 'json',
1111 'srcpool': srcpool,
1112 'destpool': destpool,
1113 'yes_i_really_mean_it': True
1114 }
1115 self.check_mon_command(c)
1116
1117 @API.perm('w')
1118 @API.expose
1119 def create_pool(self, pool: str) -> None:
1120 c = {
1121 'prefix': 'osd pool create',
1122 'format': 'json',
1123 'pool': pool,
1124 'pg_num': 1,
1125 'pg_num_min': 1,
1126 'pg_num_max': 32,
1127 'yes_i_really_mean_it': True
1128 }
1129 self.check_mon_command(c)
1130
1131 @API.perm('w')
1132 @API.expose
1133 def appify_pool(self, pool: str, app: str) -> None:
1134 c = {
1135 'prefix': 'osd pool application enable',
1136 'format': 'json',
1137 'pool': pool,
1138 'app': app,
1139 'yes_i_really_mean_it': True
1140 }
1141 self.check_mon_command(c)
1142
1143 @API.perm('w')
1144 @API.expose
1145 def create_mgr_pool(self) -> None:
1146 self.log.info("creating mgr pool")
1147
1148 ov = self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
1149 devhealth = cast(str, ov)
1150 if devhealth is not None and self.pool_exists(devhealth):
1151 self.log.debug("reusing devicehealth pool")
1152 self.rename_pool(devhealth, self.MGR_POOL_NAME)
1153 self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1154 else:
1155 self.log.debug("creating new mgr pool")
1156 self.create_pool(self.MGR_POOL_NAME)
1157 self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1158
1159 def create_skeleton_schema(self, db: sqlite3.Connection) -> None:
1160 SQL = """
1161 CREATE TABLE IF NOT EXISTS MgrModuleKV (
1162 key TEXT PRIMARY KEY,
1163 value NOT NULL
1164 ) WITHOUT ROWID;
1165 INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
1166 """
1167
1168 db.executescript(SQL)
1169
1170 def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
1171 SQL = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
1172
1173 db.execute(SQL, (version,))
1174
1175 def set_kv(self, key: str, value: Any) -> None:
1176 SQL = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
1177
1178 assert key[:2] != "__"
1179
1180 self.log.debug(f"set_kv('{key}', '{value}')")
1181
1182 with self._db_lock, self.db:
1183 self.db.execute(SQL, (key, value))
1184
1185 @API.expose
1186 def get_kv(self, key: str) -> Any:
1187 SQL = "SELECT value FROM MgrModuleKV WHERE key = ?;"
1188
1189 assert key[:2] != "__"
1190
1191 self.log.debug(f"get_kv('{key}')")
1192
1193 with self._db_lock, self.db:
1194 cur = self.db.execute(SQL, (key,))
1195 row = cur.fetchone()
1196 if row is None:
1197 return None
1198 else:
1199 v = row['value']
1200 self.log.debug(f" = {v}")
1201 return v
1202
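# Illustrative sketch (hypothetical key/value): modules with a SCHEMA can use
# the MgrModuleKV helpers above as a small key/value store inside main.db.
#
#   self.set_kv('last_scrape', time.time())
#   last = self.get_kv('last_scrape')   # None if never set
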
1203 def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
1204 if version <= 0:
1205 self.log.info(f"creating main.db for {self.module_name}")
1206 assert self.SCHEMA is not None
1207 db.executescript(self.SCHEMA)
1208 self.update_schema_version(db, 1)
1209 else:
1210 assert self.SCHEMA_VERSIONED is not None
1211 latest = len(self.SCHEMA_VERSIONED)
1212 if latest < version:
1213 raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
1214 for i in range(version, latest):
1215 self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}")
1216 SQL = self.SCHEMA_VERSIONED[i]
1217 db.executescript(SQL)
1218 if version < latest:
1219 self.update_schema_version(db, latest)
1220
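# Illustrative sketch (hypothetical table names): one plausible layout is to
# keep SCHEMA as the schema of a fresh database (which maybe_upgrade() marks as
# version 1) and to append each later migration to SCHEMA_VERSIONED; entry [i]
# is replayed when upgrading an existing database from version i.
#
#   SCHEMA = """
#   CREATE TABLE IF NOT EXISTS Example (id INTEGER PRIMARY KEY, note TEXT);
#   """
#   SCHEMA_VERSIONED = [
#       SCHEMA,                                          # version 1
#       "ALTER TABLE Example ADD COLUMN created REAL;",  # upgrade 1 -> 2
#   ]
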
1221 def load_schema(self, db: sqlite3.Connection) -> None:
1222 SQL = """
1223 SELECT value FROM MgrModuleKV WHERE key = '__version';
1224 """
1225
1226 with db:
1227 self.create_skeleton_schema(db)
1228 cur = db.execute(SQL)
1229 row = cur.fetchone()
1230 self.maybe_upgrade(db, int(row['value']))
1231 assert cur.fetchone() is None
1232 cur.close()
1233
1234 def configure_db(self, db: sqlite3.Connection) -> None:
1235 db.execute('PRAGMA FOREIGN_KEYS = 1')
1236 db.execute('PRAGMA JOURNAL_MODE = PERSIST')
1237 db.execute('PRAGMA PAGE_SIZE = 65536')
1238 db.execute('PRAGMA CACHE_SIZE = 64')
1239 db.execute('PRAGMA TEMP_STORE = memory')
1240 db.row_factory = sqlite3.Row
1241 self.load_schema(db)
1242
1243 def close_db(self) -> None:
1244 with self._db_lock:
1245 if self._db is not None:
1246 self._db.close()
1247 self._db = None
1248
1249 def open_db(self) -> Optional[sqlite3.Connection]:
1250 if not self.pool_exists(self.MGR_POOL_NAME):
1251 if not self.have_enough_osds():
1252 return None
1253 self.create_mgr_pool()
1254         uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph"
1255 self.log.debug(f"using uri {uri}")
1256 db = sqlite3.connect(uri, check_same_thread=False, uri=True)
1257 # if libcephsqlite reconnects, update the addrv for blocklist
1258 with db:
1259 cur = db.execute('SELECT json_extract(ceph_status(), "$.addr");')
1260 (addrv,) = cur.fetchone()
1261 assert addrv is not None
1262 self.log.debug(f"new libcephsqlite addrv = {addrv}")
1263 self._ceph_register_client("libcephsqlite", addrv, True)
1264 self.configure_db(db)
1265 return db
1266
1267 @API.expose
1268 def db_ready(self) -> bool:
1269 with self._db_lock:
1270 try:
1271 return self.db is not None
1272 except MgrDBNotReady:
1273 return False
1274
1275 @property
1276 def db(self) -> sqlite3.Connection:
1277 assert self._db_lock.locked()
1278 if self._db is not None:
1279 return self._db
1280 db_allowed = self.get_ceph_option("mgr_pool")
1281 if not db_allowed:
1282             raise MgrDBNotReady()
1283 self._db = self.open_db()
1284 if self._db is None:
1285             raise MgrDBNotReady()
1286 return self._db
1287
1288 @property
1289 def release_name(self) -> str:
1290 """
1291 Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
1292 :return: Returns the release name of the Ceph version in lower case.
1293 :rtype: str
1294 """
1295 return self._ceph_get_release_name()
1296
1297 @API.expose
1298 def lookup_release_name(self, major: int) -> str:
1299 return self._ceph_lookup_release_name(major)
1300
1301 def get_context(self) -> object:
1302 """
1303 :return: a Python capsule containing a C++ CephContext pointer
1304 """
1305 return self._ceph_get_context()
1306
1307 def notify(self, notify_type: NotifyType, notify_id: str) -> None:
1308 """
1309 Called by the ceph-mgr service to notify the Python plugin
1310 that new state is available. This method is *only* called for
1311 notify_types that are listed in the NOTIFY_TYPES string list
1312 member of the module class.
1313
1314 :param notify_type: string indicating what kind of notification,
1315 such as osd_map, mon_map, fs_map, mon_status,
1316 health, pg_summary, command, service_map
1317 :param notify_id: string (may be empty) that optionally specifies
1318 which entity is being notified about. With
1319 "command" notifications this is set to the tag
1320             from ``send_command``.
1321 """
1322 pass
1323
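# Illustrative sketch (hypothetical module): to receive notifications a module
# lists the types it cares about in NOTIFY_TYPES and overrides notify().
#
#   class MyModule(MgrModule):
#       NOTIFY_TYPES = [NotifyType.osd_map, NotifyType.command]
#
#       def notify(self, notify_type: NotifyType, notify_id: str) -> None:
#           if notify_type == NotifyType.osd_map:
#               self.log.debug('osd_map epoch %d', self.get('osd_map')['epoch'])
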
1324 def _config_notify(self) -> None:
1325 # check logging options for changes
1326 mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
1327 module_level = cast(str, self.get_module_option("log_level"))
1328 cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
1329 assert isinstance(cluster_level, str)
1330 log_to_file = self.get_module_option("log_to_file", False)
1331 assert isinstance(log_to_file, bool)
1332 log_to_cluster = self.get_module_option("log_to_cluster", False)
1333 assert isinstance(log_to_cluster, bool)
1334 self._set_log_level(mgr_level, module_level, cluster_level)
1335
1336 if log_to_file != self.log_to_file:
1337 if log_to_file:
1338 self._enable_file_log()
1339 else:
1340 self._disable_file_log()
1341 if log_to_cluster != self.log_to_cluster:
1342 if log_to_cluster:
1343 self._enable_cluster_log()
1344 else:
1345 self._disable_cluster_log()
1346
1347 # call module subclass implementations
1348 self.config_notify()
1349
1350 def config_notify(self) -> None:
1351 """
1352 Called by the ceph-mgr service to notify the Python plugin
1353 that the configuration may have changed. Modules will want to
1354 refresh any configuration values stored in config variables.
1355 """
1356 pass
1357
1358 def serve(self) -> None:
1359 """
1360 Called by the ceph-mgr service to start any server that
1361 is provided by this Python plugin. The implementation
1362 of this function should block until ``shutdown`` is called.
1363
1364 You *must* implement ``shutdown`` if you implement ``serve``
1365 """
1366 pass
1367
1368 def shutdown(self) -> None:
1369 """
1370 Called by the ceph-mgr service to request that this
1371 module drop out of its serve() function. You do not
1372 need to implement this if you do not implement serve()
1373
1374 :return: None
1375 """
1376 if self._rados:
1377 addrs = self._rados.get_addrs()
1378 self._rados.shutdown()
1379 self._ceph_unregister_client(None, addrs)
1380 self._rados = None
1381
1382 @API.expose
1383 def get(self, data_name: str) -> Any:
1384 """
1385 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
1386
1387 :param str data_name: Valid things to fetch are osdmap_crush_map_text,
1388 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
1389 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
1390 health, mon_status, devices, device <devid>, pg_stats,
1391 pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
1392 modified_config_options, service_map, mds_metadata,
1393 have_local_config_map, osd_pool_stats, pg_status.
1394
1395 Note:
1396 All these structures have their own JSON representations: experiment
1397 or look at the C++ ``dump()`` methods to learn about them.
1398 """
1399 obj = self._ceph_get(data_name)
1400 if isinstance(obj, bytes):
1401 obj = json.loads(obj)
1402
1403 return obj
1404
1405 def _stattype_to_str(self, stattype: int) -> str:
1406
1407 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
1408 if typeonly == 0:
1409 return 'gauge'
1410 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
1411 # this lie matches the DaemonState decoding: only val, no counts
1412 return 'counter'
1413 if typeonly == self.PERFCOUNTER_COUNTER:
1414 return 'counter'
1415 if typeonly == self.PERFCOUNTER_HISTOGRAM:
1416 return 'histogram'
1417
1418 return ''
1419
1420 def _perfpath_to_path_labels(self, daemon: str,
1421 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
1422 if daemon.startswith('rgw.'):
1423 label_name = 'instance_id'
1424 daemon = daemon[len('rgw.'):]
1425 else:
1426 label_name = 'ceph_daemon'
1427
1428 label_names = (label_name,) # type: Tuple[str, ...]
1429 labels = (daemon,) # type: Tuple[str, ...]
1430
1431 if daemon.startswith('rbd-mirror.'):
1432 match = re.match(
1433 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
1434 path
1435 )
1436 if match:
1437 path = 'rbd_mirror_image_' + match.group(4)
1438 pool = match.group(1)
1439 namespace = match.group(2) or ''
1440 image = match.group(3)
1441 label_names += ('pool', 'namespace', 'image')
1442 labels += (pool, namespace, image)
1443
1444 return path, label_names, labels,
1445
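# For example (derived from the regex above), an rbd-mirror counter such as
#   daemon='rbd-mirror.a', path='rbd_mirror_image_rbd/ns1/img1.replay_bytes'
# is rewritten to path='rbd_mirror_image_replay_bytes' with
#   label_names=('ceph_daemon', 'pool', 'namespace', 'image')
#   labels=('rbd-mirror.a', 'rbd', 'ns1', 'img1')
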
1446 def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
1447 if stattype & self.PERFCOUNTER_TIME:
1448 # Convert from ns to seconds
1449 return value / 1000000000.0
1450 else:
1451 return value
1452
1453 def _unit_to_str(self, unit: int) -> str:
1454 if unit == self.NONE:
1455 return "/s"
1456 elif unit == self.BYTES:
1457 return "B/s"
1458 else:
1459 raise ValueError(f'bad unit "{unit}"')
1460
1461 @staticmethod
1462 def to_pretty_iec(n: int) -> str:
1463 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
1464 (20, 'Mi'), (10, 'Ki')]:
1465 if n > 10 << bits:
1466 return str(n >> bits) + ' ' + suffix
1467 return str(n) + ' '
1468
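# For example:
#   MgrModule.to_pretty_iec(64 << 30)  -> '64 Gi'
#   MgrModule.to_pretty_iec(5)         -> '5 '
# A value must exceed 10x a unit before that unit is used, so e.g.
# MgrModule.to_pretty_iec(3 << 30) renders as '3072 Mi'.
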
1469 @staticmethod
1470 def get_pretty_row(elems: Sequence[str], width: int) -> str:
1471 """
1472 Takes an array of elements and returns a string with those elements
1473 formatted as a table row. Useful for polling modules.
1474
1475 :param elems: the elements to be printed
1476 :param width: the width of the terminal
1477 """
1478 n = len(elems)
1479 column_width = int(width / n)
1480
1481 ret = '|'
1482 for elem in elems:
1483 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
1484
1485 return ret
1486
1487 def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
1488 """
1489 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
1490
1491 :param elems: the elements to be printed
1492 :param width: the width of the terminal
1493 """
1494 n = len(elems)
1495 column_width = int(width / n)
1496
1497 # dash line
1498 ret = '+'
1499 for i in range(0, n):
1500 ret += '-' * (column_width - 1) + '+'
1501 ret += '\n'
1502
1503 # title
1504 ret += self.get_pretty_row(elems, width)
1505 ret += '\n'
1506
1507 # dash line
1508 ret += '+'
1509 for i in range(0, n):
1510 ret += '-' * (column_width - 1) + '+'
1511 ret += '\n'
1512
1513 return ret
1514
1515 @API.expose
1516 def get_server(self, hostname: str) -> ServerInfoT:
1517 """
1518 Called by the plugin to fetch metadata about a particular hostname from
1519 ceph-mgr.
1520
1521 This is information that ceph-mgr has gleaned from the daemon metadata
1522 reported by daemons running on a particular server.
1523
1524 :param hostname: a hostname
1525 """
1526 return cast(ServerInfoT, self._ceph_get_server(hostname))
1527
1528 @API.expose
1529 def get_perf_schema(self,
1530 svc_type: str,
1531 svc_name: str) -> Dict[str,
1532 Dict[str, Dict[str, Union[str, int]]]]:
1533 """
1534 Called by the plugin to fetch perf counter schema info.
1535         svc_name and svc_type may be empty strings, in which case
1536         they act as wildcards
1537
1538 :param str svc_type:
1539 :param str svc_name:
1540         :return: dict of dicts describing the counters requested
1541 """
1542 return self._ceph_get_perf_schema(svc_type, svc_name)
1543
1544 def get_rocksdb_version(self) -> str:
1545 """
1546 Called by the plugin to fetch the latest RocksDB version number.
1547
1548 :return: str representing the major, minor, and patch RocksDB version numbers
1549 """
1550 return self._ceph_get_rocksdb_version()
1551
1552 @API.expose
1553 def get_counter(self,
1554 svc_type: str,
1555 svc_name: str,
1556 path: str) -> Dict[str, List[Tuple[float, int]]]:
1557 """
1558 Called by the plugin to fetch the latest performance counter data for a
1559 particular counter on a particular service.
1560
1561 :param str svc_type:
1562 :param str svc_name:
1563 :param str path: a period-separated concatenation of the subsystem and the
1564 counter name, for example "mds.inodes".
1565         :return: A dict of counter names to their values. Each value is a list
1566             of two-tuples of (timestamp, value). This may be empty if no data is
1567 available.
1568 """
1569 return self._ceph_get_counter(svc_type, svc_name, path)
1570
1571 @API.expose
1572 def get_latest_counter(self,
1573 svc_type: str,
1574 svc_name: str,
1575 path: str) -> Dict[str, Union[Tuple[float, int],
1576 Tuple[float, int, int]]]:
1577 """
1578 Called by the plugin to fetch only the newest performance counter data
1579 point for a particular counter on a particular service.
1580
1581 :param str svc_type:
1582 :param str svc_name:
1583 :param str path: a period-separated concatenation of the subsystem and the
1584 counter name, for example "mds.inodes".
1585         :return: A dict of counter names to a two-tuple of (timestamp, value) or
1586             a three-tuple of (timestamp, value, count). This may be empty if no
1587 data is available.
1588 """
1589 return self._ceph_get_latest_counter(svc_type, svc_name, path)
1590
1591 @API.expose
1592 def list_servers(self) -> List[ServerInfoT]:
1593 """
1594 Like ``get_server``, but gives information about all servers (i.e. all
1595 unique hostnames that have been mentioned in daemon metadata)
1596
1597 :return: a list of information about all servers
1598 :rtype: list
1599 """
1600 return cast(List[ServerInfoT], self._ceph_get_server(None))
1601
1602 def get_metadata(self,
1603 svc_type: str,
1604 svc_id: str,
1605 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
1606 """
1607 Fetch the daemon metadata for a particular service.
1608
1609         ceph-mgr fetches metadata asynchronously, so there are windows of time during
1610 addition/removal of services where the metadata is not available to
1611 modules. ``None`` is returned if no metadata is available.
1612
1613 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1614 :param str svc_id: service id. convert OSD integer IDs to strings when
1615 calling this
1616 :rtype: dict, or None if no metadata found
1617 """
1618 metadata = self._ceph_get_metadata(svc_type, svc_id)
1619 if not metadata:
1620 return default
1621 return metadata
1622
1623 @API.expose
1624 def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
1625 """
1626 Fetch the latest status for a particular service daemon.
1627
1628 This method may return ``None`` if no status information is
1629 available, for example because the daemon hasn't fully started yet.
1630
1631 :param svc_type: string (e.g., 'rgw')
1632 :param svc_id: string
1633 :return: dict, or None if the service is not found
1634 """
1635 return self._ceph_get_daemon_status(svc_type, svc_id)
1636
1637 def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
1638 """
1639 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1640 if ``retval != 0``.
1641 """
1642
1643 r = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
1644 if r.retval:
1645 raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1646 return r
1647
1648 def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1649 """
1650 Helper for modules that do simple, synchronous mon command
1651 execution.
1652
1653 See send_command for general case.
1654
1655         :return: status int, out str, err str
1656 """
1657
1658 t1 = time.time()
1659 result = CommandResult()
1660 self.send_command(result, "mon", "", json.dumps(cmd_dict), "", inbuf)
1661 r = result.wait()
1662 t2 = time.time()
1663
1664 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1665 cmd_dict['prefix'], r[0], t2 - t1
1666 ))
1667
1668 return r
1669
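# Illustrative sketch (hypothetical command): most callers go through
# check_mon_command(), which raises MonCommandFailed on a non-zero retval.
#
#   retval, out, err = self.mon_command({'prefix': 'osd pool ls', 'format': 'json'})
#   pools = json.loads(
#       self.check_mon_command({'prefix': 'osd pool ls', 'format': 'json'}).stdout)
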
1670 def osd_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1671 """
1672 Helper for osd command execution.
1673
1674 See send_command for general case. Also, see osd/OSD.cc for available commands.
1675
1676 :param dict cmd_dict: expects a prefix and an osd id, i.e.:
1677 cmd_dict = {
1678 'prefix': 'perf histogram dump',
1679 'id': '0'
1680 }
1681         :return: status int, out str, err str
1682 """
1683 t1 = time.time()
1684 result = CommandResult()
1685 self.send_command(result, "osd", cmd_dict['id'], json.dumps(cmd_dict), "", inbuf)
1686 r = result.wait()
1687 t2 = time.time()
1688
1689 self.log.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
1690 cmd_dict['prefix'], r[0], t2 - t1
1691 ))
1692
1693 return r
1694
1695 def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
1696 """
1697 Helper for `ceph tell` command execution.
1698
1699 See send_command for general case.
1700
1701 :param dict cmd_dict: expects a prefix i.e.:
1702 cmd_dict = {
1703 'prefix': 'heap',
1704 'heapcmd': 'stats',
1705 }
1706         :return: status int, out str, err str
1707 """
1708 t1 = time.time()
1709 result = CommandResult()
1710 self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
1711 r = result.wait()
1712 t2 = time.time()
1713
1714 self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
1715 daemon_type, daemon_id, cmd_dict['prefix'], r[0], t2 - t1
1716 ))
1717
1718 return r
1719
1720 def send_command(
1721 self,
1722 result: CommandResult,
1723 svc_type: str,
1724 svc_id: str,
1725 command: str,
1726 tag: str,
1727 inbuf: Optional[str] = None) -> None:
1728 """
1729 Called by the plugin to send a command to the mon
1730 cluster.
1731
1732 :param CommandResult result: an instance of the ``CommandResult``
1733 class, defined in the same module as MgrModule. This acts as a
1734 completion and stores the output of the command. Use
1735 ``CommandResult.wait()`` if you want to block on completion.
1736 :param str svc_type:
1737 :param str svc_id:
1738 :param str command: a JSON-serialized command. This uses the same
1739 format as the ceph command line, which is a dictionary of command
1740 arguments, with the extra ``prefix`` key containing the command
1741 name itself. Consult MonCommands.h for available commands and
1742 their expected arguments.
1743 :param str tag: used for nonblocking operation: when a command
1744 completes, the ``notify()`` callback on the MgrModule instance is
1745 triggered, with notify_type set to "command", and notify_id set to
1746 the tag of the command.
1747 :param str inbuf: input buffer for sending additional data.
1748 """
1749 self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
1750
1751 def tool_exec(
1752 self,
1753 args: List[str],
1754 timeout: int = 10,
1755 stdin: Optional[bytes] = None
1756 ) -> Tuple[int, str, str]:
1757 try:
1758 tool = args.pop(0)
1759 cmd = [
1760 tool,
1761 '-k', str(self.get_ceph_option('keyring')),
1762 '-n', f'mgr.{self.get_mgr_id()}',
1763 ] + args
1764 self.log.debug('exec: ' + ' '.join(cmd))
1765 p = subprocess.run(
1766 cmd,
1767 input=stdin,
1768 stdout=subprocess.PIPE,
1769 stderr=subprocess.PIPE,
1770 timeout=timeout,
1771 )
1772 except subprocess.TimeoutExpired as ex:
1773 self.log.error(ex)
1774 return -errno.ETIMEDOUT, '', str(ex)
1775 if p.returncode:
1776 self.log.error(f'Non-zero return from {cmd}: {p.stderr.decode()}')
1777 return p.returncode, p.stdout.decode(), p.stderr.decode()
1778
1779 def set_health_checks(self, checks: HealthChecksT) -> None:
1780 """
1781 Set the module's current map of health checks. Argument is a
1782 dict of check names to info, in this form:
1783
1784 ::
1785
1786 {
1787 'CHECK_FOO': {
1788 'severity': 'warning', # or 'error'
1789 'summary': 'summary string',
1790 'count': 4, # quantify badness
1791 'detail': [ 'list', 'of', 'detail', 'strings' ],
1792 },
1793 'CHECK_BAR': {
1794 'severity': 'error',
1795 'summary': 'bars are bad',
1796 'detail': [ 'too hard' ],
1797 },
1798 }
1799
1800 :param checks: dict of health check dicts
1801 """
1802 self._ceph_set_health_checks(checks)
1803
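# Example of raising and later clearing a check via set_health_checks(); the
# check name MGR_EXAMPLE_STALE and the n_stale/stale_items variables are
# hypothetical:
#
#     self.set_health_checks({
#         'MGR_EXAMPLE_STALE': {
#             'severity': 'warning',
#             'summary': '%d items are stale' % n_stale,
#             'count': n_stale,
#             'detail': ['%s is stale' % name for name in stale_items],
#         },
#     })
#     self.set_health_checks({})   # an empty dict clears this module's checks
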
1804 def _handle_command(self,
1805 inbuf: str,
1806 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1807 Tuple[int, str, str]]:
1808 if cmd['prefix'] not in CLICommand.COMMANDS:
1809 return self.handle_command(inbuf, cmd)
1810
1811 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
1812
1813 def handle_command(self,
1814 inbuf: str,
1815 cmd: Dict[str, Any]) -> Union[HandleCommandResult,
1816 Tuple[int, str, str]]:
1817 """
1818 Called by ceph-mgr to request the plugin to handle one
1819 of the commands that it declared in self.COMMANDS
1820
1821 Return a status code, an output buffer, and an
1822 output string. The output buffer is for data results,
1823 the output string is for informative text.
1824
1825 :param inbuf: content of any "-i <file>" supplied to ceph cli
1826 :type inbuf: str
1827 :param cmd: from Ceph's cmdmap_t
1828 :type cmd: dict
1829
1830 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1831 """
1832
1833 # Should never get called if they didn't declare
1834 # any ``COMMANDS``
1835 raise NotImplementedError()
1836
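# A sketch of what a subclass implementation might look like; the
# 'example status' prefix is made up and assumes a matching COMMANDS entry:
#
#     def handle_command(self, inbuf, cmd):
#         if cmd['prefix'] == 'example status':
#             return HandleCommandResult(stdout=json.dumps({'ok': True}))
#         return HandleCommandResult(retval=-errno.EINVAL,
#                                    stderr='unknown command %s' % cmd['prefix'])
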
1837 def get_mgr_id(self) -> str:
1838 """
1839 Retrieve the name of the manager daemon where this plugin
1840 is currently being executed (i.e. the active manager).
1841
1842 :return: str
1843 """
1844 return self._ceph_get_mgr_id()
1845
1846 @API.expose
1847 def get_ceph_conf_path(self) -> str:
1848 return self._ceph_get_ceph_conf_path()
1849
1850 @API.expose
1851 def get_mgr_ip(self) -> str:
1852 if not self._mgr_ips:
1853 ips = self.get("mgr_ips").get('ips', [])
1854 if not ips:
1855 return socket.gethostname()
1856 self._mgr_ips = ips[0]
1857 assert self._mgr_ips is not None
1858 return self._mgr_ips
1859
1860 @API.expose
1861 def get_hostname(self) -> str:
1862 return socket.gethostname()
1863
1864 @API.expose
1865 def get_ceph_option(self, key: str) -> OptionValue:
1866 return self._ceph_get_option(key)
1867
1868 @API.expose
1869 def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
1870 return self._ceph_get_foreign_option(entity, key)
1871
1872 def _validate_module_option(self, key: str) -> None:
1873 """
1874 Helper: don't allow get/set config callers to
1875 access config options that they didn't declare
1876 in their schema.
1877 """
1878 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1879 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1880 format(key, self.__class__.__name__))
1881
1882 def _get_module_option(self,
1883 key: str,
1884 default: OptionValue,
1885 localized_prefix: str = "") -> OptionValue:
1886 r = self._ceph_get_module_option(self.module_name, key,
1887 localized_prefix)
1888 if r is None:
1889 return self.MODULE_OPTION_DEFAULTS.get(key, default)
1890 else:
1891 return r
1892
1893 def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
1894 """
1895 Retrieve the value of a persistent configuration setting
1896 """
1897 self._validate_module_option(key)
1898 return self._get_module_option(key, default)
1899
1900 def get_module_option_ex(self, module: str,
1901 key: str,
1902 default: OptionValue = None) -> OptionValue:
1903 """
1904 Retrieve the value of a persistent configuration setting
1905 for the specified module.
1906
1907 :param module: The name of the module, e.g. 'dashboard'
1908 or 'telemetry'.
1909 :param key: The configuration key, e.g. 'server_addr'.
1910 :param default: The default value to use when the
1911 returned value is ``None``. Defaults to ``None``.
1912 """
1913 if module == self.module_name:
1914 self._validate_module_option(key)
1915 r = self._ceph_get_module_option(module, key)
1916 return default if r is None else r
1917
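# Reading another module's option, mirroring the example in the docstring
# above; 'dashboard'/'server_addr' and the '::' fallback are illustrative:
#
#     addr = self.get_module_option_ex('dashboard', 'server_addr', '::')
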
1918 @API.expose
1919 def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
1920 """
1921 Retrieve a dict of KV store keys to values, where the keys
1922 have the given prefix
1923
1924 :param str key_prefix: the prefix that returned keys must match
1925 :return: a dict of matching keys to their values
1926 """
1927 return self._ceph_get_store_prefix(key_prefix)
1928
1929 def _set_localized(self,
1930 key: str,
1931 val: Optional[str],
1932 setter: Callable[[str, Optional[str]], None]) -> None:
1933 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1934
1935 def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
1936 """
1937 Retrieve localized configuration for this ceph-mgr instance
1938 """
1939 self._validate_module_option(key)
1940 return self._get_module_option(key, default, self.get_mgr_id())
1941
1942 def _set_module_option(self, key: str, val: Any) -> None:
1943 return self._ceph_set_module_option(self.module_name, key,
1944 None if val is None else str(val))
1945
1946 def set_module_option(self, key: str, val: Any) -> None:
1947 """
1948 Set the value of a persistent configuration setting
1949
1950 :param str key:
1951 :type val: str | None
1952 :raises ValueError: if `val` cannot be parsed or it is out of the specified range
1953 """
1954 self._validate_module_option(key)
1955 return self._set_module_option(key, val)
1956
1957 def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
1958 """
1959 Set the value of a persistent configuration setting
1960 for the specified module.
1961
1962 :param str module:
1963 :param str key:
1964 :param str val:
1965 """
1966 if module == self.module_name:
1967 self._validate_module_option(key)
1968 return self._ceph_set_module_option(module, key, str(val))
1969
1970 @API.perm('w')
1971 @API.expose
1972 def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
1973 """
1974 Set localized configuration for this ceph-mgr instance
1975 :param str key:
1976 :param str val:
1977 :return: None
1978 """
1979 self._validate_module_option(key)
1980 return self._set_localized(key, val, self._set_module_option)
1981
1982 @API.perm('w')
1983 @API.expose
1984 def set_store(self, key: str, val: Optional[str]) -> None:
1985 """
1986 Set a value in this module's persistent key value store.
1987 If val is None, remove key from store
1988 """
1989 self._ceph_set_store(key, val)
1990
1991 @API.expose
1992 def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
1993 """
1994 Get a value from this module's persistent key value store
1995 """
1996 r = self._ceph_get_store(key)
1997 if r is None:
1998 return default
1999 else:
2000 return r
2001
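# Minimal round-trip through the module KV store. Values are stored as
# strings, so structured data is typically JSON-encoded; the key name
# 'last_report' is only an example:
#
#     self.set_store('last_report', json.dumps({'epoch': 42}))
#     report = json.loads(self.get_store('last_report', default='{}'))
#     self.set_store('last_report', None)   # passing None removes the key
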
2002 @API.expose
2003 def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
2004 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
2005 if r is None:
2006 r = self._ceph_get_store(key)
2007 if r is None:
2008 r = default
2009 return r
2010
2011 @API.perm('w')
2012 @API.expose
2013 def set_localized_store(self, key: str, val: Optional[str]) -> None:
2014 return self._set_localized(key, val, self.set_store)
2015
2016 def self_test(self) -> Optional[str]:
2017 """
2018 Run a self-test on the module. Override this function and implement
2019 as thorough a self-test as possible for (automated) testing of the module.
2020
2021 Indicate any failures by raising an exception. This does not have
2022 to be pretty; it's mainly for picking up regressions during
2023 development, rather than for use in the field.
2024
2025 :return: None, or an advisory string for developer interest, such
2026 as a json dump of some state.
2027 """
2028 pass
2029
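# A possible self_test() override, sketching the "raise on failure, optionally
# return a summary string" contract; the specific check is arbitrary:
#
#     def self_test(self):
#         if self.get('mgr_map') is None:
#             raise RuntimeError('mgr_map unavailable')
#         return json.dumps({'checked': ['mgr_map']})
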
2030 def get_osdmap(self) -> OSDMap:
2031 """
2032 Get a handle to the latest OSDMap known to this ceph-mgr
2033 instance.
2034 :return: OSDMap
2035 """
2036 return cast(OSDMap, self._ceph_get_osdmap())
2037
2038 @API.expose
2039 def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
2040 data = self.get_latest_counter(
2041 daemon_type, daemon_name, counter)[counter]
2042 if data:
2043 return data[1]
2044 else:
2045 return 0
2046
2047 @API.expose
2048 def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
2049 data = self.get_latest_counter(
2050 daemon_type, daemon_name, counter)[counter]
2051 if data:
2052 # https://github.com/python/mypy/issues/1178
2053 _, value, count = cast(Tuple[float, int, int], data)
2054 return value, count
2055 else:
2056 return 0, 0
2057
2058 @API.expose
2059 @profile_method()
2060 def get_unlabeled_perf_counters(self, prio_limit: int = PRIO_USEFUL,
2061 services: Sequence[str] = ("mds", "mon", "osd",
2062 "rbd-mirror", "rgw",
2063 "tcmu-runner")) -> Dict[str, dict]:
2064 """
2065 Return the perf counters currently known to this ceph-mgr
2066 instance, filtered by priority equal to or greater than `prio_limit`.
2067
2068 The result is a map of string to dict, associating services
2069 (like "osd.123") with their counters. The counter
2070 dict for each service maps counter paths to a counter
2071 info structure, which is the information from
2072 the schema, plus an additional "value" member with the latest
2073 value.
2074 """
2075
2076 result = defaultdict(dict) # type: Dict[str, dict]
2077
2078 for server in self.list_servers():
2079 for service in cast(List[ServiceInfoT], server['services']):
2080 if service['type'] not in services:
2081 continue
2082
2083 schemas = self.get_perf_schema(service['type'], service['id'])
2084 if not schemas:
2085 self.log.warning("No perf counter schema for {0}.{1}".format(
2086 service['type'], service['id']
2087 ))
2088 continue
2089
2090 # Value is returned in a potentially-multi-service format,
2091 # get just the service we're asking about
2092 svc_full_name = "{0}.{1}".format(
2093 service['type'], service['id'])
2094 schema = schemas[svc_full_name]
2095
2096 # Populate latest values
2097 for counter_path, counter_schema in schema.items():
2098 # self.log.debug("{0}: {1}".format(
2099 # counter_path, json.dumps(counter_schema)
2100 # ))
2101 priority = counter_schema['priority']
2102 assert isinstance(priority, int)
2103 if priority < prio_limit:
2104 continue
2105
2106 tp = counter_schema['type']
2107 assert isinstance(tp, int)
2108 counter_info = dict(counter_schema)
2109 # Also populate count for the long running avgs
2110 if tp & self.PERFCOUNTER_LONGRUNAVG:
2111 v, c = self.get_latest_avg(
2112 service['type'],
2113 service['id'],
2114 counter_path
2115 )
2116 counter_info['value'], counter_info['count'] = v, c
2117 result[svc_full_name][counter_path] = counter_info
2118 else:
2119 counter_info['value'] = self.get_latest(
2120 service['type'],
2121 service['id'],
2122 counter_path
2123 )
2124
2125 result[svc_full_name][counter_path] = counter_info
2126
2127 self.log.debug("returning counters for {0} services".format(len(result)))
2128
2129 return result
2130
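# Usage sketch for the structure returned above: service name -> counter path
# -> schema fields plus 'value' (and 'count' for long-running averages). The
# 'osd.op_r' counter path is an example:
#
#     counters = self.get_unlabeled_perf_counters(services=('osd',))
#     for svc, paths in counters.items():
#         info = paths.get('osd.op_r')
#         if info:
#             self.log.debug('%s op_r=%s', svc, info['value'])
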
2131 @API.expose
2132 def set_uri(self, uri: str) -> None:
2133 """
2134 If the module exposes a service, then call this to publish the
2135 address once it is available.
2136
2137 :return: None
2138 """
2139 return self._ceph_set_uri(uri)
2140
2141 @API.perm('w')
2142 @API.expose
2143 def set_device_wear_level(self, devid: str, wear_level: float) -> None:
2144 return self._ceph_set_device_wear_level(devid, wear_level)
2145
2146 @API.expose
2147 def have_mon_connection(self) -> bool:
2148 """
2149 Check whether this ceph-mgr daemon has an open connection
2150 to a monitor. If it doesn't, then it's likely that the
2151 information we have about the cluster is out of date,
2152 and/or the monitor cluster is down.
2153 """
2154
2155 return self._ceph_have_mon_connection()
2156
2157 def update_progress_event(self,
2158 evid: str,
2159 desc: str,
2160 progress: float,
2161 add_to_ceph_s: bool) -> None:
2162 return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
2163
2164 @API.perm('w')
2165 @API.expose
2166 def complete_progress_event(self, evid: str) -> None:
2167 return self._ceph_complete_progress_event(evid)
2168
2169 @API.perm('w')
2170 @API.expose
2171 def clear_all_progress_events(self) -> None:
2172 return self._ceph_clear_all_progress_events()
2173
2174 @property
2175 def rados(self) -> rados.Rados:
2176 """
2177 A librados instance to be shared by any classes within
2178 this mgr module that want one.
2179 """
2180 if self._rados:
2181 return self._rados
2182
2183 ctx_capsule = self.get_context()
2184 self._rados = rados.Rados(context=ctx_capsule)
2185 self._rados.connect()
2186 self._ceph_register_client(None, self._rados.get_addrs(), False)
2187 return self._rados
2188
2189 @staticmethod
2190 def can_run() -> Tuple[bool, str]:
2191 """
2192 Implement this function to report whether the module's dependencies
2193 are met. For example, if the module needs to import a particular
2194 dependency to work, then use a try/except around the import at
2195 file scope, and then report here if the import failed.
2196
2197 This will be called in a blocking way from the C++ code, so do not
2198 do any I/O that could block in this function.
2199
2200 :return: a 2-tuple consisting of a boolean and an explanatory string
2201 """
2202
2203 return True, ""
2204
2205 @API.expose
2206 def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
2207 """
2208 Invoke a method on another module. All arguments, and the return
2209 value from the other module must be serializable.
2210
2211 Limitation: Do not import any modules within the called method.
2212 Otherwise you will get an error in Python 2::
2213
2214 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
2215
2216
2217
2218 :param module_name: Name of other module. If module isn't loaded,
2219 an ImportError exception is raised.
2220 :param method_name: Method name. If it does not exist, a NameError
2221 exception is raised.
2222 :param args: Argument tuple
2223 :param kwargs: Keyword argument dict
2224 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
2225 :raises ImportError: No such module
2226 """
2227 return self._ceph_dispatch_remote(module_name, method_name,
2228 args, kwargs)
2229
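# Cross-module call sketch; 'other_module' and 'do_thing' are placeholders for
# a loaded module and one of its methods taking serializable arguments:
#
#     try:
#         result = self.remote('other_module', 'do_thing', 1, key='value')
#     except ImportError:
#         pass   # 'other_module' is not loaded/enabled
#     except RuntimeError as e:
#         self.log.error('remote call failed: %s', e)
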
2230 def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
2231 """
2232 Register an OSD perf query. Argument is a
2233 dict of the query parameters, in this form:
2234
2235 ::
2236
2237 {
2238 'key_descriptor': [
2239 {'type': subkey_type, 'regex': regex_pattern},
2240 ...
2241 ],
2242 'performance_counter_descriptors': [
2243 list, of, descriptor, types
2244 ],
2245 'limit': {'order_by': performance_counter_type, 'max_count': n},
2246 }
2247
2248 Valid subkey types:
2249 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
2250 'pg_id', 'object_name', 'snap_id'
2251 Valid performance counter types:
2252 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
2253 'latency', 'write_latency', 'read_latency'
2254
2255 :param object query: query
2256 :rtype: int (query id)
2257 """
2258 return self._ceph_add_osd_perf_query(query)
2259
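# A hedged example of a query dict built from the fields documented above:
# group by client_id, track ops and bytes, keep the ten busiest entries:
#
#     query_id = self.add_osd_perf_query({
#         'key_descriptor': [{'type': 'client_id', 'regex': '.*'}],
#         'performance_counter_descriptors': ['ops', 'bytes'],
#         'limit': {'order_by': 'ops', 'max_count': 10},
#     })
#     if query_id is not None:
#         counters = self.get_osd_perf_counters(query_id)
#         self.remove_osd_perf_query(query_id)
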
2260 @API.perm('w')
2261 @API.expose
2262 def remove_osd_perf_query(self, query_id: int) -> None:
2263 """
2264 Unregister an OSD perf query.
2265
2266 :param int query_id: query ID
2267 """
2268 return self._ceph_remove_osd_perf_query(query_id)
2269
2270 @API.expose
2271 def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
2272 """
2273 Get stats collected for an OSD perf query.
2274
2275 :param int query_id: query ID
2276 """
2277 return self._ceph_get_osd_perf_counters(query_id)
2278
2279 def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
2280 """
2281 Register an MDS perf query. Argument is a
2282 dict of the query parameters, in this form:
2283
2284 ::
2285
2286 {
2287 'key_descriptor': [
2288 {'type': subkey_type, 'regex': regex_pattern},
2289 ...
2290 ],
2291 'performance_counter_descriptors': [
2292 list, of, descriptor, types
2293 ],
2294 }
2295
2296 NOTE: 'limit' and 'order_by' are not supported (yet).
2297
2298 Valid subkey types:
2299 'mds_rank', 'client_id'
2300 Valid performance counter types:
2301 'cap_hit_metric'
2302
2303 :param object query: query
2304 :rtype: int (query id)
2305 """
2306 return self._ceph_add_mds_perf_query(query)
2307
2308 @API.perm('w')
2309 @API.expose
2310 def remove_mds_perf_query(self, query_id: int) -> None:
2311 """
2312 Unregister an MDS perf query.
2313
2314 :param int query_id: query ID
2315 """
2316 return self._ceph_remove_mds_perf_query(query_id)
2317
2318 @API.expose
2320 def reregister_mds_perf_queries(self) -> None:
2321 """
2322 Re-register MDS perf queries.
2323 """
2324 return self._ceph_reregister_mds_perf_queries()
2325
2326 def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
2327 """
2328 Get stats collected for an MDS perf query.
2329
2330 :param int query_id: query ID
2331 """
2332 return self._ceph_get_mds_perf_counters(query_id)
2333
2334 def get_daemon_health_metrics(self) -> Dict[str, List[Dict[str, Any]]]:
2335 """
2336 Get the list of health metrics per daemon. This includes SLOW_OPS health metrics
2337 in MON and OSD daemons, and PENDING_CREATING_PGS health metrics for OSDs.
2338 """
2339 return self._ceph_get_daemon_health_metrics()
2340
2341 def is_authorized(self, arguments: Dict[str, str]) -> bool:
2342 """
2343 Verifies that the current session caps permit executing the py service
2344 or current module with the provided arguments. This provides a generic
2345 way to allow modules to restrict by more fine-grained controls (e.g.
2346 pools).
2347
2348 :param arguments: dict of key/value arguments to test
2349 """
2350 return self._ceph_is_authorized(arguments)
2351
2352 @API.expose
2353 def send_rgwadmin_command(self, args: List[str],
2354 stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]:
2355 try:
2356 cmd = [
2357 'radosgw-admin',
2358 '-c', str(self.get_ceph_conf_path()),
2359 '-k', str(self.get_ceph_option('keyring')),
2360 '-n', f'mgr.{self.get_mgr_id()}',
2361 ] + args
2362 self.log.debug('Executing %s', str(cmd))
2363 result = subprocess.run( # pylint: disable=subprocess-run-check
2364 cmd,
2365 stdout=subprocess.PIPE,
2366 stderr=subprocess.PIPE,
2367 timeout=10,
2368 )
2369 stdout = result.stdout.decode('utf-8')
2370 stderr = result.stderr.decode('utf-8')
2371 if stdout and stdout_as_json:
2372 stdout = json.loads(stdout)
2373 if result.returncode:
2374 self.log.debug('Error %s executing %s: %s', result.returncode, str(cmd), stderr)
2375 return result.returncode, stdout, stderr
2376 except subprocess.CalledProcessError as ex:
2377 self.log.exception('Error executing radosgw-admin %s: %s', str(ex.cmd), str(ex.output))
2378 raise
2379 except subprocess.TimeoutExpired as ex:
2380 self.log.error('Timeout (10s) executing radosgw-admin %s', str(ex.cmd))
2381 raise
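# Usage sketch; 'user list' is an example radosgw-admin subcommand, and stdout
# arrives already JSON-decoded while stdout_as_json stays at its default:
#
#     rc, users, err = self.send_rgwadmin_command(['user', 'list'])
#     if rc != 0:
#         self.log.error('radosgw-admin failed: %s', err)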