import ceph_module # noqa
-try:
- from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
-except ImportError:
- # just for type checking
- pass
+from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
+ Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
+if TYPE_CHECKING:
+ import sys
+ if sys.version_info >= (3, 8):
+ from typing import Literal
+ else:
+ from typing_extensions import Literal
+
+import inspect
import logging
import errno
+import functools
import json
-import six
import threading
-from collections import defaultdict, namedtuple
+from collections import defaultdict
+from enum import IntEnum
import rados
import re
+import sys
import time
+from ceph_argparse import CephArgtype
from mgr_util import profile_method
+if sys.version_info >= (3, 8):
+ from typing import get_args, get_origin
+else:
+ def get_args(tp):
+ if tp is Generic:
+ return tp
+ else:
+ return getattr(tp, '__args__', ())
+
+ def get_origin(tp):
+ return getattr(tp, '__origin__', None)
+
+
ERROR_MSG_EMPTY_INPUT_FILE = 'Empty content: please add a password/secret to the file.'
ERROR_MSG_NO_INPUT_FILE = 'Please specify the file containing the password/secret with "-i" option.'
# Full list of strings in "osd_types.cc:pg_state_string()"
Use with MgrModule.send_command
"""
- def __init__(self, tag=None):
+ def __init__(self, tag: Optional[str] = None):
self.ev = threading.Event()
self.outs = ""
self.outb = ""
# C++ land, to avoid passing addresses around in messages.
self.tag = tag if tag else ""
- def complete(self, r, outb, outs):
+ def complete(self, r: int, outb: str, outs: str) -> None:
self.r = r
self.outb = outb
self.outs = outs
self.ev.set()
- def wait(self):
+ def wait(self) -> Tuple[int, str, str]:
self.ev.wait()
return self.r, self.outb, self.outs
-class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
- def __new__(cls, retval=0, stdout="", stderr=""):
- """
- Tuple containing the result of `handle_command()`
-
- Only write to stderr if there is an error, or in extraordinary circumstances
+class HandleCommandResult(NamedTuple):
+ """
+ Tuple containing the result of `handle_command()`
- Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
- is critical information to include there.
+ Only write to stderr if there is an error, or in extraordinary circumstances
- Everything programmatically consumable should be put on stdout
+ Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
+ is critical information to include there.
- :param retval: return code. E.g. 0 or -errno.EINVAL
- :type retval: int
- :param stdout: data of this result.
- :type stdout: str
- :param stderr: Typically used for error messages.
- :type stderr: str
- """
- return super(HandleCommandResult, cls).__new__(cls, retval, stdout, stderr)
+ Everything programmatically consumable should be put on stdout
+ """
+ retval: int = 0 # return code. E.g. 0 or -errno.EINVAL
+ stdout: str = "" # data of this result.
+ stderr: str = "" # Typically used for error messages.
class MonCommandFailed(RuntimeError): pass
class OSDMap(ceph_module.BasePyOSDMap):
- def get_epoch(self):
+ def get_epoch(self) -> int:
return self._get_epoch()
- def get_crush_version(self):
+ def get_crush_version(self) -> int:
return self._get_crush_version()
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
return self._dump()
- def get_pools(self):
+ def get_pools(self) -> Dict[int, Dict[str, Any]]:
# FIXME: efficient implementation
d = self._dump()
return dict([(p['pool'], p) for p in d['pools']])
- def get_pools_by_name(self):
+ def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
# FIXME: efficient implementation
d = self._dump()
return dict([(p['pool_name'], p) for p in d['pools']])
- def new_incremental(self):
+ def new_incremental(self) -> 'OSDMapIncremental':
return self._new_incremental()
- def apply_incremental(self, inc):
+ def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
return self._apply_incremental(inc)
- def get_crush(self):
+ def get_crush(self) -> 'CRUSHMap':
return self._get_crush()
- def get_pools_by_take(self, take):
+ def get_pools_by_take(self, take: int) -> List[int]:
return self._get_pools_by_take(take).get('pools', [])
- def calc_pg_upmaps(self, inc,
- max_deviation=.01, max_iterations=10, pools=None):
+ def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
+ max_deviation: int,
+ max_iterations: int = 10,
+ pools: Optional[List[str]] = None) -> int:
if pools is None:
pools = []
return self._calc_pg_upmaps(
inc,
max_deviation, max_iterations, pools)
- def map_pool_pgs_up(self, poolid):
+ def map_pool_pgs_up(self, poolid: int) -> List[int]:
return self._map_pool_pgs_up(poolid)
- def pg_to_up_acting_osds(self, pool_id, ps):
+ def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
return self._pg_to_up_acting_osds(pool_id, ps)
- def pool_raw_used_rate(self, pool_id):
+ def pool_raw_used_rate(self, pool_id: int) -> float:
return self._pool_raw_used_rate(pool_id)
- def get_ec_profile(self, name):
+ def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
# FIXME: efficient implementation
d = self._dump()
return d['erasure_code_profiles'].get(name, None)
- def get_require_osd_release(self):
+ def get_require_osd_release(self) -> str:
d = self._dump()
return d['require_osd_release']
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
- def get_epoch(self):
+ def get_epoch(self) -> int:
return self._get_epoch()
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
return self._dump()
- def set_osd_reweights(self, weightmap):
+ def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
"""
weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
"""
return self._set_osd_reweights(weightmap)
- def set_crush_compat_weight_set_weights(self, weightmap):
+ def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
"""
weightmap is a dict, int to float. devices only. e.g.,
{ 0: 3.4, 1: 3.3, 2: 3.334 }
ITEM_NONE = 0x7fffffff
DEFAULT_CHOOSE_ARGS = '-1'
- def dump(self):
+ def dump(self) -> Dict[str, Any]:
return self._dump()
- def get_item_weight(self, item):
+ def get_item_weight(self, item: int) -> Optional[int]:
return self._get_item_weight(item)
- def get_item_name(self, item):
+ def get_item_name(self, item: int) -> Optional[str]:
return self._get_item_name(item)
- def find_takes(self):
+ def find_takes(self) -> List[int]:
return self._find_takes().get('takes', [])
- def get_take_weight_osd_map(self, root):
+ def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
uglymap = self._get_take_weight_osd_map(root)
- return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}
+ return {int(k): v for k, v in uglymap.get('weights', {}).items()}
@staticmethod
- def have_default_choose_args(dump):
+ def have_default_choose_args(dump: Dict[str, Any]) -> bool:
return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
@staticmethod
- def get_default_choose_args(dump):
- return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
+ def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
+ choose_args = dump.get('choose_args')
+ assert isinstance(choose_args, dict)
+ return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
- def get_rule(self, rule_name):
+ def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
# TODO efficient implementation
for rule in self.dump()['rules']:
if rule_name == rule['rule_name']:
return None
- def get_rule_by_id(self, rule_id):
+ def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
for rule in self.dump()['rules']:
if rule['rule_id'] == rule_id:
return rule
return None
- def get_rule_root(self, rule_name):
+ def get_rule_root(self, rule_name: str) -> Optional[int]:
rule = self.get_rule(rule_name)
if rule is None:
return None
try:
- first_take = [s for s in rule['steps'] if s['op'] == 'take'][0]
- except IndexError:
+ first_take = next(s for s in rule['steps'] if s.get('op') == 'take')
+ except StopIteration:
logging.warning("CRUSH rule '{0}' has no 'take' step".format(
rule_name))
return None
else:
return first_take['item']
- def get_osds_under(self, root_id):
+ def get_osds_under(self, root_id: int) -> List[int]:
# TODO don't abuse dump like this
d = self.dump()
buckets = dict([(b['id'], b) for b in d['buckets']])
osd_list = []
- def accumulate(b):
+ def accumulate(b: Dict[str, Any]) -> None:
for item in b['items']:
if item['id'] >= 0:
osd_list.append(item['id'])
return osd_list
- def device_class_counts(self):
+ def device_class_counts(self) -> Dict[str, int]:
result = defaultdict(int) # type: Dict[str, int]
# TODO don't abuse dump like this
d = self.dump()
return dict(result)
+HandlerFuncType = Callable[..., Tuple[int, str, str]]
+
+
class CLICommand(object):
COMMANDS = {} # type: Dict[str, CLICommand]
- def __init__(self, prefix, args="", desc="", perm="rw"):
+ def __init__(self,
+ prefix: str,
+ perm: str = 'rw',
+ poll: bool = False):
self.prefix = prefix
- self.args = args
- self.args_dict = {}
- self.desc = desc
self.perm = perm
+ self.poll = poll
self.func = None # type: Optional[Callable]
- self._parse_args()
-
- def _parse_args(self):
- if not self.args:
- return
- args = self.args.split(" ")
- for arg in args:
- arg_desc = arg.strip().split(",")
- arg_d = {}
- for kv in arg_desc:
- k, v = kv.split("=")
- if k != "name":
- arg_d[k] = v
- else:
- self.args_dict[v] = arg_d
+ self.arg_spec = {} # type: Dict[str, Any]
+ self.first_default = -1
- def __call__(self, func):
+ KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
+
+ @staticmethod
+ def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
+ desc = inspect.getdoc(f) or ''
+ full_argspec = inspect.getfullargspec(f)
+ arg_spec = full_argspec.annotations
+ first_default = len(arg_spec)
+ if full_argspec.defaults:
+ first_default -= len(full_argspec.defaults)
+ args = []
+ for index, arg in enumerate(full_argspec.args):
+ if arg in CLICommand.KNOWN_ARGS:
+ continue
+ assert arg in arg_spec, \
+ f"'{arg}' is not annotated for {f}: {full_argspec}"
+ has_default = index >= first_default
+ args.append(CephArgtype.to_argdesc(arg_spec[arg],
+ dict(name=arg),
+ has_default))
+ return desc, arg_spec, first_default, ' '.join(args)
+
+ def store_func_metadata(self, f: HandlerFuncType) -> None:
+ self.desc, self.arg_spec, self.first_default, self.args = \
+ self.load_func_metadata(f)
+
+ def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
+ self.store_func_metadata(func)
self.func = func
self.COMMANDS[self.prefix] = self
return self.func
- def call(self, mgr, cmd_dict, inbuf):
+ def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
+ def start_kwargs() -> bool:
+ if isinstance(val, str) and '=' in val:
+ k, v = val.split('=', 1)
+ if k in self.arg_spec:
+ return True
+ return False
+
+ if not kwargs_switch:
+ kwargs_switch = start_kwargs()
+
+ if kwargs_switch:
+ k, v = val.split('=', 1)
+ else:
+ k, v = key, val
+ return kwargs_switch, k.replace('-', '_'), v
+
+ def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
kwargs = {}
- for a, d in self.args_dict.items():
- if 'req' in d and d['req'] == "false" and a not in cmd_dict:
+ kwargs_switch = False
+ for index, (name, tp) in enumerate(self.arg_spec.items()):
+ if name in CLICommand.KNOWN_ARGS:
continue
- kwargs[a.replace("-", "_")] = cmd_dict[a]
+ assert self.first_default >= 0
+ raw_v = cmd_dict.get(name)
+ if index >= self.first_default:
+ if raw_v is None:
+ continue
+ kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
+ name, raw_v)
+ kwargs[k] = CephArgtype.cast_to(tp, v)
+ return kwargs
+
+ def call(self,
+ mgr: Any,
+ cmd_dict: Dict[str, Any],
+ inbuf: Optional[str] = None) -> HandleCommandResult:
+ kwargs = self._collect_args_by_argspec(cmd_dict)
if inbuf:
kwargs['inbuf'] = inbuf
assert self.func
return self.func(mgr, **kwargs)
- def dump_cmd(self):
+ def dump_cmd(self) -> Dict[str, Union[str, bool]]:
return {
'cmd': '{} {}'.format(self.prefix, self.args),
'desc': self.desc,
- 'perm': self.perm
+ 'perm': self.perm,
+ 'poll': self.poll,
}
@classmethod
- def dump_cmd_list(cls):
+ def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
-def CLIReadCommand(prefix, args="", desc=""):
- return CLICommand(prefix, args, desc, "r")
+def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
+ return CLICommand(prefix, "r", poll)
-def CLIWriteCommand(prefix, args="", desc=""):
- return CLICommand(prefix, args, desc, "w")
+def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
+ return CLICommand(prefix, "w", poll)
-def CLICheckNonemptyFileInput(func):
- def check(*args, **kwargs):
- if not 'inbuf' in kwargs:
+def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType:
+ @functools.wraps(func)
+ def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+ if 'inbuf' not in kwargs:
return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
if isinstance(kwargs['inbuf'], str):
# Delete new line separator at EOF (it may have been added by a text editor).
if not kwargs['inbuf']:
return -errno.EINVAL, '', ERROR_MSG_EMPTY_INPUT_FILE
return func(*args, **kwargs)
+ check.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
return check
-def _get_localized_key(prefix, key):
+def _get_localized_key(prefix: str, key: str) -> str:
return '{}/{}'.format(prefix, key)
-class Option(dict):
- """
- Helper class to declare options for MODULE_OPTIONS list.
+"""
+MODULE_OPTIONS types and Option Class
+"""
+if TYPE_CHECKING:
+ OptionTypeLabel = Literal[
+ 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
+
+
+# common/options.h: value_t
+OptionValue = Optional[Union[bool, int, float, str]]
- Caveat: it uses argument names matching Python keywords (type, min, max),
- so any further processing should happen in a separate method.
- TODO: type validation.
+class Option(Dict):
+ """
+ Helper class to declare options for MODULE_OPTIONS list.
+ TODO: Replace with typing.TypedDict when in python_version >= 3.8
"""
def __init__(
- self, name,
- default=None,
- type='str',
- desc=None, longdesc=None,
- min=None, max=None,
- enum_allowed=None,
- see_also=None,
- tags=None,
- runtime=False,
+ self,
+ name: str,
+ default: OptionValue = None,
+ type: 'OptionTypeLabel' = 'str',
+ desc: Optional[str] = None,
+ long_desc: Optional[str] = None,
+ min: OptionValue = None,
+ max: OptionValue = None,
+ enum_allowed: Optional[List[str]] = None,
+ tags: Optional[List[str]] = None,
+ see_also: Optional[List[str]] = None,
+ runtime: bool = False,
):
super(Option, self).__init__(
(k, v) for k, v in vars().items()
if k != 'self' and v is not None)
+
class Command(dict):
"""
Helper class to declare options for COMMANDS list.
def __init__(
self,
- prefix,
- args=None,
- perm="rw",
- desc=None,
- poll=False,
- handler=None
+ prefix: str,
+ handler: HandlerFuncType,
+ perm: str = "rw",
+ poll: bool = False,
):
- super(Command, self).__init__(
- cmd=prefix + (' ' + args if args else ''),
- perm=perm,
- desc=desc,
- poll=poll)
+ super().__init__(perm=perm,
+ poll=poll)
self.prefix = prefix
- self.args = args
self.handler = handler
- def register(self, instance=False):
+ @staticmethod
+ def returns_command_result(instance: Any,
+ f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
+ @functools.wraps(f)
+ def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
+ retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
+ return HandleCommandResult(retval, stdout, stderr)
+ wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined]
+ return wrapper
+
+ def register(self, instance: bool = False) -> HandlerFuncType:
"""
Register a CLICommand handler. It allows an instance to register bound
methods. In that case, the mgr instance is not passed, and it's expected
It also uses HandleCommandResult helper to return a wrapped a tuple of 3
items.
"""
- return CLICommand(
- prefix=self.prefix,
- args=self.args,
- desc=self['desc'],
- perm=self['perm']
- )(
- func=lambda mgr, *args, **kwargs: HandleCommandResult(*self.handler(
- *((instance or mgr,) + args), **kwargs))
- )
+ cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
+ return cmd(self.returns_command_result(instance, self.handler))
class CPlusPlusHandler(logging.Handler):
- def __init__(self, module_inst):
+ def __init__(self, module_inst: Any):
super(CPlusPlusHandler, self).__init__()
self._module = module_inst
self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
.format(module_inst.module_name)))
- def emit(self, record):
+ def emit(self, record: logging.LogRecord) -> None:
if record.levelno >= self.level:
self._module._ceph_log(self.format(record))
+
class ClusterLogHandler(logging.Handler):
- def __init__(self, module_inst):
+ def __init__(self, module_inst: Any):
super().__init__()
self._module = module_inst
self.setFormatter(logging.Formatter("%(message)s"))
- def emit(self, record):
+ def emit(self, record: logging.LogRecord) -> None:
levelmap = {
- 'DEBUG': MgrModule.CLUSTER_LOG_PRIO_DEBUG,
- 'INFO': MgrModule.CLUSTER_LOG_PRIO_INFO,
- 'WARNING': MgrModule.CLUSTER_LOG_PRIO_WARN,
- 'ERROR': MgrModule.CLUSTER_LOG_PRIO_ERROR,
- 'CRITICAL': MgrModule.CLUSTER_LOG_PRIO_ERROR,
+ logging.DEBUG: MgrModule.ClusterLogPrio.DEBUG,
+ logging.INFO: MgrModule.ClusterLogPrio.INFO,
+ logging.WARNING: MgrModule.ClusterLogPrio.WARN,
+ logging.ERROR: MgrModule.ClusterLogPrio.ERROR,
+ logging.CRITICAL: MgrModule.ClusterLogPrio.ERROR,
}
- level = levelmap[record.levelname]
+ level = levelmap[record.levelno]
if record.levelno >= self.level:
self._module.cluster_log(self._module.module_name,
level,
self.format(record))
+
class FileHandler(logging.FileHandler):
- def __init__(self, module_inst):
+ def __init__(self, module_inst: Any):
path = module_inst.get_ceph_option("log_file")
idx = path.rfind(".log")
if idx != -1:
class MgrModuleLoggingMixin(object):
- def _configure_logging(self, mgr_level, module_level, cluster_level,
- log_to_file, log_to_cluster):
- self._mgr_level = None
- self._module_level = None
+ def _configure_logging(self,
+ mgr_level: str,
+ module_level: str,
+ cluster_level: str,
+ log_to_file: bool,
+ log_to_cluster: bool) -> None:
+ self._mgr_level: Optional[str] = None
+ self._module_level: Optional[str] = None
self._root_logger = logging.getLogger()
self._unconfigure_logging()
self._root_logger.setLevel(logging.NOTSET)
self._set_log_level(mgr_level, module_level, cluster_level)
-
- def _unconfigure_logging(self):
+ def _unconfigure_logging(self) -> None:
# remove existing handlers:
rm_handlers = [
- h for h in self._root_logger.handlers if isinstance(h, CPlusPlusHandler) or isinstance(h, FileHandler) or isinstance(h, ClusterLogHandler)]
+ h for h in self._root_logger.handlers
+ if (isinstance(h, CPlusPlusHandler) or
+ isinstance(h, FileHandler) or
+ isinstance(h, ClusterLogHandler))]
for h in rm_handlers:
self._root_logger.removeHandler(h)
self.log_to_file = False
self.log_to_cluster = False
- def _set_log_level(self, mgr_level, module_level, cluster_level):
+ def _set_log_level(self,
+ mgr_level: str,
+ module_level: str,
+ cluster_level: str) -> None:
self._cluster_log_handler.setLevel(cluster_level.upper())
module_level = module_level.upper() if module_level else ''
if not self._module_level and not module_level:
level = self._ceph_log_level_to_python(mgr_level)
- self.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level, mgr_level)
+ self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
+ level, mgr_level)
elif self._module_level and not module_level:
level = self._ceph_log_level_to_python(mgr_level)
self.getLogger().warning("unsetting module log level, falling back to "
self._mgr_log_handler.setLevel(level)
self._file_log_handler.setLevel(level)
- def _enable_file_log(self):
+ def _enable_file_log(self) -> None:
# enable file log
self.getLogger().warning("enabling logging to file")
self.log_to_file = True
self._root_logger.addHandler(self._file_log_handler)
- def _disable_file_log(self):
+ def _disable_file_log(self) -> None:
# disable file log
self.getLogger().warning("disabling logging to file")
self.log_to_file = False
self._root_logger.removeHandler(self._file_log_handler)
- def _enable_cluster_log(self):
+ def _enable_cluster_log(self) -> None:
# enable cluster log
self.getLogger().warning("enabling logging to cluster")
self.log_to_cluster = True
self._root_logger.addHandler(self._cluster_log_handler)
- def _disable_cluster_log(self):
+ def _disable_cluster_log(self) -> None:
# disable cluster log
self.getLogger().warning("disabling logging to cluster")
self.log_to_cluster = False
self._root_logger.removeHandler(self._cluster_log_handler)
- def _ceph_log_level_to_python(self, ceph_log_level):
- if ceph_log_level:
+ def _ceph_log_level_to_python(self, log_level: str) -> str:
+ if log_level:
try:
- ceph_log_level = int(ceph_log_level.split("/", 1)[0])
+ ceph_log_level = int(log_level.split("/", 1)[0])
except ValueError:
ceph_log_level = 0
else:
log_level = "INFO"
return log_level
- def getLogger(self, name=None):
+ def getLogger(self, name: Optional[str] = None) -> logging.Logger:
return logging.getLogger(name)
from their active peer), and to configuration settings (read only).
"""
- MODULE_OPTIONS = [] # type: List[Dict[str, Any]]
+ MODULE_OPTIONS: List[Option] = []
MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
- def __init__(self, module_name, capsule):
+ def __init__(self, module_name: str, capsule: Any):
super(MgrStandbyModule, self).__init__(capsule)
self.module_name = module_name
else:
self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
- mgr_level = self.get_ceph_option("debug_mgr")
- log_level = self.get_module_option("log_level")
- cluster_level = self.get_module_option('log_to_cluster_level')
+ # mock does not return a str
+ mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+ log_level = cast(str, self.get_module_option("log_level"))
+ cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
self._configure_logging(mgr_level, log_level, cluster_level,
False, False)
# for backwards compatibility
self._logger = self.getLogger()
- def __del__(self):
+ def __del__(self) -> None:
self._cleanup()
self._unconfigure_logging()
- def _cleanup(self):
+ def _cleanup(self) -> None:
pass
@classmethod
- def _register_options(cls, module_name):
+ def _register_options(cls, module_name: str) -> None:
cls.MODULE_OPTIONS.append(
Option(name='log_level', type='str', default="", runtime=True,
enum_allowed=['info', 'debug', 'critical', 'error',
'warning', '']))
@property
- def log(self):
+ def log(self) -> logging.Logger:
return self._logger
- def serve(self):
+ def serve(self) -> None:
"""
The serve method is mandatory for standby modules.
:return:
"""
raise NotImplementedError()
- def get_mgr_id(self):
+ def get_mgr_id(self) -> str:
return self._ceph_get_mgr_id()
- def get_module_option(self, key, default=None):
+ def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
"""
Retrieve the value of a persistent configuration setting
- :param str key:
:param default: the default value of the config if it is not found
- :return: str
"""
r = self._ceph_get_module_option(key)
if r is None:
else:
return r
- def get_ceph_option(self, key):
+ def get_ceph_option(self, key: str) -> OptionValue:
return self._ceph_get_option(key)
- def get_store(self, key):
+ def get_store(self, key: str) -> Optional[str]:
"""
Retrieve the value of a persistent KV store entry
"""
return self._ceph_get_store(key)
- def get_active_uri(self):
+ def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
+ r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
+ if r is None:
+ r = self._ceph_get_store(key)
+ if r is None:
+ r = default
+ return r
+
+ def get_active_uri(self) -> str:
return self._ceph_get_active_uri()
- def get_localized_module_option(self, key, default=None):
+ def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
r = self._ceph_get_module_option(key, self.get_mgr_id())
if r is None:
return self.MODULE_OPTION_DEFAULTS.get(key, default)
return r
+HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
+# {"type": service_type, "id": service_id}
+ServiceInfoT = Dict[str, str]
+# {"hostname": hostname,
+# "ceph_version": version,
+# "services": [service_info, ..]}
+ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
+PerfCounterT = Dict[str, Any]
+
+
class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
COMMANDS = [] # type: List[Any]
- MODULE_OPTIONS = [] # type: List[dict]
+ MODULE_OPTIONS: List[Option] = []
MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
# Priority definitions for perf counters
NONE = 1
# Cluster log priorities
- CLUSTER_LOG_PRIO_DEBUG = 0
- CLUSTER_LOG_PRIO_INFO = 1
- CLUSTER_LOG_PRIO_SEC = 2
- CLUSTER_LOG_PRIO_WARN = 3
- CLUSTER_LOG_PRIO_ERROR = 4
-
- def __init__(self, module_name, py_modules_ptr, this_ptr):
+ class ClusterLogPrio(IntEnum):
+ DEBUG = 0
+ INFO = 1
+ SEC = 2
+ WARN = 3
+ ERROR = 4
+
+ def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
self.module_name = module_name
super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
# with default and user-supplied option values.
self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
- mgr_level = self.get_ceph_option("debug_mgr")
- log_level = self.get_module_option("log_level")
- cluster_level = self.get_module_option('log_to_cluster_level')
+ mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+ log_level = cast(str, self.get_module_option("log_level"))
+ cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
log_to_file = self.get_module_option("log_to_file")
+ assert isinstance(log_to_file, bool)
log_to_cluster = self.get_module_option("log_to_cluster")
+ assert isinstance(log_to_cluster, bool)
self._configure_logging(mgr_level, log_level, cluster_level,
log_to_file, log_to_cluster)
self._perf_schema_cache = None
# Keep a librados instance for those that need it.
- self._rados = None
+ self._rados: Optional[rados.Rados] = None
-
- def __del__(self):
+ def __del__(self) -> None:
self._unconfigure_logging()
@classmethod
- def _register_options(cls, module_name):
+ def _register_options(cls, module_name: str) -> None:
cls.MODULE_OPTIONS.append(
Option(name='log_level', type='str', default="", runtime=True,
enum_allowed=['info', 'debug', 'critical', 'error',
'warning', '']))
@classmethod
- def _register_commands(cls, module_name):
+ def _register_commands(cls, module_name: str) -> None:
cls.COMMANDS.extend(CLICommand.dump_cmd_list())
@property
- def log(self):
+ def log(self) -> logging.Logger:
return self._logger
- def cluster_log(self, channel, priority, message):
+ def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
"""
:param channel: The log channel. This can be 'cluster', 'audit', ...
- :type channel: str
- :param priority: The log message priority. This can be
- CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
- CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
- CLUSTER_LOG_PRIO_ERROR.
- :type priority: int
+ :param priority: The log message priority.
:param message: The message to log.
- :type message: str
"""
- self._ceph_cluster_log(channel, priority, message)
+ self._ceph_cluster_log(channel, priority.value, message)
@property
- def version(self):
+ def version(self) -> str:
return self._version
@property
- def release_name(self):
+ def release_name(self) -> str:
"""
Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
:return: Returns the release name of the Ceph version in lower case.
"""
return self._ceph_get_release_name()
- def get_context(self):
+ def lookup_release_name(self, major: int) -> str:
+ return self._ceph_lookup_release_name(major)
+
+ def get_context(self) -> object:
"""
:return: a Python capsule containing a C++ CephContext pointer
"""
return self._ceph_get_context()
- def notify(self, notify_type, notify_id):
+ def notify(self, notify_type: str, notify_id: str) -> None:
"""
Called by the ceph-mgr service to notify the Python plugin
that new state is available.
"""
pass
- def _config_notify(self):
+ def _config_notify(self) -> None:
# check logging options for changes
- mgr_level = self.get_ceph_option("debug_mgr")
- module_level = self.get_module_option("log_level")
- cluster_level = self.get_module_option("log_to_cluster_level")
+ mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+ module_level = cast(str, self.get_module_option("log_level"))
+ cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
+ assert isinstance(cluster_level, str)
log_to_file = self.get_module_option("log_to_file", False)
+ assert isinstance(log_to_file, bool)
log_to_cluster = self.get_module_option("log_to_cluster", False)
-
+ assert isinstance(log_to_cluster, bool)
self._set_log_level(mgr_level, module_level, cluster_level)
if log_to_file != self.log_to_file:
# call module subclass implementations
self.config_notify()
- def config_notify(self):
+ def config_notify(self) -> None:
"""
Called by the ceph-mgr service to notify the Python plugin
that the configuration may have changed. Modules will want to
"""
pass
- def serve(self):
+ def serve(self) -> None:
"""
Called by the ceph-mgr service to start any server that
is provided by this Python plugin. The implementation
"""
pass
- def shutdown(self):
+ def shutdown(self) -> None:
"""
Called by the ceph-mgr service to request that this
module drop out of its serve() function. You do not
self._rados.shutdown()
self._ceph_unregister_client(addrs)
- def get(self, data_name):
+ def get(self, data_name: str):
"""
Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
"""
return self._ceph_get(data_name)
- def _stattype_to_str(self, stattype):
+ def _stattype_to_str(self, stattype: int) -> str:
typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
if typeonly == 0:
return ''
- def _perfpath_to_path_labels(self, daemon, path):
- # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
+ def _perfpath_to_path_labels(self, daemon: str,
+ path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
label_names = ("ceph_daemon",) # type: Tuple[str, ...]
labels = (daemon,) # type: Tuple[str, ...]
return path, label_names, labels,
- def _perfvalue_to_value(self, stattype, value):
+ def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
if stattype & self.PERFCOUNTER_TIME:
# Convert from ns to seconds
return value / 1000000000.0
else:
return value
- def _unit_to_str(self, unit):
+ def _unit_to_str(self, unit: int) -> str:
if unit == self.NONE:
return "/s"
elif unit == self.BYTES:
return "B/s"
+ else:
+ raise ValueError(f'bad unit "{unit}"')
@staticmethod
- def to_pretty_iec(n):
+ def to_pretty_iec(n: int) -> str:
for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
(20, 'Mi'), (10, 'Ki')]:
if n > 10 << bits:
return str(n) + ' '
@staticmethod
- def get_pretty_row(elems, width):
+ def get_pretty_row(elems: Sequence[str], width: int) -> str:
"""
Takes an array of elements and returns a string with those elements
formatted as a table row. Useful for polling modules.
return ret
- def get_pretty_header(self, elems, width):
+ def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
"""
Like ``get_pretty_row`` but adds dashes, to be used as a table title.
return ret
- def get_server(self, hostname):
+ def get_server(self, hostname) -> ServerInfoT:
"""
Called by the plugin to fetch metadata about a particular hostname from
ceph-mgr.
:param hostname: a hostname
"""
- return self._ceph_get_server(hostname)
+ return cast(ServerInfoT, self._ceph_get_server(hostname))
- def get_perf_schema(self, svc_type, svc_name):
+ def get_perf_schema(self,
+ svc_type: str,
+ svc_name: str) -> Dict[str,
+ Dict[str, Dict[str, Union[str, int]]]]:
"""
Called by the plugin to fetch perf counter schema info.
svc_name can be nullptr, as can svc_type, in which case
"""
return self._ceph_get_perf_schema(svc_type, svc_name)
- def get_counter(self, svc_type, svc_name, path):
+ def get_counter(self,
+ svc_type: str,
+ svc_name: str,
+ path: str) -> Dict[str, List[Tuple[float, int]]]:
"""
Called by the plugin to fetch the latest performance counter data for a
particular counter on a particular service.
:param str svc_name:
:param str path: a period-separated concatenation of the subsystem and the
counter name, for example "mds.inodes".
- :return: A list of two-tuples of (timestamp, value) is returned. This may be
- empty if no data is available.
+ :return: A dict of counter names to their values. each value is a list of
+ of two-tuples of (timestamp, value). This may be empty if no data is
+ available.
"""
return self._ceph_get_counter(svc_type, svc_name, path)
- def get_latest_counter(self, svc_type, svc_name, path):
+ def get_latest_counter(self,
+ svc_type: str,
+ svc_name: str,
+ path: str) -> Dict[str, Union[Tuple[float, int],
+ Tuple[float, int, int]]]:
"""
Called by the plugin to fetch only the newest performance counter data
- pointfor a particular counter on a particular service.
+ point for a particular counter on a particular service.
:param str svc_type:
:param str svc_name:
:param str path: a period-separated concatenation of the subsystem and the
counter name, for example "mds.inodes".
- :return: A list of two-tuples of (timestamp, value) is returned. This may be
- empty if no data is available.
+ :return: A list of two-tuples of (timestamp, value) or three-tuple of
+ (timestamp, value, count) is returned. This may be empty if no
+ data is available.
"""
return self._ceph_get_latest_counter(svc_type, svc_name, path)
- def list_servers(self):
+ def list_servers(self) -> List[ServerInfoT]:
"""
Like ``get_server``, but gives information about all servers (i.e. all
unique hostnames that have been mentioned in daemon metadata)
:return: a list of information about all servers
:rtype: list
"""
- return self._ceph_get_server(None)
+ return cast(List[ServerInfoT], self._ceph_get_server(None))
- def get_metadata(self, svc_type, svc_id, default=None):
+ def get_metadata(self,
+ svc_type: str,
+ svc_id: str,
+ default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
"""
Fetch the daemon metadata for a particular service.
return default
return metadata
- def get_daemon_status(self, svc_type, svc_id):
+ def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
"""
Fetch the latest status for a particular service daemon.
"""
return self._ceph_get_daemon_status(svc_type, svc_id)
- def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str]=None) -> HandleCommandResult:
+ def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
"""
Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
if ``retval != 0``.
raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
return r
- def mon_command(self, cmd_dict: dict, inbuf: Optional[str]=None):
+ def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
"""
Helper for modules that do simple, synchronous mon command
execution.
svc_id: str,
command: str,
tag: str,
- inbuf: Optional[str]=None):
+ inbuf: Optional[str] = None) -> None:
"""
Called by the plugin to send a command to the mon
cluster.
"""
self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
- def set_health_checks(self, checks):
+ def set_health_checks(self, checks: HealthChecksT) -> None:
"""
Set the module's current map of health checks. Argument is a
dict of check names to info, in this form:
"""
self._ceph_set_health_checks(checks)
- def _handle_command(self, inbuf, cmd):
+ def _handle_command(self,
+ inbuf: str,
+ cmd: Dict[str, Any]) -> Union[HandleCommandResult,
+ Tuple[int, str, str]]:
if cmd['prefix'] not in CLICommand.COMMANDS:
return self.handle_command(inbuf, cmd)
return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
- def handle_command(self, inbuf, cmd):
+ def handle_command(self,
+ inbuf: str,
+ cmd: Dict[str, Any]) -> Union[HandleCommandResult,
+ Tuple[int, str, str]]:
"""
Called by ceph-mgr to request the plugin to handle one
of the commands that it declared in self.COMMANDS
# any ``COMMANDS``
raise NotImplementedError()
- def get_mgr_id(self):
+ def get_mgr_id(self) -> str:
"""
Retrieve the name of the manager daemon where this plugin
is currently being executed (i.e. the active manager).
"""
return self._ceph_get_mgr_id()
- def get_ceph_option(self, key):
+ def get_ceph_option(self, key: str) -> OptionValue:
return self._ceph_get_option(key)
- def _validate_module_option(self, key):
+ def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
+ return self._ceph_get_foreign_option(entity, key)
+
+ def _validate_module_option(self, key: str) -> None:
"""
Helper: don't allow get/set config callers to
access config options that they didn't declare
raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
format(key, self.__class__.__name__))
- def _get_module_option(self, key, default, localized_prefix=""):
+ def _get_module_option(self,
+ key: str,
+ default: OptionValue,
+ localized_prefix: str = "") -> OptionValue:
r = self._ceph_get_module_option(self.module_name, key,
localized_prefix)
if r is None:
else:
return r
- def get_module_option(self, key, default=None):
+ def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
"""
Retrieve the value of a persistent configuration setting
-
- :param str key:
- :param str default:
- :return: str
"""
self._validate_module_option(key)
return self._get_module_option(key, default)
- def get_module_option_ex(self, module, key, default=None):
+ def get_module_option_ex(self, module: str,
+ key: str,
+ default: OptionValue = None) -> OptionValue:
"""
Retrieve the value of a persistent configuration setting
for the specified module.
- :param str module: The name of the module, e.g. 'dashboard'
+ :param module: The name of the module, e.g. 'dashboard'
or 'telemetry'.
- :param str key: The configuration key, e.g. 'server_addr'.
- :param str,None default: The default value to use when the
+ :param key: The configuration key, e.g. 'server_addr'.
+ :param default: The default value to use when the
returned value is ``None``. Defaults to ``None``.
- :return: str,int,bool,float,None
"""
if module == self.module_name:
self._validate_module_option(key)
r = self._ceph_get_module_option(module, key)
return default if r is None else r
- def get_store_prefix(self, key_prefix):
+ def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
"""
Retrieve a dict of KV store keys to values, where the keys
have the given prefix
"""
return self._ceph_get_store_prefix(key_prefix)
- def _set_localized(self, key, val, setter):
+ def _set_localized(self,
+ key: str,
+ val: Optional[str],
+ setter: Callable[[str, Optional[str]], None]) -> None:
return setter(_get_localized_key(self.get_mgr_id(), key), val)
- def get_localized_module_option(self, key, default=None):
+ def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
"""
Retrieve localized configuration for this ceph-mgr instance
- :param str key:
- :param str default:
- :return: str
"""
self._validate_module_option(key)
return self._get_module_option(key, default, self.get_mgr_id())
- def _set_module_option(self, key, val):
+ def _set_module_option(self, key: str, val: Any) -> None:
return self._ceph_set_module_option(self.module_name, key,
None if val is None else str(val))
- def set_module_option(self, key, val):
+ def set_module_option(self, key: str, val: Any) -> None:
"""
Set the value of a persistent configuration setting
self._validate_module_option(key)
return self._set_module_option(key, val)
- def set_module_option_ex(self, module, key, val):
+ def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
"""
Set the value of a persistent configuration setting
for the specified module.
self._validate_module_option(key)
return self._ceph_set_module_option(module, key, str(val))
- def set_localized_module_option(self, key, val):
+ def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
"""
Set localized configuration for this ceph-mgr instance
:param str key:
self._validate_module_option(key)
return self._set_localized(key, val, self._set_module_option)
- def set_store(self, key, val):
+ def set_store(self, key: str, val: Optional[str]) -> None:
"""
Set a value in this module's persistent key value store.
If val is None, remove key from store
-
- :param str key:
- :param str val:
"""
self._ceph_set_store(key, val)
- def get_store(self, key, default=None):
+ def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
"""
Get a value from this module's persistent key value store
"""
else:
return r
- def get_localized_store(self, key, default=None):
+ def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
if r is None:
r = self._ceph_get_store(key)
r = default
return r
- def set_localized_store(self, key, val):
+ def set_localized_store(self, key: str, val: Optional[str]) -> None:
return self._set_localized(key, val, self.set_store)
- def self_test(self):
+ def self_test(self) -> None:
"""
Run a self-test on the module. Override this function and implement
a best as possible self-test for (automated) testing of the module
"""
pass
- def get_osdmap(self):
+ def get_osdmap(self) -> OSDMap:
"""
Get a handle to an OSDMap. If epoch==0, get a handle for the latest
OSDMap.
:return: OSDMap
"""
- return self._ceph_get_osdmap()
+ return cast(OSDMap, self._ceph_get_osdmap())
- def get_latest(self, daemon_type, daemon_name, counter):
+ def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
data = self.get_latest_counter(
daemon_type, daemon_name, counter)[counter]
if data:
else:
return 0
- def get_latest_avg(self, daemon_type, daemon_name, counter):
+ def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
data = self.get_latest_counter(
daemon_type, daemon_name, counter)[counter]
if data:
- return data[1], data[2]
+ # https://github.com/python/mypy/issues/1178
+ _, value, count = cast(Tuple[float, int, int], data)
+ return value, count
else:
return 0, 0
@profile_method()
- def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
- services=("mds", "mon", "osd",
- "rbd-mirror", "rgw", "tcmu-runner")):
+ def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
+ services: Sequence[str] = ("mds", "mon", "osd",
+ "rbd-mirror", "rgw",
+ "tcmu-runner")) -> Dict[str, dict]:
"""
Return the perf counters currently known to this ceph-mgr
instance, filtered by priority equal to or greater than `prio_limit`.
result = defaultdict(dict) # type: Dict[str, dict]
for server in self.list_servers():
- for service in server['services']:
+ for service in cast(List[ServiceInfoT], server['services']):
if service['type'] not in services:
continue
- schema = self.get_perf_schema(service['type'], service['id'])
- if not schema:
+ schemas = self.get_perf_schema(service['type'], service['id'])
+ if not schemas:
self.log.warning("No perf counter schema for {0}.{1}".format(
service['type'], service['id']
))
# get just the service we're asking about
svc_full_name = "{0}.{1}".format(
service['type'], service['id'])
- schema = schema[svc_full_name]
+ schema = schemas[svc_full_name]
# Populate latest values
for counter_path, counter_schema in schema.items():
# self.log.debug("{0}: {1}".format(
# counter_path, json.dumps(counter_schema)
# ))
- if counter_schema['priority'] < prio_limit:
+ priority = counter_schema['priority']
+ assert isinstance(priority, int)
+ if priority < prio_limit:
continue
+ tp = counter_schema['type']
+ assert isinstance(tp, int)
counter_info = dict(counter_schema)
-
# Also populate count for the long running avgs
- if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
+ if tp & self.PERFCOUNTER_LONGRUNAVG:
v, c = self.get_latest_avg(
service['type'],
service['id'],
return result
- def set_uri(self, uri):
+ def set_uri(self, uri: str) -> None:
"""
If the module exposes a service, then call this to publish the
address once it is available.
"""
return self._ceph_set_uri(uri)
- def have_mon_connection(self):
+ def set_device_wear_level(self, devid: str, wear_level: float) -> None:
+ return self._ceph_set_device_wear_level(devid, wear_level)
+
+ def have_mon_connection(self) -> bool:
"""
Check whether this ceph-mgr daemon has an open connection
to a monitor. If it doesn't, then it's likely that the
return self._ceph_have_mon_connection()
- def update_progress_event(self, evid, desc, progress):
- return self._ceph_update_progress_event(str(evid),
- str(desc),
- float(progress))
+ def update_progress_event(self,
+ evid: str,
+ desc: str,
+ progress: float,
+ add_to_ceph_s: bool) -> None:
+ return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
- def complete_progress_event(self, evid):
- return self._ceph_complete_progress_event(str(evid))
+ def complete_progress_event(self, evid: str) -> None:
+ return self._ceph_complete_progress_event(evid)
- def clear_all_progress_events(self):
+ def clear_all_progress_events(self) -> None:
return self._ceph_clear_all_progress_events()
@property
- def rados(self):
+ def rados(self) -> rados.Rados:
"""
A librados instance to be shared by any classes within
this mgr module that want one.
return self._rados
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
"""
Implement this function to report whether the module's dependencies
are met. For example, if the module needs to import a particular
return True, ""
- def remote(self, module_name, method_name, *args, **kwargs):
+ def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
"""
Invoke a method on another module. All arguments, and the return
value from the other module must be serializable.
return self._ceph_dispatch_remote(module_name, method_name,
args, kwargs)
- def add_osd_perf_query(self, query):
+ def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
"""
Register an OSD perf query. Argument is a
dict of the query parameters, in this form:
"""
return self._ceph_add_osd_perf_query(query)
- def remove_osd_perf_query(self, query_id):
+ def remove_osd_perf_query(self, query_id: int) -> None:
"""
Unregister an OSD perf query.
"""
return self._ceph_remove_osd_perf_query(query_id)
- def get_osd_perf_counters(self, query_id):
+ def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
"""
Get stats collected for an OSD perf query.
"""
return self._ceph_get_osd_perf_counters(query_id)
- def is_authorized(self, arguments):
+ def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
+ """
+ Register an MDS perf query. Argument is a
+ dict of the query parameters, in this form:
+
+ ::
+
+ {
+ 'key_descriptor': [
+ {'type': subkey_type, 'regex': regex_pattern},
+ ...
+ ],
+ 'performance_counter_descriptors': [
+ list, of, descriptor, types
+ ],
+ }
+
+ NOTE: 'limit' and 'order_by' are not supported (yet).
+
+ Valid subkey types:
+ 'mds_rank', 'client_id'
+ Valid performance counter types:
+ 'cap_hit_metric'
+
+ :param object query: query
+ :rtype: int (query id)
+ """
+ return self._ceph_add_mds_perf_query(query)
+
+ def remove_mds_perf_query(self, query_id: int) -> None:
+ """
+ Unregister an MDS perf query.
+
+ :param int query_id: query ID
+ """
+ return self._ceph_remove_mds_perf_query(query_id)
+
+ def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
+ """
+ Get stats collected for an MDS perf query.
+
+ :param int query_id: query ID
+ """
+ return self._ceph_get_mds_perf_counters(query_id)
+
+ def is_authorized(self, arguments: Dict[str, str]) -> bool:
"""
Verifies that the current session caps permit executing the py service
or current module with the provided arguments. This provides a generic