]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
3efd9988 1import ceph_module # noqa
3efd9988 2
f67539c2 3from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
1e59de90 4 Mapping, NamedTuple, Sequence, Union, Set, TYPE_CHECKING
f67539c2
TL
5if TYPE_CHECKING:
6 import sys
7 if sys.version_info >= (3, 8):
8 from typing import Literal
9 else:
10 from typing_extensions import Literal
11
12import inspect
7c673cae 13import logging
9f95a23c 14import errno
f67539c2 15import functools
11fdf7f2 16import json
522d829b 17import subprocess
7c673cae 18import threading
f67539c2 19from collections import defaultdict
20effc67 20from enum import IntEnum, Enum
11fdf7f2 21import rados
81eedcae 22import re
b3b6e05e 23import socket
20effc67 24import sqlite3
f67539c2 25import sys
11fdf7f2 26import time
f67539c2 27from ceph_argparse import CephArgtype
f6b5b4d7 28from mgr_util import profile_method
11fdf7f2 29
f67539c2
TL
if sys.version_info < (3, 8):
    # typing.get_args / typing.get_origin only exist from Python 3.8 on;
    # provide minimal stand-ins for older interpreters.
    def get_args(tp: Any) -> Any:
        """Return the ``__args__`` of *tp* (``()`` if it has none).

        The bare ``Generic`` class is returned unchanged.
        """
        if tp is not Generic:
            return getattr(tp, '__args__', ())
        return tp

    def get_origin(tp: Any) -> Any:
        """Return the ``__origin__`` of *tp*, or None if it has none."""
        return getattr(tp, '__origin__', None)
else:
    from typing import get_args, get_origin
41
42
522d829b
TL
# Message prefixes returned by CLICheckNonemptyFileInput when the "-i" input
# is empty or missing.
ERROR_MSG_EMPTY_INPUT_FILE = 'Empty input file'
ERROR_MSG_NO_INPUT_FILE = 'Input file not specified'
# Full list of strings in "osd_types.cc:pg_state_string()"
PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown",
    "premerge",
    "failed_repair",
    "laggy",
    "wait",
]
3efd9988 82
a4b75251
TL
# NOTE(review): presumably shared by the modules that manage NFS-Ganesha
# exports (supported FSAL backends and the pool they use) -- confirm at the
# call sites; nothing in this file reads them.
NFS_GANESHA_SUPPORTED_FSALS = ['CEPH', 'RGW']
NFS_POOL_NAME = '.nfs'
85
3efd9988 86
20effc67
TL
class NotifyType(str, Enum):
    """
    Names of the notification types known to this module.

    Members are also ``str`` instances, so they compare equal to their
    plain string value.
    """
    mon_map = 'mon_map'
    pg_summary = 'pg_summary'
    health = 'health'
    clog = 'clog'
    osd_map = 'osd_map'
    fs_map = 'fs_map'
    command = 'command'

    # these are disabled because there are no users.
    #  see Mgr.cc:
    # service_map = 'service_map'
    # mon_status = 'mon_status'
    #  see DaemonServer.cc:
    # perf_schema_update = 'perf_schema_update'
102
103
7c673cae
FG
class CommandResult(object):
    """
    Completion object to use with MgrModule.send_command.

    One side calls :meth:`complete` with the outcome; the other side blocks
    in :meth:`wait` until that has happened.
    """

    def __init__(self, tag: Optional[str] = None):
        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag or ""

        # Result fields, overwritten by complete().
        self.r = 0
        self.outb = ""
        self.outs = ""

        # Set once the result fields are valid.
        self.ev = threading.Event()

    def complete(self, r: int, outb: str, outs: str) -> None:
        """Store the result and wake up anybody blocked in wait()."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self) -> Tuple[int, str, str]:
        """Block until complete() was called, then return (r, outb, outs)."""
        self.ev.wait()
        return (self.r, self.outb, self.outs)
128
129
f67539c2
TL
class HandleCommandResult(NamedTuple):
    """
    Three-item tuple holding the result of `handle_command()`.

    Guidelines for filling it in:

    - Only write to stderr if there is an error, or in extraordinary
      circumstances.
    - Avoid having `ceph foo bar` commands say "did foo bar" on success
      unless there is critical information to include there.
    - Everything programmatically consumable should be put on stdout.
    """
    # return code. E.g. 0 or -errno.EINVAL
    retval: int = 0
    # data of this result.
    stdout: str = ""
    # Typically used for error messages.
    stderr: str = ""
11fdf7f2
TL
144
145
class MonCommandFailed(RuntimeError):
    """RuntimeError subclass; presumably raised when a mon command fails
    (confirm at the raise sites)."""
class MgrDBNotReady(RuntimeError):
    """RuntimeError subclass; presumably raised when the mgr database is not
    available yet (confirm at the raise sites)."""
e306af50
TL
148
149
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python wrapper around ``ceph_module.BasePyOSDMap``.

    Most methods simply forward to the underscore-prefixed method of the
    same name provided by the base class; a few helpers pick pieces out of
    the full ``_dump()`` output.
    """

    def get_epoch(self) -> int:
        return self._get_epoch()

    def get_crush_version(self) -> int:
        return self._get_crush_version()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_pools(self) -> Dict[int, Dict[str, Any]]:
        """Return the dumped pools keyed by their ``'pool'`` field."""
        # FIXME: efficient implementation
        return {pool['pool']: pool for pool in self._dump()['pools']}

    def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
        """Return the dumped pools keyed by their ``'pool_name'`` field."""
        # FIXME: efficient implementation
        return {pool['pool_name']: pool for pool in self._dump()['pools']}

    def new_incremental(self) -> 'OSDMapIncremental':
        return self._new_incremental()

    def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
        return self._apply_incremental(inc)

    def get_crush(self) -> 'CRUSHMap':
        return self._get_crush()

    def get_pools_by_take(self, take: int) -> List[int]:
        """Return the ``'pools'`` entry of ``_get_pools_by_take(take)``,
        or an empty list if that entry is absent."""
        by_take = self._get_pools_by_take(take)
        return by_take.get('pools', [])

    def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
                       max_deviation: int,
                       max_iterations: int = 10,
                       pools: Optional[List[str]] = None) -> int:
        """Forward to ``_calc_pg_upmaps``; ``pools=None`` is passed on as
        an empty list."""
        pool_names = [] if pools is None else pools
        return self._calc_pg_upmaps(
            inc, max_deviation, max_iterations, pool_names)

    def map_pool_pgs_up(self, poolid: int) -> List[int]:
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id: int) -> float:
        return self._pool_raw_used_rate(pool_id)

    @classmethod
    def build_simple(cls, epoch: int = 1, uuid: Optional[str] = None, num_osd: int = -1) -> 'ceph_module.BasePyOSDMap':
        return cls._build_simple(epoch, uuid, num_osd)

    def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
        """Return the entry *name* of the dumped ``'erasure_code_profiles'``,
        or None if there is no such entry."""
        # FIXME: efficient implementation
        profiles = self._dump()['erasure_code_profiles']
        return profiles.get(name)

    def get_require_osd_release(self) -> str:
        """Return the ``'require_osd_release'`` field of the dump."""
        return self._dump()['require_osd_release']
213
11fdf7f2 214
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python wrapper around ``ceph_module.BasePyOSDMapIncremental``; every
    method forwards to the underscore-prefixed base-class method.
    """

    def get_epoch(self) -> int:
        return self._get_epoch()

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
        """
        weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
        """
        weightmap is a dict, int to float.  devices only.  e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        # NOTE(review): the annotation says the keys are str while the
        # docstring above says int -- confirm which one the C++ side expects.
        return self._set_crush_compat_weight_set_weights(weightmap)
234
11fdf7f2 235
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python wrapper around ``ceph_module.BasePyCRUSH``.

    Besides forwarding to the underscore-prefixed base-class methods, it
    offers a few lookups implemented on top of the full ``dump()`` output.
    """
    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self) -> Dict[str, Any]:
        return self._dump()

    def get_item_weight(self, item: int) -> Optional[int]:
        return self._get_item_weight(item)

    def get_item_name(self, item: int) -> Optional[str]:
        return self._get_item_name(item)

    def find_takes(self) -> List[int]:
        """Return the ``'takes'`` entry of ``_find_takes()`` (``[]`` if absent)."""
        found = self._find_takes()
        return found.get('takes', [])

    def find_roots(self) -> List[int]:
        """Return the ``'roots'`` entry of ``_find_roots()`` (``[]`` if absent)."""
        found = self._find_roots()
        return found.get('roots', [])

    def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
        """Return the ``'weights'`` of ``_get_take_weight_osd_map(root)``
        with the keys converted to int."""
        weights = self._get_take_weight_osd_map(root).get('weights', {})
        return dict((int(osd), weight) for osd, weight in weights.items())

    @staticmethod
    def have_default_choose_args(dump: Dict[str, Any]) -> bool:
        """Tell whether *dump* has a ``DEFAULT_CHOOSE_ARGS`` entry under
        ``'choose_args'``."""
        choose_args = dump.get('choose_args', {})
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in choose_args

    @staticmethod
    def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return the ``DEFAULT_CHOOSE_ARGS`` entry of ``dump['choose_args']``
        (``[]`` if that entry is absent).

        ``dump['choose_args']`` itself is asserted to be a dict.
        """
        all_choose_args = dump.get('choose_args')
        assert isinstance(all_choose_args, dict)
        return all_choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
        """Return the first dumped rule whose ``'rule_name'`` matches, or None."""
        # TODO efficient implementation
        return next(
            (r for r in self.dump()['rules'] if rule_name == r['rule_name']),
            None)

    def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
        """Return the first dumped rule whose ``'rule_id'`` matches, or None."""
        return next(
            (r for r in self.dump()['rules'] if r['rule_id'] == rule_id),
            None)

    def get_rule_root(self, rule_name: str) -> Optional[int]:
        """Return the ``'item'`` of the first ``take`` step of the named rule.

        Returns None if the rule does not exist, or (after logging a
        warning) if it has no ``take`` step.
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        for step in rule['steps']:
            if step.get('op') == 'take':
                return step['item']

        logging.warning("CRUSH rule '{0}' has no 'take' step".format(
            rule_name))
        return None

    def get_osds_under(self, root_id: int) -> List[int]:
        """Return the ids of all items with a non-negative id found by
        walking the bucket tree below bucket *root_id*.

        Items with a negative id are looked up among the dumped buckets and
        descended into; a KeyError raised while doing so is ignored.
        """
        # TODO don't abuse dump like this
        by_id = {bucket['id']: bucket for bucket in self.dump()['buckets']}

        found = []

        def collect(bucket: Dict[str, Any]) -> None:
            for entry in bucket['items']:
                entry_id = entry['id']
                if entry_id >= 0:
                    found.append(entry_id)
                    continue
                try:
                    collect(by_id[entry_id])
                except KeyError:
                    pass

        collect(by_id[root_id])

        return found

    def device_class_counts(self) -> Dict[str, int]:
        """Count the dumped devices per ``'class'`` value (devices without
        one are counted under the key None)."""
        counts = {}  # type: Dict[str, int]
        # TODO don't abuse dump like this
        for device in self.dump()['devices']:
            dev_class = device.get('class', None)
            counts[dev_class] = counts.get(dev_class, 0) + 1

        return counts
328
329
f67539c2
TL
# Signature shared by command handlers: any arguments in, a 3-tuple
# (int, str, str) out.
HandlerFuncType = Callable[..., Tuple[int, str, str]]
331
33c7a0ef
TL
def _extract_target_func(
    f: HandlerFuncType
) -> Tuple[HandlerFuncType, Dict[str, Any]]:
    """Unwrap a decorated handler down to the original function.

    In order to interoperate with other decorated functions, the original
    function, which provides the main set of arguments, has to be found.
    While following the ``__wrapped__`` chain, the ``extra_args`` attribute
    of every wrapper passed on the way is merged into one dict, so that
    decorators can contribute additional arguments.

    :return: (innermost function, merged extra args); when *f* wraps
             nothing, ``(f, {})``
    """
    # use getattr to keep mypy happy
    inner = getattr(f, "__wrapped__", None)
    if not inner:
        return f, {}

    collected: Dict[str, Any] = {}
    current = f
    while True:
        # only wrappers contribute: the innermost function is never asked
        collected.update(getattr(current, "extra_args", {}))
        current = inner
        inner = getattr(current, "__wrapped__", None)
        if inner is None:
            break
    return current, collected
351
f67539c2 352
class CLICommand(object):
    """
    Decorator that registers a function as the handler of a CLI command.

    Decorating a function inspects its signature to build the argument
    description, and records this object in ``COMMANDS`` under ``prefix``.
    The decorated function itself is returned unchanged.
    """
    # All registered commands, keyed by prefix (shared by every instance).
    COMMANDS = {}   # type: Dict[str, CLICommand]

    def __init__(self,
                 prefix: str,
                 perm: str = 'rw',
                 poll: bool = False):
        self.prefix = prefix
        self.perm = perm
        self.poll = poll
        self.func = None  # type: Optional[Callable]
        self.arg_spec = {}    # type: Dict[str, Any]
        # -1 until store_func_metadata() ran
        self.first_default = -1

    # Names that never describe a command argument of their own.
    KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'

    @classmethod
    def _load_func_metadata(cls: Any, f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
        """Derive the command metadata from the signature of *f*.

        :return: tuple of (description taken from the docstring with
                 newlines replaced by spaces, annotations by argument name,
                 index of the first argument that has a default, the
                 space-joined argument descriptors)
        :raises AssertionError: if an argument outside ``KNOWN_ARGS`` (and
                 other than ``_end_positional_``) is not annotated
        """
        f, extra_args = _extract_target_func(f)
        desc = (inspect.getdoc(f) or '').replace('\n', ' ')
        full_argspec = inspect.getfullargspec(f)
        arg_spec = full_argspec.annotations
        first_default = len(arg_spec)
        if full_argspec.defaults:
            first_default -= len(full_argspec.defaults)
        args = []
        positional = True
        for index, arg in enumerate(full_argspec.args):
            if arg in cls.KNOWN_ARGS:
                # record that this function takes an inbuf if it is present
                # in the full_argspec and not already in the arg_spec
                if arg == 'inbuf' and 'inbuf' not in arg_spec:
                    arg_spec['inbuf'] = 'str'
                continue
            if arg == '_end_positional_':
                positional = False
                continue
            # Check the annotation first: looking arg_spec[arg] up below
            # would otherwise fail with a bare KeyError that hides this
            # message.
            assert arg in arg_spec, \
                f"'{arg}' is not annotated for {f}: {full_argspec}"
            if (
                arg == 'format'
                # '==' rather than 'is': equal typing constructs are not
                # guaranteed to be the same object
                or arg_spec[arg] == Optional[bool]
                or arg_spec[arg] is bool
            ):
                # implicit switch to non-positional on any
                # Optional[bool] or the --format option
                positional = False
            has_default = index >= first_default
            args.append(CephArgtype.to_argdesc(arg_spec[arg],
                                               dict(name=arg),
                                               has_default,
                                               positional))
        for argname, argtype in extra_args.items():
            # avoid shadowing args from the function
            if argname in arg_spec:
                continue
            arg_spec[argname] = argtype
            args.append(CephArgtype.to_argdesc(
                argtype, dict(name=argname), has_default=True, positional=False
            ))
        return desc, arg_spec, first_default, ' '.join(args)

    def store_func_metadata(self, f: HandlerFuncType) -> None:
        """Populate desc, arg_spec, first_default and args from *f*."""
        self.desc, self.arg_spec, self.first_default, self.args = \
            self._load_func_metadata(f)

    def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
        """Use this object as a decorator: register *func* and return it."""
        self.store_func_metadata(func)
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
        """Work out the keyword name and value for one raw argument.

        Once a string value of the form ``known_arg=value`` was seen
        (``kwargs_switch``), this and all following values are split at the
        first ``=`` into name and value; before that, *key* and *val* are
        used as they are.

        :return: (updated kwargs_switch, name with '-' replaced by '_', value)
        """
        def start_kwargs() -> bool:
            if isinstance(val, str) and '=' in val:
                k = val.split('=', 1)[0]
                if k in self.arg_spec:
                    return True
            return False

        if not kwargs_switch:
            kwargs_switch = start_kwargs()

        if kwargs_switch:
            # NOTE(review): assumes val is a str containing '=' whenever the
            # switch was already set by an earlier argument -- confirm.
            k, v = val.split('=', 1)
        else:
            k, v = key, val
        return kwargs_switch, k.replace('-', '_'), v

    def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Tuple[Dict[str, Any], Set[str]]:
        """Build the handler's keyword arguments from *cmd_dict*.

        :return: (keyword arguments cast via ``CephArgtype.cast_to``, the
                 names from ``KNOWN_ARGS`` present in the arg spec)
        """
        kwargs = {}
        special_args = set()
        kwargs_switch = False
        for index, (name, tp) in enumerate(self.arg_spec.items()):
            if name in CLICommand.KNOWN_ARGS:
                special_args.add(name)
                continue
            assert self.first_default >= 0
            raw_v = cmd_dict.get(name)
            if index >= self.first_default:
                # argument has a default: leave it out when not supplied
                if raw_v is None:
                    continue
            kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
                                                      name, raw_v)
            kwargs[k] = CephArgtype.cast_to(tp, v)
        return kwargs, special_args

    def call(self,
             mgr: Any,
             cmd_dict: Dict[str, Any],
             inbuf: Optional[str] = None) -> HandleCommandResult:
        """Invoke the registered handler with arguments taken from *cmd_dict*.

        Non-empty *inbuf* is passed on as the ``inbuf`` keyword argument; if
        the handler does not take one, an ``-EINVAL`` result is returned
        without calling it.
        """
        kwargs, specials = self._collect_args_by_argspec(cmd_dict)
        if inbuf:
            if 'inbuf' not in specials:
                return HandleCommandResult(
                    -errno.EINVAL,
                    '',
                    'Invalid command: Input file data (-i) not supported',
                )
            kwargs['inbuf'] = inbuf
        assert self.func
        return self.func(mgr, **kwargs)

    def dump_cmd(self) -> Dict[str, Union[str, bool]]:
        """Return this command as a dict with cmd, desc, perm and poll."""
        return {
            'cmd': '{} {}'.format(self.prefix, self.args),
            'desc': self.desc,
            'perm': self.perm,
            'poll': self.poll,
        }

    @classmethod
    def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
        """Return dump_cmd() of every registered command."""
        return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
11fdf7f2
TL
487
488
f67539c2
TL
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Shorthand for a CLICommand with perm "r"."""
    return CLICommand(prefix, perm="r", poll=poll)
11fdf7f2
TL
491
492
f67539c2
TL
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Shorthand for a CLICommand with perm "w"."""
    return CLICommand(prefix, perm="w", poll=poll)
11fdf7f2
TL
495
496
522d829b
TL
def CLICheckNonemptyFileInput(desc: str) -> Callable[[HandlerFuncType], HandlerFuncType]:
    """Build a decorator that rejects calls without usable ``inbuf`` input.

    The wrapped handler is only called when the ``inbuf`` keyword argument
    is present and not empty or whitespace-only; otherwise an ``-EINVAL``
    result mentioning *desc* is returned.  A str ``inbuf`` has its trailing
    line separators removed before it is checked and passed on.
    """
    def CheckFileInput(func: HandlerFuncType) -> HandlerFuncType:
        @functools.wraps(func)
        def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
            if 'inbuf' not in kwargs:
                missing = f'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
                    f'containing {desc} with "-i" option'
                return -errno.EINVAL, '', missing

            data = kwargs['inbuf']
            if isinstance(data, str):
                # Delete new line separator at EOF (it may have been added by a text editor).
                # Stripping the character set '\r\n' already removes every
                # trailing '\n' as well.
                data = data.rstrip('\r\n')
                kwargs['inbuf'] = data

            if not data or not data.strip():
                empty = f'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
                    'the file'
                return -errno.EINVAL, '', empty
            return func(*args, **kwargs)
        # expose the handler's own signature rather than (*args, **kwargs)
        check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
        return check
    return CheckFileInput
cd265ab1 514
20effc67
TL
def CLIRequiresDB(func: HandlerFuncType) -> HandlerFuncType:
    """Decorator for handler methods that need the mgr database.

    While ``self.db_ready()`` is false the handler is not called and an
    ``-EAGAIN`` result is returned instead.
    """
    @functools.wraps(func)
    def check(self: MgrModule, *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
        if self.db_ready():
            return func(self, *args, **kwargs)
        return -errno.EAGAIN, "", "mgr db not yet available"
    # expose the handler's own signature rather than (*args, **kwargs)
    check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
    return check
cd265ab1 523
f67539c2 524def _get_localized_key(prefix: str, key: str) -> str:
11fdf7f2
TL
525 return '{}/{}'.format(prefix, key)
526
527
f67539c2
TL
528"""
529MODULE_OPTIONS types and Option Class
530"""
if TYPE_CHECKING:
    # Labels accepted for the 'type' field of an Option; only defined for
    # the type checker, hence referenced as a string annotation.
    OptionTypeLabel = Literal[
        'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']


# common/options.h: value_t
OptionValue = Optional[Union[bool, int, float, str]]
11fdf7f2 538
11fdf7f2 539
f67539c2
TL
class Option(Dict):
    """
    Helper class to declare options for MODULE_OPTIONS list.
    TODO: Replace with typing.TypedDict when in python_version >= 3.8

    The resulting dict holds one key per constructor argument, named like
    the argument; arguments whose value is None are left out.
    """

    def __init__(
            self,
            name: str,
            default: OptionValue = None,
            type: 'OptionTypeLabel' = 'str',
            desc: Optional[str] = None,
            long_desc: Optional[str] = None,
            min: OptionValue = None,
            max: OptionValue = None,
            enum_allowed: Optional[List[str]] = None,
            tags: Optional[List[str]] = None,
            see_also: Optional[List[str]] = None,
            runtime: bool = False,
    ):
        # listed in signature order, which is also the resulting key order
        declared = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('long_desc', long_desc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('tags', tags),
            ('see_also', see_also),
            ('runtime', runtime),
        )
        super().__init__(
            (key, value) for key, value in declared if value is not None)
563
f67539c2 564
92f5a8d4
TL
class Command(dict):
    """
    Helper class to declare options for COMMANDS list.

    It also allows to specify prefix and args separately, as well as storing a
    handler callable.

    Usage:
    >>> def handler(): return 0, "", ""
    >>> Command(prefix="example",
    ...         handler=handler,
    ...         perm='w')
    {'perm': 'w', 'poll': False}
    """

    def __init__(
            self,
            prefix: str,
            handler: HandlerFuncType,
            perm: str = "rw",
            poll: bool = False,
    ):
        # only perm and poll are dict entries; prefix and handler are kept
        # as plain attributes
        super().__init__(perm=perm,
                         poll=poll)
        self.prefix = prefix
        self.handler = handler

    @staticmethod
    def returns_command_result(instance: Any,
                               f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
        """Wrap *f* so that its 3-tuple result becomes a HandleCommandResult.

        The wrapper calls *f* with *instance* as first argument if that is
        truthy, and with the ``mgr`` it was called with otherwise.
        """
        @functools.wraps(f)
        def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
            retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
            return HandleCommandResult(retval, stdout, stderr)
        # expose the handler's own signature rather than (mgr, *args, **kwargs)
        wrapper.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
        return wrapper

    def register(self, instance: bool = False) -> HandlerFuncType:
        """
        Register a CLICommand handler. It allows an instance to register bound
        methods. In that case, the mgr instance is not passed, and it's expected
        to be available in the class instance.
        It also uses HandleCommandResult helper to return a wrapped tuple of 3
        items.
        """
        # Hand the poll flag over as well; it used to be dropped here, so a
        # command declared with poll=True was registered as non-polling.
        cmd = CLICommand(prefix=self.prefix,
                         perm=self['perm'],
                         poll=self.get('poll', False))
        return cmd(self.returns_command_result(instance, self.handler))
92f5a8d4 612
3efd9988 613
class CPlusPlusHandler(logging.Handler):
    """Logging handler that passes formatted records to the module's
    ``_ceph_log()`` method."""

    def __init__(self, module_inst: Any):
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst
        # the module name is baked into the format string once, up front
        fmt = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(fmt))

    def emit(self, record: logging.LogRecord) -> None:
        """Forward *record* unless it is below this handler's level."""
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
624
f67539c2 625
class ClusterLogHandler(logging.Handler):
    """Logging handler that passes records to the module's
    ``cluster_log()`` method."""

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        self.setFormatter(logging.Formatter("%(message)s"))

    def emit(self, record: logging.LogRecord) -> None:
        """Forward *record* unless it is below this handler's level.

        The Python level is translated to a ``MgrModule.ClusterLogPrio``:
        ERROR and above (including CRITICAL) map to ERROR, WARNING to WARN,
        INFO to INFO, everything lower to DEBUG.
        """
        if record.levelno < self.level:
            return

        # Map by threshold instead of exact lookup: a record with a
        # non-standard level number must not raise KeyError out of the
        # logging call.
        prio = MgrModule.ClusterLogPrio
        if record.levelno >= logging.ERROR:
            level = prio.ERROR
        elif record.levelno >= logging.WARNING:
            level = prio.WARN
        elif record.levelno >= logging.INFO:
            level = prio.INFO
        else:
            level = prio.DEBUG

        self._module.cluster_log(self._module.module_name,
                                 level,
                                 self.format(record))
645
f67539c2 646
class FileHandler(logging.FileHandler):
    """File logging handler writing to a per-module log file.

    The file name is derived from the ``log_file`` ceph option by inserting
    the module name: ``<base>.log`` becomes ``<base>.<module>.log``, any
    other name gets ``.<module>`` appended.  The file is opened lazily
    (``delay=True``).
    """

    def __init__(self, module_inst: Any):
        path = module_inst.get_ceph_option("log_file")
        suffix = ".log"
        # Only a *trailing* ".log" is the extension.  Searching for it
        # anywhere in the path would cut off everything behind an earlier
        # match, e.g. in a directory name.
        if path.endswith(suffix):
            self.path = "{}.{}.log".format(path[:-len(suffix)],
                                           module_inst.module_name)
        else:
            self.path = "{}.{}".format(path, module_inst.module_name)
        super(FileHandler, self).__init__(self.path, delay=True)
        self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
657
658
class MgrModuleLoggingMixin(object):
    """
    Logging setup shared by the module base classes.

    Installs up to three handlers (CPlusPlusHandler, FileHandler,
    ClusterLogHandler) on the root logger and keeps their levels in sync
    with the debug_mgr level and the module's own log level.
    """

    def _configure_logging(self,
                           mgr_level: str,
                           module_level: str,
                           cluster_level: str,
                           log_to_file: bool,
                           log_to_cluster: bool) -> None:
        """(Re)create the handlers, attach them to the root logger and apply
        the given levels."""
        self._mgr_level: Optional[str] = None
        self._module_level: Optional[str] = None
        self._root_logger = logging.getLogger()

        self._unconfigure_logging()

        # the ceph log handler is initialized only once
        self._mgr_log_handler = CPlusPlusHandler(self)
        self._cluster_log_handler = ClusterLogHandler(self)
        self._file_log_handler = FileHandler(self)

        self.log_to_file = log_to_file
        self.log_to_cluster = log_to_cluster

        # the mgr handler is always attached, the other two on request
        wanted = (
            (True, self._mgr_log_handler),
            (log_to_file, self._file_log_handler),
            (log_to_cluster, self._cluster_log_handler),
        )
        for enabled, handler in wanted:
            if enabled:
                self._root_logger.addHandler(handler)

        self._root_logger.setLevel(logging.NOTSET)
        self._set_log_level(mgr_level, module_level, cluster_level)

    def _unconfigure_logging(self) -> None:
        """Detach all handlers of our three classes from the root logger."""
        # remove existing handlers:
        ours = (CPlusPlusHandler, FileHandler, ClusterLogHandler)
        stale = [h for h in self._root_logger.handlers if isinstance(h, ours)]
        for handler in stale:
            self._root_logger.removeHandler(handler)
        self.log_to_file = False
        self.log_to_cluster = False

    def _set_log_level(self,
                       mgr_level: str,
                       module_level: str,
                       cluster_level: str) -> None:
        """Apply the levels to the handlers.

        The cluster log handler always gets *cluster_level*.  The mgr and
        file handlers get *module_level* if one is given, else the Python
        level derived from *mgr_level*; nothing is done for them when
        neither changed since the last call.
        """
        self._cluster_log_handler.setLevel(cluster_level.upper())

        requested = module_level.upper() if module_level else ''
        previous = self._module_level
        if previous:
            if previous == requested:
                # no change in module level
                return
        elif not requested and self._mgr_level == mgr_level:
            # using debug_mgr level:
            # no change in module level neither in debug_mgr
            return

        if requested:
            level = requested
            self.getLogger().debug("setting log level: %s", level)
        else:
            level = self._ceph_log_level_to_python(mgr_level)
            if previous:
                self.getLogger().warning("unsetting module log level, falling back to "
                                         "debug_mgr level: %s (%s)", level, mgr_level)
            else:
                self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
                                       level, mgr_level)

        self._module_level = requested
        self._mgr_level = mgr_level

        for handler in (self._mgr_log_handler, self._file_log_handler):
            handler.setLevel(level)

    def _enable_file_log(self) -> None:
        """Start logging to the per-module file."""
        self.getLogger().warning("enabling logging to file")
        self.log_to_file = True
        self._root_logger.addHandler(self._file_log_handler)

    def _disable_file_log(self) -> None:
        """Stop logging to the per-module file."""
        self.getLogger().warning("disabling logging to file")
        self.log_to_file = False
        self._root_logger.removeHandler(self._file_log_handler)

    def _enable_cluster_log(self) -> None:
        """Start logging to the cluster log."""
        self.getLogger().warning("enabling logging to cluster")
        self.log_to_cluster = True
        self._root_logger.addHandler(self._cluster_log_handler)

    def _disable_cluster_log(self) -> None:
        """Stop logging to the cluster log."""
        self.getLogger().warning("disabling logging to cluster")
        self.log_to_cluster = False
        self._root_logger.removeHandler(self._cluster_log_handler)

    def _ceph_log_level_to_python(self, log_level: str) -> str:
        """Translate a ceph debug level into a Python level name.

        Only the part before the first '/' is looked at.  <= 0 (as well as
        an empty or non-numeric value) gives "CRITICAL", 1 gives "WARNING",
        2 to 4 give "INFO", anything higher "DEBUG".
        """
        ceph_log_level = 0
        if log_level:
            try:
                ceph_log_level = int(log_level.split("/", 1)[0])
            except ValueError:
                # not a number: stay at 0
                pass

        if ceph_log_level <= 0:
            return "CRITICAL"
        if ceph_log_level <= 1:
            return "WARNING"
        if ceph_log_level <= 4:
            return "INFO"
        return "DEBUG"

    def getLogger(self, name: Optional[str] = None) -> logging.Logger:
        """Return ``logging.getLogger(name)``."""
        return logging.getLogger(name)
780
781
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    MODULE_OPTIONS: List[Option] = []
    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

    def __init__(self, module_name: str, capsule: Any):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name

        # see also MgrModule.__init__()
        for opt in self.MODULE_OPTIONS:
            if 'default' not in opt:
                continue
            value = opt['default']
            # defaults of options without a declared type are kept as str
            self.MODULE_OPTION_DEFAULTS[opt['name']] = \
                value if 'type' in opt else str(value)

        # mock does not return a str
        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
        log_level = cast(str, self.get_module_option("log_level"))
        cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
        # file and cluster logging both start out disabled
        self._configure_logging(mgr_level, log_level, cluster_level,
                                False, False)

        # for backwards compatibility
        self._logger = self.getLogger()

    def __del__(self) -> None:
        self._cleanup()
        self._unconfigure_logging()

    def _cleanup(self) -> None:
        """Called first thing from __del__(); does nothing here."""
        pass

    @classmethod
    def _register_options(cls, module_name: str) -> None:
        """Append the logging related options to ``MODULE_OPTIONS``.

        ``log_to_cluster`` is only added if no option of that name is
        declared yet; the other three are appended unconditionally.
        """
        log_levels = ['info', 'debug', 'critical', 'error', 'warning', '']

        cls.MODULE_OPTIONS.append(
            Option(name='log_level', type='str', default="", runtime=True,
                   enum_allowed=list(log_levels)))
        cls.MODULE_OPTIONS.append(
            Option(name='log_to_file', type='bool', default=False, runtime=True))

        declared = [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']
        if not declared:
            cls.MODULE_OPTIONS.append(
                Option(name='log_to_cluster', type='bool', default=False,
                       runtime=True))
        cls.MODULE_OPTIONS.append(
            Option(name='log_to_cluster_level', type='str', default='info',
                   runtime=True,
                   enum_allowed=list(log_levels)))

    @property
    def log(self) -> logging.Logger:
        return self._logger

    def serve(self) -> None:
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self) -> str:
        return self._ceph_get_mgr_id()

    def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """
        Retrieve the value of a persistent configuration setting

        :param default: the default value of the config if it is not found
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        # unset: declared default first, then the caller's default
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key: str) -> OptionValue:
        return self._ceph_get_option(key)

    def get_store(self, key: str) -> Optional[str]:
        """
        Retrieve the value of a persistent KV store entry

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
        """Like get_store(), but the entry localized to this mgr (see
        _get_localized_key) is tried first, then the plain *key*, and
        *default* is returned if neither exists."""
        localized = self._ceph_get_store(
            _get_localized_key(self.get_mgr_id(), key))
        if localized is not None:
            return localized
        plain = self._ceph_get_store(key)
        if plain is not None:
            return plain
        return default

    def get_active_uri(self) -> str:
        return self._ceph_get_active_uri()

    def get(self, data_name: str) -> Dict[str, Any]:
        return self._ceph_get(data_name)

    def get_mgr_ip(self) -> str:
        """Return the first of the ``'ips'`` of ``get("mgr_ips")``, or the
        local hostname if there are none."""
        ips = self.get("mgr_ips").get('ips', [])
        return ips[0] if ips else socket.gethostname()

    def get_hostname(self) -> str:
        return socket.gethostname()

    def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
        """Like get_module_option(), but the lookup is done with this mgr's id."""
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988 909
3efd9988 910
f67539c2
TL
# Type aliases used in annotations.
HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
# {"type": service_type, "id": service_id}
ServiceInfoT = Dict[str, str]
# {"hostname": hostname,
#  "ceph_version": version,
#  "services": [service_info, ..]}
ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
PerfCounterT = Dict[str, Any]
919
920
20effc67
TL
class API:
    """
    Decorators that tag a function with an attribute.

    ``API.perm(value)`` and ``API.expose`` store a value on the decorated
    function and return the function itself; the ``get(func)`` classmethod
    of either reads the value back, falling back to the decorator's default
    for functions that were not decorated.
    """

    def DecoratorFactory(attr: str, default: Any):  # type: ignore
        # name of the attribute set on decorated functions
        token = f'__ATTR_{attr.upper()}__'

        class DecoratorClass:
            _ATTR_TOKEN = token

            def __init__(self, value: Any = default) -> None:
                self.value = value

            def __call__(self, func: Callable) -> Any:
                setattr(func, self._ATTR_TOKEN, self.value)
                return func

            @classmethod
            def get(cls, func: Callable) -> Any:
                return getattr(func, cls._ATTR_TOKEN, default)

        return DecoratorClass

    # perm is a decorator class: use as @API.perm('w')
    perm = DecoratorFactory('perm', default='r')
    # expose is a ready-made instance holding True: use as plain @API.expose
    expose = DecoratorFactory('expose', default=False)(True)
941
942
9f95a23c 943class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
20effc67
TL
# Name of the pool used by open_db() for the module database.
MGR_POOL_NAME = ".mgr"

COMMANDS = []  # type: List[Any]
MODULE_OPTIONS: List[Option] = []
# Filled in by __init__ from the 'default' entries of MODULE_OPTIONS.
MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

# Database Schema
SCHEMA = None  # type: Optional[str]
SCHEMA_VERSIONED = None  # type: Optional[List[str]]

# Priority definitions for perf counters
PRIO_CRITICAL = 10
PRIO_INTERESTING = 8
PRIO_USEFUL = 5
PRIO_UNINTERESTING = 2
PRIO_DEBUGONLY = 0

# counter value types
PERFCOUNTER_TIME = 1
PERFCOUNTER_U64 = 2

# counter types
PERFCOUNTER_LONGRUNAVG = 4
PERFCOUNTER_COUNTER = 8
PERFCOUNTER_HISTOGRAM = 0x10
# ~3 clears the two low bits (the value-type bits above), leaving only
# the counter-type bits.
PERFCOUNTER_TYPE_MASK = ~3

# units supported
BYTES = 0
NONE = 1

# Cluster log priorities
class ClusterLogPrio(IntEnum):
    DEBUG = 0
    INFO = 1
    SEC = 2
    WARN = 3
    ERROR = 4
982
def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
    """
    Set up per-module state: option defaults, logging and the handles
    that are filled in later.

    :param module_name: stored as ``self.module_name``
    :param py_modules_ptr: forwarded unchanged to the base class constructor
    :param this_ptr: forwarded unchanged to the base class constructor
    """
    self.module_name = module_name
    super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

    # Record the declared default of every option that has one.
    for o in self.MODULE_OPTIONS:
        if 'default' in o:
            if 'type' in o:
                # we'll assume the declared type matches the
                # supplied default value's type.
                self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
            else:
                # module not declaring its type, so normalize the
                # default value to be a string for consistent behavior
                # with default and user-supplied option values.
                self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])

    # Logging is configured from the mgr-wide debug level plus the
    # module's own logging options.
    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    log_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
    log_to_file = self.get_module_option("log_to_file")
    assert isinstance(log_to_file, bool)
    log_to_cluster = self.get_module_option("log_to_cluster")
    assert isinstance(log_to_cluster, bool)
    self._configure_logging(mgr_level, log_level, cluster_level,
                            log_to_file, log_to_cluster)

    # for backwards compatibility
    self._logger = self.getLogger()

    # Filled in on first use by the ``db`` property.
    self._db = None  # type: Optional[sqlite3.Connection]

    self._version = self._ceph_get_version()

    self._perf_schema_cache = None

    # Keep a librados instance for those that need it.
    self._rados: Optional[rados.Rados] = None

    # this does not change over the lifetime of an active mgr
    self._mgr_ips: Optional[str] = None

    # The ``db`` property asserts that this lock is held.
    self._db_lock = threading.Lock()
1025
def __del__(self) -> None:
    """Undo the logging configuration when the object is finalized."""
    self._unconfigure_logging()
1028
1029 @classmethod
def _register_options(cls, module_name: str) -> None:
    """
    Append the standard logging options to ``cls.MODULE_OPTIONS``.

    ``log_to_cluster`` is only added when no option of that name has been
    declared already; the other three are appended unconditionally.

    :param module_name: not used by this implementation
    """
    options = cls.MODULE_OPTIONS
    options.append(
        Option(name='log_level', type='str', default="", runtime=True,
               enum_allowed=['info', 'debug', 'critical', 'error',
                             'warning', '']))
    options.append(
        Option(name='log_to_file', type='bool', default=False, runtime=True))

    declared = [opt['name'] for opt in cls.MODULE_OPTIONS]
    if 'log_to_cluster' not in declared:
        options.append(
            Option(name='log_to_cluster', type='bool', default=False,
                   runtime=True))

    options.append(
        Option(name='log_to_cluster_level', type='str', default='info',
               runtime=True,
               enum_allowed=['info', 'debug', 'critical', 'error',
                             'warning', '']))
3efd9988 1046
11fdf7f2 1047 @classmethod
def _register_commands(cls, module_name: str) -> None:
    """
    Extend ``cls.COMMANDS`` with the list returned by
    ``CLICommand.dump_cmd_list()``.

    :param module_name: not used by this implementation
    """
    cls.COMMANDS.extend(CLICommand.dump_cmd_list())
7c673cae
FG
1050
1051 @property
def log(self) -> logging.Logger:
    """Return the logger stored in ``self._logger``."""
    logger = self._logger
    return logger
1054
def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
    """
    Send a message to the cluster log.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority; its ``value`` is what gets
        passed to ``_ceph_cluster_log``.
    :param message: The message to log.
    """
    self._ceph_cluster_log(channel, priority.value, message)
11fdf7f2 1062
7c673cae 1063 @property
def version(self) -> str:
    """Return the version string cached in ``self._version``."""
    cached = self._version
    return cached
1066
20effc67
TL
1067 @API.expose
def pool_exists(self, name: str) -> bool:
    """
    Tell whether the ``osd_map`` data lists a pool called ``name``.

    :param name: pool name to look for
    """
    osd_map = self.get('osd_map')
    known_names = tuple(pool['pool_name'] for pool in osd_map['pools'])
    return name in known_names
1071
1072 @API.expose
def have_enough_osds(self) -> bool:
    """
    Report whether the number of OSDs that are both ``up`` and ``in`` has
    reached the ``osd_pool_default_size`` option.
    """
    # wait until we have enough OSDs to allow the pool to be healthy
    usable = sum(1 for osd in self.get("osd_map")["osds"]
                 if osd["up"] and osd["in"])
    required = cast(int, self.get_ceph_option("osd_pool_default_size"))
    return usable >= required
1082
1083 @API.perm('w')
1084 @API.expose
def rename_pool(self, srcpool: str, destpool: str) -> None:
    """
    Issue ``osd pool rename`` through ``check_mon_command``.

    :param srcpool: current pool name
    :param destpool: new pool name
    """
    command = dict(
        prefix='osd pool rename',
        format='json',
        srcpool=srcpool,
        destpool=destpool,
        yes_i_really_mean_it=True,
    )
    self.check_mon_command(command)
1094
1095 @API.perm('w')
1096 @API.expose
def create_pool(self, pool: str) -> None:
    """
    Issue ``osd pool create`` through ``check_mon_command`` with
    ``pg_num`` 1, ``pg_num_min`` 1 and ``pg_num_max`` 32.

    :param pool: name of the pool to create
    """
    command = dict(
        prefix='osd pool create',
        format='json',
        pool=pool,
        pg_num=1,
        pg_num_min=1,
        pg_num_max=32,
        yes_i_really_mean_it=True,
    )
    self.check_mon_command(command)
1108
1109 @API.perm('w')
1110 @API.expose
def appify_pool(self, pool: str, app: str) -> None:
    """
    Issue ``osd pool application enable`` through ``check_mon_command``.

    :param pool: pool to tag
    :param app: application name to enable on the pool
    """
    command = dict(
        prefix='osd pool application enable',
        format='json',
        pool=pool,
        app=app,
        yes_i_really_mean_it=True,
    )
    self.check_mon_command(command)
1120
1121 @API.perm('w')
1122 @API.expose
def create_mgr_pool(self) -> None:
    """
    Make sure the mgr pool exists, reusing the devicehealth pool if present.

    If the pool named by the devicehealth module's ``pool_name`` option
    exists it is renamed to ``MGR_POOL_NAME``; otherwise a new pool is
    created.  In both cases the ``mgr`` application is enabled on it.
    """
    self.log.info("creating mgr pool")

    legacy_pool = cast(
        str,
        self.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics'))
    reuse_legacy = legacy_pool is not None and self.pool_exists(legacy_pool)
    if reuse_legacy:
        self.log.debug("reusing devicehealth pool")
        self.rename_pool(legacy_pool, self.MGR_POOL_NAME)
    else:
        self.log.debug("creating new mgr pool")
        self.create_pool(self.MGR_POOL_NAME)
    # Either way the resulting pool gets the 'mgr' application.
    self.appify_pool(self.MGR_POOL_NAME, 'mgr')
1136
def create_skeleton_schema(self, db: sqlite3.Connection) -> None:
    """
    Ensure the ``MgrModuleKV`` table exists and holds a ``__version`` row.

    Both statements leave existing data alone (``IF NOT EXISTS`` /
    ``INSERT OR IGNORE``); a freshly inserted version row starts at 0.

    :param db: connection the script is executed on
    """
    skeleton_sql = """
    CREATE TABLE IF NOT EXISTS MgrModuleKV (
      key TEXT PRIMARY KEY,
      value NOT NULL
    ) WITHOUT ROWID;
    INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
    """

    db.executescript(skeleton_sql)
1147
def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
    """
    Store ``version`` in the ``__version`` row of ``MgrModuleKV``.

    :param db: connection the statement is executed on
    :param version: new schema version
    """
    statement = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
    parameters = (version,)
    db.execute(statement, parameters)
1152
def set_kv(self, key: str, value: Any) -> None:
    """
    Insert or replace one row in ``MgrModuleKV``.

    Keys starting with ``__`` are rejected by an assertion.  The write
    happens with ``_db_lock`` held and inside the connection's context
    manager.

    :param key: row key
    :param value: value to store
    """
    assert key[:2] != "__"

    self.log.debug(f"set_kv('{key}', '{value}')")

    upsert = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
    with self._db_lock, self.db:
        self.db.execute(upsert, (key, value))
1162
1163 @API.expose
def get_kv(self, key: str) -> Any:
    """
    Look up one row in ``MgrModuleKV``.

    Keys starting with ``__`` are rejected by an assertion.

    :param key: row key
    :return: the row's ``value`` column, or ``None`` if there is no row
    """
    assert key[:2] != "__"

    self.log.debug(f"get_kv('{key}')")

    query = "SELECT value FROM MgrModuleKV WHERE key = ?;"
    with self._db_lock, self.db:
        row = self.db.execute(query, (key,)).fetchone()
        if row is None:
            return None
        v = row['value']
        self.log.debug(f" = {v}")
        return v
1180
def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
    """
    Create or upgrade the module's database schema.

    :param db: connection the schema scripts are executed on
    :param version: schema version currently recorded in the database;
        a value <= 0 means the module schema has not been created yet
    :raises RuntimeError: if the recorded version is greater than the
        number of entries in ``SCHEMA_VERSIONED``
    """
    if version <= 0:
        self.log.info(f"creating main.db for {self.module_name}")
        assert self.SCHEMA is not None
        db.executescript(self.SCHEMA)
        self.update_schema_version(db, 1)
        return

    assert self.SCHEMA_VERSIONED is not None
    latest = len(self.SCHEMA_VERSIONED)
    if latest < version:
        raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
    for i in range(version, latest):
        # Running scripts version..latest-1 ends at version ``latest``, so
        # the script at index i takes the schema from version i to i + 1.
        self.log.info(f"upgrading main.db for {self.module_name} from {i}:{i + 1}")
        db.executescript(self.SCHEMA_VERSIONED[i])
    if version < latest:
        self.update_schema_version(db, latest)
1198
def load_schema(self, db: sqlite3.Connection) -> None:
    """
    Create the skeleton schema if needed, then hand the recorded schema
    version to ``maybe_upgrade``.

    Everything runs inside the connection's context manager.

    :param db: connection to operate on; rows are read by column name
    """
    version_query = """
    SELECT value FROM MgrModuleKV WHERE key = '__version';
    """

    with db:
        self.create_skeleton_schema(db)
        cursor = db.execute(version_query)
        version_row = cursor.fetchone()
        recorded = int(version_row['value'])
        self.maybe_upgrade(db, recorded)
        # Exactly one version row is expected.
        assert cursor.fetchone() is None
        cursor.close()
1211
def configure_db(self, db: sqlite3.Connection) -> None:
    """
    Apply the connection PRAGMAs, switch rows to ``sqlite3.Row`` and load
    the schema.

    :param db: freshly opened connection
    """
    pragmas = (
        'PRAGMA FOREIGN_KEYS = 1',
        'PRAGMA JOURNAL_MODE = PERSIST',
        'PRAGMA PAGE_SIZE = 65536',
        'PRAGMA CACHE_SIZE = 64',
        'PRAGMA TEMP_STORE = memory',
    )
    for pragma in pragmas:
        db.execute(pragma)
    # load_schema() reads rows by column name, which needs sqlite3.Row.
    db.row_factory = sqlite3.Row
    self.load_schema(db)
1220
def open_db(self) -> Optional[sqlite3.Connection]:
    """
    Open and configure this module's ``main.db`` in the mgr pool.

    The pool is created first when it does not exist yet.  If configuring
    the new connection fails, the connection is closed before the
    exception propagates so it is not leaked.

    :return: a configured connection, or ``None`` when the pool is missing
        and ``have_enough_osds()`` is false
    """
    if not self.pool_exists(self.MGR_POOL_NAME):
        if not self.have_enough_osds():
            return None
        self.create_mgr_pool()
    uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph"
    self.log.debug(f"using uri {uri}")
    db = sqlite3.connect(uri, check_same_thread=False, uri=True)
    try:
        self.configure_db(db)
    except Exception:
        # Don't leave a half-configured connection open.
        db.close()
        raise
    return db
1231
1232 @API.expose
def db_ready(self) -> bool:
    """
    Report whether the module database can be used.

    Takes ``_db_lock`` and reads the ``db`` property; ``MgrDBNotReady``
    from that access is turned into ``False``.
    """
    with self._db_lock:
        try:
            handle = self.db
        except MgrDBNotReady:
            return False
        return handle is not None
1239
1240 @property
def db(self) -> sqlite3.Connection:
    """
    Return the module's database connection, opening it on first use.

    The caller must already hold ``_db_lock`` (asserted).

    :raises MgrDBNotReady: if the ``mgr_pool`` option is false or
        ``open_db()`` returns ``None``
    """
    assert self._db_lock.locked()
    if self._db is None:
        if not self.get_ceph_option("mgr_pool"):
            raise MgrDBNotReady()
        self._db = self.open_db()
        if self._db is None:
            raise MgrDBNotReady()
    return self._db
1252
92f5a8d4 1253 @property
def release_name(self) -> str:
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: Returns the release name of the Ceph version in lower case.
    :rtype: str
    """
    name = self._ceph_get_release_name()
    return name
1261
20effc67 1262 @API.expose
f67539c2
TL
def lookup_release_name(self, major: int) -> str:
    """
    Return what ``_ceph_lookup_release_name`` reports for ``major``.

    :param major: major version number
    """
    name = self._ceph_lookup_release_name(major)
    return name
1265
def get_context(self) -> object:
    """
    :return: a Python capsule containing a C++ CephContext pointer
    """
    context = self._ceph_get_context()
    return context
1271
def notify(self, notify_type: NotifyType, notify_id: str) -> None:
    """
    Called by the ceph-mgr service to notify the Python plugin
    that new state is available. This method is *only* called for
    notify_types that are listed in the NOTIFY_TYPES string list
    member of the module class.

    The implementation here does nothing.

    :param notify_type: string indicating what kind of notification,
                        such as osd_map, mon_map, fs_map, mon_status,
                        health, pg_summary, command, service_map
    :param notify_id: string (may be empty) that optionally specifies
                      which entity is being notified about. With
                      "command" notifications this is set to the tag
                      from ``send_command``.
    """
    pass
1288
def _config_notify(self) -> None:
    """
    Re-read the logging options, apply any change, then call the
    module's ``config_notify()``.
    """
    # check logging options for changes
    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    module_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
    assert isinstance(cluster_level, str)
    want_file = self.get_module_option("log_to_file", False)
    assert isinstance(want_file, bool)
    want_cluster = self.get_module_option("log_to_cluster", False)
    assert isinstance(want_cluster, bool)

    self._set_log_level(mgr_level, module_level, cluster_level)

    # Only toggle a log target when the wanted state differs from the
    # current one.
    if want_file != self.log_to_file:
        toggle_file = self._enable_file_log if want_file else self._disable_file_log
        toggle_file()
    if want_cluster != self.log_to_cluster:
        toggle_cluster = (self._enable_cluster_log if want_cluster
                          else self._disable_cluster_log)
        toggle_cluster()

    # call module subclass implementations
    self.config_notify()
1314
def config_notify(self) -> None:
    """
    Hook invoked when the configuration may have changed.

    Modules that keep configuration values in their own variables will
    want to refresh them here.  This implementation does nothing.
    """
1322
def serve(self) -> None:
    """
    Hook called by the ceph-mgr service to start any server provided by
    this Python plugin.  An implementation should block until
    ``shutdown`` is called.

    You *must* implement ``shutdown`` if you implement ``serve``.
    This implementation does nothing and returns immediately.
    """
1332
def shutdown(self) -> None:
    """
    Called by the ceph-mgr service to request that this
    module drop out of its serve() function.  You do not
    need to implement this if you do not implement serve()

    This implementation shuts down the librados handle, if one was
    created, and unregisters its addresses.

    :return: None
    """
    client = self._rados
    if not client:
        return
    # Fetch the addresses before shutting the handle down.
    addrs = client.get_addrs()
    client.shutdown()
    self._ceph_unregister_client(addrs)
    self._rados = None
7c673cae 1346
20effc67
TL
1347 @API.expose
def get(self, data_name: str) -> Any:
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osdmap_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>, pg_stats,
            pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
            modified_config_options, service_map, mds_metadata,
            have_local_config_map, osd_pool_stats, pg_status.

    Note:
        All these structures have their own JSON representations: experiment
        or look at the C++ ``dump()`` methods to learn about them.

    A ``bytes`` result is decoded with ``json.loads``; any other result is
    returned unchanged.
    """
    raw = self._ceph_get(data_name)
    return json.loads(raw) if isinstance(raw, bytes) else raw
7c673cae 1369
def _stattype_to_str(self, stattype: int) -> str:
    """
    Name the counter type encoded in ``stattype``.

    :param stattype: perf counter type bits
    :return: 'gauge', 'counter' or 'histogram'; '' for anything else
    """
    typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
    # Checked in order; the first match wins.
    known_types = (
        (0, 'gauge'),
        # this lie matches the DaemonState decoding: only val, no counts
        (self.PERFCOUNTER_LONGRUNAVG, 'counter'),
        (self.PERFCOUNTER_COUNTER, 'counter'),
        (self.PERFCOUNTER_HISTOGRAM, 'histogram'),
    )
    for type_bits, label in known_types:
        if typeonly == type_bits:
            return label

    return ''
1384
f67539c2
TL
def _perfpath_to_path_labels(self, daemon: str,
                             path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
    """
    Split a daemon name and perf counter path into a path plus labels.

    ``rgw.`` daemons are labelled ``instance_id`` with the prefix removed;
    all others are labelled ``ceph_daemon``.  For ``rbd-mirror.`` daemons
    a per-image replay path is collapsed to ``rbd_mirror_image_<counter>``
    and the pool, namespace and image become extra labels.

    :return: ``(path, label_names, labels)``
    """
    rgw_prefix = 'rgw.'
    if daemon.startswith(rgw_prefix):
        first_label = 'instance_id'
        daemon = daemon[len(rgw_prefix):]
    else:
        first_label = 'ceph_daemon'

    label_names: Tuple[str, ...] = (first_label,)
    labels: Tuple[str, ...] = (daemon,)

    if daemon.startswith('rbd-mirror.'):
        match = re.match(
            r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
            path
        )
        if match:
            pool, namespace, image, counter = match.groups()
            path = 'rbd_mirror_image_' + counter
            label_names += ('pool', 'namespace', 'image')
            # The namespace part is optional in the path.
            labels += (pool, namespace or '', image)

    return path, label_names, labels
1410
def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
    """
    Scale time-valued counters to seconds; pass other values through.

    :param stattype: perf counter type bits
    :param value: raw counter value
    """
    if not stattype & self.PERFCOUNTER_TIME:
        return value
    # Convert from ns to seconds
    return value / 1000000000.0
1417
def _unit_to_str(self, unit: int) -> str:
    """
    Return the rate suffix for a unit constant.

    :param unit: ``self.NONE`` or ``self.BYTES``
    :raises ValueError: for any other value
    """
    # Checked in order; the first match wins.
    for known_unit, suffix in ((self.NONE, "/s"), (self.BYTES, "B/s")):
        if unit == known_unit:
            return suffix
    raise ValueError(f'bad unit "{unit}"')
11fdf7f2
TL
1425
1426 @staticmethod
def to_pretty_iec(n: int) -> str:
    """
    Format ``n`` with the largest IEC suffix for which it exceeds ten units.

    Values that do not exceed ``10 << 10`` are returned as the plain number
    followed by a space.
    """
    scales = ((60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
              (20, 'Mi'), (10, 'Ki'))
    for shift, suffix in scales:
        if n > (10 << shift):
            return f'{n >> shift!s} {suffix}'
    return f'{n!s} '
1433
1434 @staticmethod
def get_pretty_row(elems: Sequence[str], width: int) -> str:
    """
    Format ``elems`` as one table row.  Useful for polling modules.

    The width is divided evenly between the elements and each element is
    right-aligned in its column.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    """
    column_width = int(width / len(elems))
    cells = ('{0:>{w}} |'.format(elem, w=column_width - 2) for elem in elems)
    return '|' + ''.join(cells)
1451
def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
    """
    Like ``get_pretty_row`` but framed by dash lines, to be used as a
    table title.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    """
    n = len(elems)
    column_width = int(width / n)

    # dash line, used both above and below the title
    dash_line = '+' + ('-' * (column_width - 1) + '+') * n + '\n'

    # title
    title = self.get_pretty_row(elems, width) + '\n'

    return dash_line + title + dash_line
1479
20effc67
TL
1480 @API.expose
def get_server(self, hostname: str) -> ServerInfoT:
    """
    Called by the plugin to fetch metadata about a particular hostname from
    ceph-mgr.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    """
    server = self._ceph_get_server(hostname)
    return cast(ServerInfoT, server)
7c673cae 1492
20effc67 1493 @API.expose
f67539c2
TL
def get_perf_schema(self,
                    svc_type: str,
                    svc_name: str) -> Dict[str,
                                           Dict[str, Dict[str, Union[str, int]]]]:
    """
    Called by the plugin to fetch perf counter schema info.
    svc_name can be nullptr, as can svc_type, in which case
    they are wildcards

    :param str svc_type:
    :param str svc_name:
    :return: the schema of the requested counters, as returned by
        ``_ceph_get_perf_schema`` (declared as a nested dict)
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
c07f9fc5 1508
20effc67
TL
def get_rocksdb_version(self) -> str:
    """
    Called by the plugin to fetch the latest RocksDB version number.

    :return: str representing the major, minor, and patch RocksDB version numbers
    """
    rocksdb_version = self._ceph_get_rocksdb_version()
    return rocksdb_version
1516
1517 @API.expose
f67539c2
TL
def get_counter(self,
                svc_type: str,
                svc_name: str,
                path: str) -> Dict[str, List[Tuple[float, int]]]:
    """
    Called by the plugin to fetch the latest performance counter data for a
    particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A dict of counter names to their values. Each value is a list
        of two-tuples of (timestamp, value). This may be empty if no data
        is available.
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
7c673cae 1535
20effc67 1536 @API.expose
f67539c2
TL
def get_latest_counter(self,
                       svc_type: str,
                       svc_name: str,
                       path: str) -> Dict[str, Union[Tuple[float, int],
                                                     Tuple[float, int, int]]]:
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: Per the declared return type, a dict whose values are a
        two-tuple of (timestamp, value) or a three-tuple of
        (timestamp, value, count). This may be empty if no data is
        available.
    """
    latest = self._ceph_get_latest_counter(svc_type, svc_name, path)
    return latest
1555
20effc67 1556 @API.expose
def list_servers(self) -> List[ServerInfoT]:
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata)

    :return: a list of information about all servers
    :rtype: list
    """
    # Passing None instead of a hostname asks for every server.
    servers = self._ceph_get_server(None)
    return cast(List[ServerInfoT], servers)
7c673cae 1566
f67539c2
TL
def get_metadata(self,
                 svc_type: str,
                 svc_id: str,
                 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    during addition/removal of services where the metadata is not available
    to modules. ``default`` is returned if the metadata is missing or empty.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings when
        calling this
    :param default: value to return when no metadata is available
    :rtype: dict, or ``default`` if no metadata found
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata if metadata else default
7c673cae 1587
20effc67 1588 @API.expose
def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.
    NOTE(review): the declared return type does not include ``None`` --
    confirm which of the two is right.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
224ce89b 1601
def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
    """
    Wrapper around :func:`~mgr_module.MgrModule.mon_command` that raises
    if ``retval != 0``.

    :param cmd_dict: command to send; must contain ``prefix``
    :param inbuf: optional input buffer passed along with the command
    :raises MonCommandFailed: when the command's return value is non-zero
    """
    reply = self.mon_command(cmd_dict, inbuf)
    r = HandleCommandResult(*reply)
    if not r.retval:
        return r
    raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1612
def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for modules that do simple, synchronous mon command
    execution.

    See send_command for general case.

    :param cmd_dict: command to send; must contain ``prefix``
    :param inbuf: optional input buffer passed along with the command
    :return: status int, out std, err str
    """
    started = time.time()
    completion = CommandResult()
    self.send_command(completion, "mon", "", json.dumps(cmd_dict), "", inbuf)
    outcome = completion.wait()
    elapsed = time.time() - started

    self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], outcome[0], elapsed
    ))

    return outcome
1634
20effc67
TL
def osd_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for osd command execution.

    See send_command for general case. Also, see osd/OSD.cc for available commands.

    :param dict cmd_dict: expects a prefix and an osd id, i.e.:
        cmd_dict = {
            'prefix': 'perf histogram dump',
            'id': '0'
        }
    :param inbuf: optional input buffer passed along with the command
    :return: status int, out std, err str
    """
    started = time.time()
    completion = CommandResult()
    # The target OSD is taken from the command's own 'id' entry.
    self.send_command(completion, "osd", cmd_dict['id'], json.dumps(cmd_dict), "", inbuf)
    outcome = completion.wait()
    elapsed = time.time() - started

    self.log.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], outcome[0], elapsed
    ))

    return outcome
1659
2a845540
TL
def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for `ceph tell` command execution.

    See send_command for general case.

    :param daemon_type: type of the daemon the command is sent to
    :param daemon_id: id of the daemon the command is sent to
    :param dict cmd_dict: expects a prefix i.e.:
        cmd_dict = {
            'prefix': 'heap',
            'heapcmd': 'stats',
        }
    :param inbuf: optional input buffer passed along with the command
    :return: status int, out std, err str
    """
    started = time.time()
    completion = CommandResult()
    self.send_command(completion, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
    outcome = completion.wait()
    elapsed = time.time() - started

    self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
        daemon_type, daemon_id, cmd_dict['prefix'], outcome[0], elapsed
    ))

    return outcome
1684
cd265ab1
TL
def send_command(
        self,
        result: CommandResult,
        svc_type: str,
        svc_id: str,
        command: str,
        tag: str,
        inbuf: Optional[str] = None) -> None:
    """
    Called by the plugin to send a command to a service in the
    cluster; the target is selected by ``svc_type`` and ``svc_id``.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule.  This acts as a
        completion and stores the output of the command.  Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type: type of the target service (the helpers in this
        class pass e.g. "mon" or "osd")
    :param str svc_id: id of the target service (the mon helper passes an
        empty string)
    :param str command: a JSON-serialized command.  This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself.  Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    :param str inbuf: input buffer for sending additional data.
    """
    self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
7c673cae 1715
a4b75251
TL
def tool_exec(
    self,
    args: List[str],
    timeout: int = 10,
    stdin: Optional[bytes] = None
) -> Tuple[int, str, str]:
    """
    Run an external tool with this mgr's keyring and entity name.

    ``-k <keyring> -n mgr.<id>`` is inserted right after the tool name
    (``args[0]``); the remaining arguments follow unchanged.  The caller's
    ``args`` list is left untouched.

    :param args: tool name followed by its arguments
    :param timeout: seconds to wait before giving up
    :param stdin: optional bytes fed to the tool's standard input
    :return: ``(returncode, stdout, stderr)``; on timeout
        ``(-errno.ETIMEDOUT, '', <exception text>)``
    """
    # Build the command from slices so the caller's list is not modified.
    cmd = [
        args[0],
        '-k', str(self.get_ceph_option('keyring')),
        '-n', f'mgr.{self.get_mgr_id()}',
    ] + list(args[1:])
    self.log.debug('exec: ' + ' '.join(cmd))
    try:
        p = subprocess.run(
            cmd,
            input=stdin,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as ex:
        self.log.error(ex)
        return -errno.ETIMEDOUT, '', str(ex)
    stderr = p.stderr.decode()
    if p.returncode:
        self.log.error(f'Non-zero return from {cmd}: {stderr}')
    return p.returncode, p.stdout.decode(), stderr
1743
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Set the module's current map of health checks.  Argument is a
    dict of check names to info, in this form:

    ::

       {
         'CHECK_FOO': {
           'severity': 'warning',           # or 'error'
           'summary': 'summary string',
           'count': 4,                      # quantify badness
           'detail': [ 'list', 'of', 'detail', 'strings' ],
          },
         'CHECK_BAR': {
           'severity': 'error',
           'summary': 'bars are bad',
           'detail': [ 'too hard' ],
         },
       }

    :param checks: dict of health check dicts
    """
    self._ceph_set_health_checks(checks)
c07f9fc5 1768
f67539c2
TL
def _handle_command(self,
                    inbuf: str,
                    cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                  Tuple[int, str, str]]:
    """
    Dispatch a command either to its ``CLICommand`` handler or, when the
    prefix is not registered there, to ``handle_command``.

    :param inbuf: input buffer supplied with the command
    :param cmd: command dict; its ``prefix`` selects the handler
    """
    if cmd['prefix'] in CLICommand.COMMANDS:
        return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)

    return self.handle_command(inbuf, cmd)
1777
f67539c2
TL
def handle_command(self,
                   inbuf: str,
                   cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                 Tuple[int, str, str]]:
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS

    Return a status code, an output buffer, and an
    output string.  The output buffer is for data results,
    the output string is for informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :type inbuf: str
    :param cmd: from Ceph's cmdmap_t
    :type cmd: dict

    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this implementation
    """

    # Should never get called if they didn't declare
    # any ``COMMANDS``
    raise NotImplementedError()
1801
def get_mgr_id(self) -> str:
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: str
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
31f18b77 1810
20effc67 1811 @API.expose
b3b6e05e
TL
def get_ceph_conf_path(self) -> str:
    """Return the value reported by ``_ceph_get_ceph_conf_path``."""
    conf_path = self._ceph_get_ceph_conf_path()
    return conf_path
1814
20effc67 1815 @API.expose
def get_mgr_ip(self) -> str:
    """
    Return the first address from the ``mgr_ips`` data, caching it in
    ``self._mgr_ips``.

    When no address is listed the local hostname is returned instead and
    nothing is cached.
    """
    cached = self._mgr_ips
    if cached:
        return cached
    addresses = self.get("mgr_ips").get('ips', [])
    if not addresses:
        return socket.gethostname()
    self._mgr_ips = addresses[0]
    assert self._mgr_ips is not None
    return self._mgr_ips
1824
1e59de90
TL
1825 @API.expose
def get_hostname(self) -> str:
    """Return the local hostname as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
1828
20effc67 1829 @API.expose
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Look up a Ceph configuration option by name.

    :param key: name of the option
    :return: whatever ``_ceph_get_option`` reports for ``key``
    """
    value = self._ceph_get_option(key)
    return value
7c673cae 1832
20effc67 1833 @API.expose
f67539c2
TL
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Look up a configuration option for ``entity``.

    :param entity: entity whose option is wanted
    :param key: name of the option
    :return: whatever ``_ceph_get_foreign_option`` reports
    """
    value = self._ceph_get_foreign_option(entity, key)
    return value
1836
def _validate_module_option(self, key: str) -> None:
    """
    Helper: don't allow get/set config callers to
    access config options that they didn't declare
    in their schema.

    :param key: option name to check against ``MODULE_OPTIONS``
    :raises RuntimeError: if no declared option has that name
    """
    declared = [option['name'] for option in self.MODULE_OPTIONS]
    if key in declared:
        return
    raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
                       format(key, self.__class__.__name__))
1846
f67539c2
TL
def _get_module_option(self,
                       key: str,
                       default: OptionValue,
                       localized_prefix: str = "") -> OptionValue:
    """
    Read option ``key`` of this module without validating it.

    :param key: option name
    :param default: value used when the option is unset and
        ``MODULE_OPTION_DEFAULTS`` has no entry for ``key``
    :param localized_prefix: prefix passed through to
        ``_ceph_get_module_option()``
    :return: the stored value, else the declared default, else ``default``
    """
    stored = self._ceph_get_module_option(self.module_name, key,
                                          localized_prefix)
    if stored is not None:
        return stored
    return self.MODULE_OPTION_DEFAULTS.get(key, default)
7c673cae 1857
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting.

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param default: fallback passed on to ``_get_module_option()``
    :raises RuntimeError: if ``key`` is not a declared option
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
1864
f67539c2
TL
def get_module_option_ex(self, module: str,
                         key: str,
                         default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    The key is only validated against ``MODULE_OPTIONS`` when
    ``module`` is this module itself.

    :param module: The name of the module, e.g. 'dashboard'
        or 'telemetry'.
    :param key: The configuration key, e.g. 'server_addr'.
    :param default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    """
    targets_self = (module == self.module_name)
    if targets_self:
        self._validate_module_option(key)
    value = self._ceph_get_module_option(module, key)
    if value is None:
        return default
    return value
1882
@API.expose
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix.

    :param str key_prefix: prefix the returned keys start with
    :return: dict mapping matching keys to their values
    """
    matches = self._ceph_get_store_prefix(key_prefix)
    return matches
31f18b77 1893
f67539c2
TL
def _set_localized(self,
                   key: str,
                   val: Optional[str],
                   setter: Callable[[str, Optional[str]], None]) -> None:
    """
    Call ``setter`` with the localized form of ``key`` for this
    manager (built by ``_get_localized_key()`` from the mgr id).

    :param key: the un-localized key
    :param val: value handed to ``setter`` unchanged
    :param setter: callable taking ``(localized_key, val)``
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
1899
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve localized configuration for this ceph-mgr instance.

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param default: fallback passed on to ``_get_module_option()``
    :raises RuntimeError: if ``key`` is not a declared option
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
224ce89b 1906
f67539c2 1907 def _set_module_option(self, key: str, val: Any) -> None:
eafe8130
TL
1908 return self._ceph_set_module_option(self.module_name, key,
1909 None if val is None else str(val))
224ce89b 1910
def set_module_option(self, key: str, val: Any) -> None:
    """
    Set the value of a persistent configuration setting.

    :param str key: option name; must be declared in ``MODULE_OPTIONS``
    :type val: str | None
    :raises RuntimeError: if ``key`` is not a declared option
    :raises ValueError: if `val` cannot be parsed or it is out of the specified range
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
7c673cae 1921
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is only validated against ``MODULE_OPTIONS`` when
    ``module`` is this module itself.

    :param str module: name of the module owning the option
    :param str key: option name
    :param val: new value; ``None`` is forwarded as ``None``, anything
        else is converted with ``str()``
    :raises RuntimeError: if ``module`` is this module and ``key`` is
        not a declared option
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Mirror _set_module_option(): forward None as None instead of
    # storing the literal string "None".
    encoded = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, encoded)
1934
20effc67
TL
@API.perm('w')
@API.expose
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set localized configuration for this ceph-mgr instance.

    :param str key: option name; must be declared in ``MODULE_OPTIONS``
    :param str val: new value
    :raises RuntimeError: if ``key`` is not a declared option
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
224ce89b 1946
20effc67
TL
@API.perm('w')
@API.expose
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    :param key: store key
    :param val: value to store, or ``None`` to remove the key
    """
    self._ceph_set_store(key, val)
    return None
7c673cae 1955
@API.expose
def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from this module's persistent key value store.

    :param key: store key
    :param default: returned when the store yields ``None`` for ``key``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1966
@API.expose
def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from the key value store, preferring the entry
    localized to this manager.

    Lookup order: the localized key, then the plain ``key``, then
    ``default``.

    :param key: the un-localized store key
    :param default: returned when neither lookup yields a value
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    localized_value = self._ceph_get_store(localized_key)
    if localized_value is not None:
        return localized_value

    plain_value = self._ceph_get_store(key)
    if plain_value is not None:
        return plain_value

    return default
1975
20effc67
TL
@API.perm('w')
@API.expose
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in the key value store under the key localized to
    this manager, using ``set_store()`` as the setter.

    :param key: the un-localized store key
    :param val: value handed on to ``set_store()``
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
224ce89b 1980
def self_test(self) -> Optional[str]:
    """
    Run a self-test on the module. Override this function and implement
    a self-test that is as thorough as possible, for (automated)
    testing of the module.

    Indicate any failures by raising an exception. This does not have
    to be pretty, it's mainly for picking up regressions during
    development, rather than use in the field.

    :return: None, or an advisory string for developer interest, such
        as a json dump of some state.
    """
    # Default implementation: nothing to test.
    return None
3efd9988 1994
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap()``.

    NOTE(review): presumably this is the latest OSDMap; this method
    takes no epoch argument -- confirm against the C++ side.

    :return: OSDMap
    """
    handle = self._ceph_get_osdmap()
    return cast(OSDMap, handle)
3efd9988 2002
@API.expose
def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
    """
    Return the second element of the latest sample of ``counter`` for
    the given daemon, or 0 when there is no sample.

    :param daemon_type: passed on to ``get_latest_counter()``
    :param daemon_name: passed on to ``get_latest_counter()``
    :param counter: counter path, also used to index the result
    """
    latest = self.get_latest_counter(
        daemon_type, daemon_name, counter)[counter]
    return latest[1] if latest else 0
2011
@API.expose
def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
    """
    Return ``(value, count)`` from the latest sample of ``counter``
    for the given daemon, or ``(0, 0)`` when there is no sample.

    :param daemon_type: passed on to ``get_latest_counter()``
    :param daemon_name: passed on to ``get_latest_counter()``
    :param counter: counter path, also used to index the result
    """
    latest = self.get_latest_counter(
        daemon_type, daemon_name, counter)[counter]
    if not latest:
        return 0, 0
    # The cast works around https://github.com/python/mypy/issues/1178
    sample = cast(Tuple[float, int, int], latest)
    _, value, count = sample
    return value, count
2022
@API.expose
@profile_method()
def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
                          services: Sequence[str] = ("mds", "mon", "osd",
                                                     "rbd-mirror", "rgw",
                                                     "tcmu-runner")) -> Dict[str, dict]:
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters. The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value (and a "count" member for long-running averages).

    :param prio_limit: counters whose priority is below this are skipped
    :param services: service types to include
    """
    counters_by_daemon = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in cast(List[ServiceInfoT], server['services']):
            svc_type = service['type']
            if svc_type not in services:
                continue
            svc_id = service['id']

            schemas = self.get_perf_schema(svc_type, svc_id)
            if not schemas:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schema = schemas[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                priority = counter_schema['priority']
                assert isinstance(priority, int)
                if priority < prio_limit:
                    continue

                tp = counter_schema['type']
                assert isinstance(tp, int)
                counter_info = dict(counter_schema)
                if tp & self.PERFCOUNTER_LONGRUNAVG:
                    # Also populate count for the long running avgs
                    value, count = self.get_latest_avg(
                        svc_type, svc_id, counter_path)
                    counter_info['value'] = value
                    counter_info['count'] = count
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                counters_by_daemon[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(counters_by_daemon)))

    return counters_by_daemon
2095
@API.expose
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: the address to publish
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
94b18763 2105
20effc67
TL
@API.perm('w')
@API.expose
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """
    Record ``wear_level`` for device ``devid`` through
    ``_ceph_set_device_wear_level()``.

    :param devid: device identifier
    :param wear_level: wear level to record
    """
    outcome = self._ceph_set_device_wear_level(devid, wear_level)
    return outcome
2110
@API.expose
def have_mon_connection(self) -> bool:
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor. If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: bool
    """
    connected = self._ceph_have_mon_connection()
    return connected
11fdf7f2 2121
f67539c2
TL
def update_progress_event(self,
                          evid: str,
                          desc: str,
                          progress: float,
                          add_to_ceph_s: bool) -> None:
    """
    Forward a progress event update to
    ``_ceph_update_progress_event()``.

    :param evid: event identifier
    :param desc: event description
    :param progress: progress value of the event
    :param add_to_ceph_s: passed through unchanged
    """
    outcome = self._ceph_update_progress_event(
        evid, desc, progress, add_to_ceph_s)
    return outcome
11fdf7f2 2128
20effc67
TL
@API.perm('w')
@API.expose
def complete_progress_event(self, evid: str) -> None:
    """
    Mark the progress event ``evid`` as complete through
    ``_ceph_complete_progress_event()``.

    :param evid: event identifier
    """
    outcome = self._ceph_complete_progress_event(evid)
    return outcome
11fdf7f2 2133
20effc67
TL
@API.perm('w')
@API.expose
def clear_all_progress_events(self) -> None:
    """
    Clear all progress events through
    ``_ceph_clear_all_progress_events()``.
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
2138
@property
def rados(self) -> rados.Rados:
    """
    A librados instance to be shared by any classes within
    this mgr module that want one.

    The instance is created from this module's context on first
    access, connected, registered with the mgr via its addresses, and
    then reused on every later access.
    """
    if not self._rados:
        ctx_capsule = self.get_context()
        self._rados = rados.Rados(context=ctx_capsule)
        self._rados.connect()
        client_addrs = self._rados.get_addrs()
        self._ceph_register_client(client_addrs)
    return self._rados
2153
2154 @staticmethod
f67539c2 2155 def can_run() -> Tuple[bool, str]:
11fdf7f2
TL
2156 """
2157 Implement this function to report whether the module's dependencies
2158 are met. For example, if the module needs to import a particular
2159 dependency to work, then use a try/except around the import at
2160 file scope, and then report here if the import failed.
2161
2162 This will be called in a blocking way from the C++ code, so do not
2163 do any I/O that could block in this function.
2164
2165 :return a 2-tuple consisting of a boolean and explanatory string
2166 """
2167
2168 return True, ""
2169
@API.expose
def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """
    Invoke a method on another module. All arguments, and the return
    value from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module. If module isn't loaded,
        an ImportError exception is raised.
    :param method_name: Method name. If it does not exist, a NameError
        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    # Positional and keyword arguments are handed over as a tuple and a
    # dict respectively, not re-expanded.
    reply = self._ceph_dispatch_remote(module_name, method_name,
                                       args, kwargs)
    return reply
2194
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query. Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
2224
20effc67
TL
@API.perm('w')
@API.expose
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
2234
@API.expose
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    """
    stats = self._ceph_get_osd_perf_counters(query_id)
    return stats
494da23a 2243
f67539c2
TL
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query. Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
       }

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Valid subkey types:
       'mds_rank', 'client_id'
    Valid performance counter types:
       'cap_hit_metric'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_mds_perf_query(query)
    return query_id
2272
20effc67
TL
@API.perm('w')
@API.expose
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister an MDS perf query.

    :param int query_id: query ID
    """
    outcome = self._ceph_remove_mds_perf_query(query_id)
    return outcome
2282
# NOTE(review): this decorator looks like it may originally have belonged
# to get_mds_perf_counters() (a blank line used to separate it from this
# def) -- confirm which method(s) should be exposed.
@API.expose
def reregister_mds_perf_queries(self) -> None:
    """
    Re-register MDS perf queries.
    """
    outcome = self._ceph_reregister_mds_perf_queries()
    return outcome
2290
f67539c2
TL
# NOTE(review): unlike get_osd_perf_counters(), this method carries no
# @API.expose decorator -- confirm whether that is intentional.
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an MDS perf query.

    :param int query_id: query ID
    """
    stats = self._ceph_get_mds_perf_counters(query_id)
    return stats
2298
39ae355f
TL
def get_daemon_health_metrics(self) -> Dict[str, List[Dict[str, Any]]]:
    """
    Get the list of health metrics per daemon. This includes SLOW_OPS health metrics
    in MON and OSD daemons, and PENDING_CREATING_PGS health metrics for OSDs.

    :return: dict mapping each daemon to its list of health metrics
    """
    metrics = self._ceph_get_daemon_health_metrics()
    return metrics
2305
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Verifies that the current session caps permit executing the py service
    or current module with the provided arguments. This provides a generic
    way to allow modules to restrict by more fine-grained controls (e.g.
    pools).

    :param arguments: dict of key/value arguments to test
    :return: bool
    """
    permitted = self._ceph_is_authorized(arguments)
    return permitted
522d829b 2316
20effc67 2317 @API.expose
522d829b
TL
2318 def send_rgwadmin_command(self, args: List[str],
2319 stdout_as_json: bool = True) -> Tuple[int, Union[str, dict], str]:
2320 try:
2321 cmd = [
2322 'radosgw-admin',
2323 '-c', str(self.get_ceph_conf_path()),
2324 '-k', str(self.get_ceph_option('keyring')),
2325 '-n', f'mgr.{self.get_mgr_id()}',
2326 ] + args
2327 self.log.debug('Executing %s', str(cmd))
2328 result = subprocess.run( # pylint: disable=subprocess-run-check
2329 cmd,
2330 stdout=subprocess.PIPE,
2331 stderr=subprocess.PIPE,
2332 timeout=10,
2333 )
2334 stdout = result.stdout.decode('utf-8')
2335 stderr = result.stderr.decode('utf-8')
2336 if stdout and stdout_as_json:
2337 stdout = json.loads(stdout)
2338 if result.returncode:
2339 self.log.debug('Error %s executing %s: %s', result.returncode, str(cmd), stderr)
2340 return result.returncode, stdout, stderr
2341 except subprocess.CalledProcessError as ex:
2342 self.log.exception('Error executing radosgw-admin %s: %s', str(ex.cmd), str(ex.output))
2343 raise
2344 except subprocess.TimeoutExpired as ex:
2345 self.log.error('Timeout (10s) executing radosgw-admin %s', str(ex.cmd))
2346 raise