]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/mgr_module.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
index a35d6115996661e51bc69a4ea18f3a29440203b7..701b1eaf1e5dc33765d420469d229f8c1f48140b 100644 (file)
@@ -1,21 +1,42 @@
 import ceph_module  # noqa
 
-try:
-    from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
-except ImportError:
-    # just for type checking
-    pass
+from typing import cast, Tuple, Any, Dict, Generic, Optional, Callable, List, \
+    Mapping, NamedTuple, Sequence, Union, TYPE_CHECKING
+if TYPE_CHECKING:
+    import sys
+    if sys.version_info >= (3, 8):
+        from typing import Literal
+    else:
+        from typing_extensions import Literal
+
+import inspect
 import logging
 import errno
+import functools
 import json
-import six
 import threading
-from collections import defaultdict, namedtuple
+from collections import defaultdict
+from enum import IntEnum
 import rados
 import re
+import sys
 import time
+from ceph_argparse import CephArgtype
 from mgr_util import profile_method
 
+if sys.version_info >= (3, 8):
+    from typing import get_args, get_origin
+else:
+    def get_args(tp):
+        if tp is Generic:
+            return tp
+        else:
+            return getattr(tp, '__args__', ())
+
+    def get_origin(tp):
+        return getattr(tp, '__origin__', None)
+
+
 ERROR_MSG_EMPTY_INPUT_FILE = 'Empty content: please add a password/secret to the file.'
 ERROR_MSG_NO_INPUT_FILE = 'Please specify the file containing the password/secret with "-i" option.'
 # Full list of strings in "osd_types.cc:pg_state_string()"
@@ -62,7 +83,7 @@ class CommandResult(object):
     Use with MgrModule.send_command
     """
 
-    def __init__(self, tag=None):
+    def __init__(self, tag: Optional[str] = None):
         self.ev = threading.Event()
         self.outs = ""
         self.outb = ""
@@ -72,115 +93,111 @@ class CommandResult(object):
         # C++ land, to avoid passing addresses around in messages.
         self.tag = tag if tag else ""
 
-    def complete(self, r, outb, outs):
+    def complete(self, r: int, outb: str, outs: str) -> None:
         self.r = r
         self.outb = outb
         self.outs = outs
         self.ev.set()
 
-    def wait(self):
+    def wait(self) -> Tuple[int, str, str]:
         self.ev.wait()
         return self.r, self.outb, self.outs
 
 
-class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
-    def __new__(cls, retval=0, stdout="", stderr=""):
-        """
-        Tuple containing the result of `handle_command()`
-
-        Only write to stderr if there is an error, or in extraordinary circumstances
+class HandleCommandResult(NamedTuple):
+    """
+    Tuple containing the result of `handle_command()`
 
-        Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
-        is critical information to include there.
+    Only write to stderr if there is an error, or in extraordinary circumstances
 
-        Everything programmatically consumable should be put on stdout
+    Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
+    is critical information to include there.
 
-        :param retval: return code. E.g. 0 or -errno.EINVAL
-        :type retval: int
-        :param stdout: data of this result.
-        :type stdout: str
-        :param stderr: Typically used for error messages.
-        :type stderr: str
-        """
-        return super(HandleCommandResult, cls).__new__(cls, retval, stdout, stderr)
+    Everything programmatically consumable should be put on stdout
+    """
+    retval: int = 0             # return code. E.g. 0 or -errno.EINVAL
+    stdout: str = ""            # data of this result.
+    stderr: str = ""            # Typically used for error messages.
 
 
 class MonCommandFailed(RuntimeError): pass
 
 
 class OSDMap(ceph_module.BasePyOSDMap):
-    def get_epoch(self):
+    def get_epoch(self) -> int:
         return self._get_epoch()
 
-    def get_crush_version(self):
+    def get_crush_version(self) -> int:
         return self._get_crush_version()
 
-    def dump(self):
+    def dump(self) -> Dict[str, Any]:
         return self._dump()
 
-    def get_pools(self):
+    def get_pools(self) -> Dict[int, Dict[str, Any]]:
         # FIXME: efficient implementation
         d = self._dump()
         return dict([(p['pool'], p) for p in d['pools']])
 
-    def get_pools_by_name(self):
+    def get_pools_by_name(self) -> Dict[str, Dict[str, Any]]:
         # FIXME: efficient implementation
         d = self._dump()
         return dict([(p['pool_name'], p) for p in d['pools']])
 
-    def new_incremental(self):
+    def new_incremental(self) -> 'OSDMapIncremental':
         return self._new_incremental()
 
-    def apply_incremental(self, inc):
+    def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
         return self._apply_incremental(inc)
 
-    def get_crush(self):
+    def get_crush(self) -> 'CRUSHMap':
         return self._get_crush()
 
-    def get_pools_by_take(self, take):
+    def get_pools_by_take(self, take: int) -> List[int]:
         return self._get_pools_by_take(take).get('pools', [])
 
-    def calc_pg_upmaps(self, inc,
-                       max_deviation=.01, max_iterations=10, pools=None):
+    def calc_pg_upmaps(self, inc: 'OSDMapIncremental',
+                       max_deviation: int,
+                       max_iterations: int = 10,
+                       pools: Optional[List[str]] = None) -> int:
         if pools is None:
             pools = []
         return self._calc_pg_upmaps(
             inc,
             max_deviation, max_iterations, pools)
 
-    def map_pool_pgs_up(self, poolid):
+    def map_pool_pgs_up(self, poolid: int) -> List[int]:
         return self._map_pool_pgs_up(poolid)
 
-    def pg_to_up_acting_osds(self, pool_id, ps):
+    def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
         return self._pg_to_up_acting_osds(pool_id, ps)
 
-    def pool_raw_used_rate(self, pool_id):
+    def pool_raw_used_rate(self, pool_id: int) -> float:
         return self._pool_raw_used_rate(pool_id)
 
-    def get_ec_profile(self, name):
+    def get_ec_profile(self, name: str) -> Optional[List[Dict[str, str]]]:
         # FIXME: efficient implementation
         d = self._dump()
         return d['erasure_code_profiles'].get(name, None)
 
-    def get_require_osd_release(self):
+    def get_require_osd_release(self) -> str:
         d = self._dump()
         return d['require_osd_release']
 
 
 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
-    def get_epoch(self):
+    def get_epoch(self) -> int:
         return self._get_epoch()
 
-    def dump(self):
+    def dump(self) -> Dict[str, Any]:
         return self._dump()
 
-    def set_osd_reweights(self, weightmap):
+    def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
         """
         weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
         """
         return self._set_osd_reweights(weightmap)
 
-    def set_crush_compat_weight_set_weights(self, weightmap):
+    def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
         """
         weightmap is a dict, int to float.  devices only.  e.g.,
         { 0: 3.4, 1: 3.3, 2: 3.334 }
@@ -192,31 +209,33 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
     ITEM_NONE = 0x7fffffff
     DEFAULT_CHOOSE_ARGS = '-1'
 
-    def dump(self):
+    def dump(self) -> Dict[str, Any]:
         return self._dump()
 
-    def get_item_weight(self, item):
+    def get_item_weight(self, item: int) -> Optional[int]:
         return self._get_item_weight(item)
 
-    def get_item_name(self, item):
+    def get_item_name(self, item: int) -> Optional[str]:
         return self._get_item_name(item)
 
-    def find_takes(self):
+    def find_takes(self) -> List[int]:
         return self._find_takes().get('takes', [])
 
-    def get_take_weight_osd_map(self, root):
+    def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
         uglymap = self._get_take_weight_osd_map(root)
-        return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}
+        return {int(k): v for k, v in uglymap.get('weights', {}).items()}
 
     @staticmethod
-    def have_default_choose_args(dump):
+    def have_default_choose_args(dump: Dict[str, Any]) -> bool:
         return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
 
     @staticmethod
-    def get_default_choose_args(dump):
-        return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
+    def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
+        choose_args = dump.get('choose_args')
+        assert isinstance(choose_args, dict)
+        return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
 
-    def get_rule(self, rule_name):
+    def get_rule(self, rule_name: str) -> Optional[Dict[str, Any]]:
         # TODO efficient implementation
         for rule in self.dump()['rules']:
             if rule_name == rule['rule_name']:
@@ -224,35 +243,35 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
 
         return None
 
-    def get_rule_by_id(self, rule_id):
+    def get_rule_by_id(self, rule_id: int) -> Optional[Dict[str, Any]]:
         for rule in self.dump()['rules']:
             if rule['rule_id'] == rule_id:
                 return rule
 
         return None
 
-    def get_rule_root(self, rule_name):
+    def get_rule_root(self, rule_name: str) -> Optional[int]:
         rule = self.get_rule(rule_name)
         if rule is None:
             return None
 
         try:
-            first_take = [s for s in rule['steps'] if s['op'] == 'take'][0]
-        except IndexError:
+            first_take = next(s for s in rule['steps'] if s.get('op') == 'take')
+        except StopIteration:
             logging.warning("CRUSH rule '{0}' has no 'take' step".format(
                 rule_name))
             return None
         else:
             return first_take['item']
 
-    def get_osds_under(self, root_id):
+    def get_osds_under(self, root_id: int) -> List[int]:
         # TODO don't abuse dump like this
         d = self.dump()
         buckets = dict([(b['id'], b) for b in d['buckets']])
 
         osd_list = []
 
-        def accumulate(b):
+        def accumulate(b: Dict[str, Any]) -> None:
             for item in b['items']:
                 if item['id'] >= 0:
                     osd_list.append(item['id'])
@@ -266,7 +285,7 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
 
         return osd_list
 
-    def device_class_counts(self):
+    def device_class_counts(self) -> Dict[str, int]:
         result = defaultdict(int)  # type: Dict[str, int]
         # TODO don't abuse dump like this
         d = self.dump()
@@ -277,71 +296,123 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
         return dict(result)
 
 
+HandlerFuncType = Callable[..., Tuple[int, str, str]]
+
+
 class CLICommand(object):
     COMMANDS = {}  # type: Dict[str, CLICommand]
 
-    def __init__(self, prefix, args="", desc="", perm="rw"):
+    def __init__(self,
+                 prefix: str,
+                 perm: str = 'rw',
+                 poll: bool = False):
         self.prefix = prefix
-        self.args = args
-        self.args_dict = {}
-        self.desc = desc
         self.perm = perm
+        self.poll = poll
         self.func = None  # type: Optional[Callable]
-        self._parse_args()
-
-    def _parse_args(self):
-        if not self.args:
-            return
-        args = self.args.split(" ")
-        for arg in args:
-            arg_desc = arg.strip().split(",")
-            arg_d = {}
-            for kv in arg_desc:
-                k, v = kv.split("=")
-                if k != "name":
-                    arg_d[k] = v
-                else:
-                    self.args_dict[v] = arg_d
+        self.arg_spec = {}    # type: Dict[str, Any]
+        self.first_default = -1
 
-    def __call__(self, func):
+    KNOWN_ARGS = '_', 'self', 'mgr', 'inbuf', 'return'
+
+    @staticmethod
+    def load_func_metadata(f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
+        desc = inspect.getdoc(f) or ''
+        full_argspec = inspect.getfullargspec(f)
+        arg_spec = full_argspec.annotations
+        first_default = len(arg_spec)
+        if full_argspec.defaults:
+            first_default -= len(full_argspec.defaults)
+        args = []
+        for index, arg in enumerate(full_argspec.args):
+            if arg in CLICommand.KNOWN_ARGS:
+                continue
+            assert arg in arg_spec, \
+                f"'{arg}' is not annotated for {f}: {full_argspec}"
+            has_default = index >= first_default
+            args.append(CephArgtype.to_argdesc(arg_spec[arg],
+                                               dict(name=arg),
+                                               has_default))
+        return desc, arg_spec, first_default, ' '.join(args)
+
+    def store_func_metadata(self, f: HandlerFuncType) -> None:
+        self.desc, self.arg_spec, self.first_default, self.args = \
+            self.load_func_metadata(f)
+
+    def __call__(self, func: HandlerFuncType) -> HandlerFuncType:
+        self.store_func_metadata(func)
         self.func = func
         self.COMMANDS[self.prefix] = self
         return self.func
 
-    def call(self, mgr, cmd_dict, inbuf):
+    def _get_arg_value(self, kwargs_switch: bool, key: str, val: Any) -> Tuple[bool, str, Any]:
+        def start_kwargs() -> bool:
+            if isinstance(val, str) and '=' in val:
+                k, v = val.split('=', 1)
+                if k in self.arg_spec:
+                    return True
+            return False
+
+        if not kwargs_switch:
+            kwargs_switch = start_kwargs()
+
+        if kwargs_switch:
+            k, v = val.split('=', 1)
+        else:
+            k, v = key, val
+        return kwargs_switch, k.replace('-', '_'), v
+
+    def _collect_args_by_argspec(self, cmd_dict: Dict[str, Any]) -> Dict[str, Any]:
         kwargs = {}
-        for a, d in self.args_dict.items():
-            if 'req' in d and d['req'] == "false" and a not in cmd_dict:
+        kwargs_switch = False
+        for index, (name, tp) in enumerate(self.arg_spec.items()):
+            if name in CLICommand.KNOWN_ARGS:
                 continue
-            kwargs[a.replace("-", "_")] = cmd_dict[a]
+            assert self.first_default >= 0
+            raw_v = cmd_dict.get(name)
+            if index >= self.first_default:
+                if raw_v is None:
+                    continue
+            kwargs_switch, k, v = self._get_arg_value(kwargs_switch,
+                                                      name, raw_v)
+            kwargs[k] = CephArgtype.cast_to(tp, v)
+        return kwargs
+
+    def call(self,
+             mgr: Any,
+             cmd_dict: Dict[str, Any],
+             inbuf: Optional[str] = None) -> HandleCommandResult:
+        kwargs = self._collect_args_by_argspec(cmd_dict)
         if inbuf:
             kwargs['inbuf'] = inbuf
         assert self.func
         return self.func(mgr, **kwargs)
 
-    def dump_cmd(self):
+    def dump_cmd(self) -> Dict[str, Union[str, bool]]:
         return {
             'cmd': '{} {}'.format(self.prefix, self.args),
             'desc': self.desc,
-            'perm': self.perm
+            'perm': self.perm,
+            'poll': self.poll,
         }
 
     @classmethod
-    def dump_cmd_list(cls):
+    def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
         return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
 
 
-def CLIReadCommand(prefix, args="", desc=""):
-    return CLICommand(prefix, args, desc, "r")
+def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
+    return CLICommand(prefix, "r", poll)
 
 
-def CLIWriteCommand(prefix, args="", desc=""):
-    return CLICommand(prefix, args, desc, "w")
+def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
+    return CLICommand(prefix, "w", poll)
 
 
-def CLICheckNonemptyFileInput(func):
-    def check(*args, **kwargs):
-        if not 'inbuf' in kwargs:
+def CLICheckNonemptyFileInput(func: HandlerFuncType) -> HandlerFuncType:
+    @functools.wraps(func)
+    def check(*args: Any, **kwargs: Any) -> Tuple[int, str, str]:
+        if 'inbuf' not in kwargs:
             return -errno.EINVAL, '', ERROR_MSG_NO_INPUT_FILE
         if isinstance(kwargs['inbuf'], str):
             # Delete new line separator at EOF (it may have been added by a text editor).
@@ -349,38 +420,51 @@ def CLICheckNonemptyFileInput(func):
         if not kwargs['inbuf']:
             return -errno.EINVAL, '', ERROR_MSG_EMPTY_INPUT_FILE
         return func(*args, **kwargs)
+    check.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
     return check
 
 
-def _get_localized_key(prefix, key):
+def _get_localized_key(prefix: str, key: str) -> str:
     return '{}/{}'.format(prefix, key)
 
 
-class Option(dict):
-    """
-    Helper class to declare options for MODULE_OPTIONS list.
+"""
+MODULE_OPTIONS types and Option Class
+"""
+if TYPE_CHECKING:
+    OptionTypeLabel = Literal[
+        'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
+
+
+# common/options.h: value_t
+OptionValue = Optional[Union[bool, int, float, str]]
 
-    Caveat: it uses argument names matching Python keywords (type, min, max),
-    so any further processing should happen in a separate method.
 
-    TODO: type validation.
+class Option(Dict):
+    """
+    Helper class to declare options for MODULE_OPTIONS list.
+    TODO: Replace with typing.TypedDict when in python_version >= 3.8
     """
 
     def __init__(
-            self, name,
-            default=None,
-            type='str',
-            desc=None, longdesc=None,
-            min=None, max=None,
-            enum_allowed=None,
-            see_also=None,
-            tags=None,
-            runtime=False,
+            self,
+            name: str,
+            default: OptionValue = None,
+            type: 'OptionTypeLabel' = 'str',
+            desc: Optional[str] = None,
+            long_desc: Optional[str] = None,
+            min: OptionValue = None,
+            max: OptionValue = None,
+            enum_allowed: Optional[List[str]] = None,
+            tags: Optional[List[str]] = None,
+            see_also: Optional[List[str]] = None,
+            runtime: bool = False,
     ):
         super(Option, self).__init__(
             (k, v) for k, v in vars().items()
             if k != 'self' and v is not None)
 
+
 class Command(dict):
     """
     Helper class to declare options for COMMANDS list.
@@ -398,23 +482,27 @@ class Command(dict):
 
     def __init__(
             self,
-            prefix,
-            args=None,
-            perm="rw",
-            desc=None,
-            poll=False,
-            handler=None
+            prefix: str,
+            handler: HandlerFuncType,
+            perm: str = "rw",
+            poll: bool = False,
     ):
-        super(Command, self).__init__(
-            cmd=prefix + (' ' + args if args else ''),
-            perm=perm,
-            desc=desc,
-            poll=poll)
+        super().__init__(perm=perm,
+                         poll=poll)
         self.prefix = prefix
-        self.args = args
         self.handler = handler
 
-    def register(self, instance=False):
+    @staticmethod
+    def returns_command_result(instance: Any,
+                               f: HandlerFuncType) -> Callable[..., HandleCommandResult]:
+        @functools.wraps(f)
+        def wrapper(mgr: Any, *args: Any, **kwargs: Any) -> HandleCommandResult:
+            retval, stdout, stderr = f(instance or mgr, *args, **kwargs)
+            return HandleCommandResult(retval, stdout, stderr)
+        wrapper.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
+        return wrapper
+
+    def register(self, instance: bool = False) -> HandlerFuncType:
         """
         Register a CLICommand handler. It allows an instance to register bound
         methods. In that case, the mgr instance is not passed, and it's expected
@@ -422,50 +510,45 @@ class Command(dict):
         It also uses HandleCommandResult helper to return a wrapped a tuple of 3
         items.
         """
-        return CLICommand(
-            prefix=self.prefix,
-            args=self.args,
-            desc=self['desc'],
-            perm=self['perm']
-        )(
-            func=lambda mgr, *args, **kwargs: HandleCommandResult(*self.handler(
-                *((instance or mgr,) + args), **kwargs))
-        )
+        cmd = CLICommand(prefix=self.prefix, perm=self['perm'])
+        return cmd(self.returns_command_result(instance, self.handler))
 
 
 class CPlusPlusHandler(logging.Handler):
-    def __init__(self, module_inst):
+    def __init__(self, module_inst: Any):
         super(CPlusPlusHandler, self).__init__()
         self._module = module_inst
         self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
                           .format(module_inst.module_name)))
 
-    def emit(self, record):
+    def emit(self, record: logging.LogRecord) -> None:
         if record.levelno >= self.level:
             self._module._ceph_log(self.format(record))
 
+
 class ClusterLogHandler(logging.Handler):
-    def __init__(self, module_inst):
+    def __init__(self, module_inst: Any):
         super().__init__()
         self._module = module_inst
         self.setFormatter(logging.Formatter("%(message)s"))
 
-    def emit(self, record):
+    def emit(self, record: logging.LogRecord) -> None:
         levelmap = {
-            'DEBUG': MgrModule.CLUSTER_LOG_PRIO_DEBUG,
-            'INFO': MgrModule.CLUSTER_LOG_PRIO_INFO,
-            'WARNING': MgrModule.CLUSTER_LOG_PRIO_WARN,
-            'ERROR': MgrModule.CLUSTER_LOG_PRIO_ERROR,
-            'CRITICAL': MgrModule.CLUSTER_LOG_PRIO_ERROR,
+            logging.DEBUG: MgrModule.ClusterLogPrio.DEBUG,
+            logging.INFO: MgrModule.ClusterLogPrio.INFO,
+            logging.WARNING: MgrModule.ClusterLogPrio.WARN,
+            logging.ERROR: MgrModule.ClusterLogPrio.ERROR,
+            logging.CRITICAL: MgrModule.ClusterLogPrio.ERROR,
         }
-        level = levelmap[record.levelname]
+        level = levelmap[record.levelno]
         if record.levelno >= self.level:
             self._module.cluster_log(self._module.module_name,
                                      level,
                                      self.format(record))
 
+
 class FileHandler(logging.FileHandler):
-    def __init__(self, module_inst):
+    def __init__(self, module_inst: Any):
         path = module_inst.get_ceph_option("log_file")
         idx = path.rfind(".log")
         if idx != -1:
@@ -477,10 +560,14 @@ class FileHandler(logging.FileHandler):
 
 
 class MgrModuleLoggingMixin(object):
-    def _configure_logging(self, mgr_level, module_level, cluster_level,
-                           log_to_file, log_to_cluster):
-        self._mgr_level = None
-        self._module_level = None
+    def _configure_logging(self,
+                           mgr_level: str,
+                           module_level: str,
+                           cluster_level: str,
+                           log_to_file: bool,
+                           log_to_cluster: bool) -> None:
+        self._mgr_level: Optional[str] = None
+        self._module_level: Optional[str] = None
         self._root_logger = logging.getLogger()
 
         self._unconfigure_logging()
@@ -502,17 +589,22 @@ class MgrModuleLoggingMixin(object):
         self._root_logger.setLevel(logging.NOTSET)
         self._set_log_level(mgr_level, module_level, cluster_level)
 
-
-    def _unconfigure_logging(self):
+    def _unconfigure_logging(self) -> None:
         # remove existing handlers:
         rm_handlers = [
-            h for h in self._root_logger.handlers if isinstance(h, CPlusPlusHandler) or isinstance(h, FileHandler) or isinstance(h, ClusterLogHandler)]
+            h for h in self._root_logger.handlers
+            if (isinstance(h, CPlusPlusHandler) or
+                isinstance(h, FileHandler) or
+                isinstance(h, ClusterLogHandler))]
         for h in rm_handlers:
             self._root_logger.removeHandler(h)
         self.log_to_file = False
         self.log_to_cluster = False
 
-    def _set_log_level(self, mgr_level, module_level, cluster_level):
+    def _set_log_level(self,
+                       mgr_level: str,
+                       module_level: str,
+                       cluster_level: str) -> None:
         self._cluster_log_handler.setLevel(cluster_level.upper())
 
         module_level = module_level.upper() if module_level else ''
@@ -528,7 +620,8 @@ class MgrModuleLoggingMixin(object):
 
         if not self._module_level and not module_level:
             level = self._ceph_log_level_to_python(mgr_level)
-            self.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level, mgr_level)
+            self.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
+                                   level, mgr_level)
         elif self._module_level and not module_level:
             level = self._ceph_log_level_to_python(mgr_level)
             self.getLogger().warning("unsetting module log level, falling back to "
@@ -543,34 +636,34 @@ class MgrModuleLoggingMixin(object):
         self._mgr_log_handler.setLevel(level)
         self._file_log_handler.setLevel(level)
 
-    def _enable_file_log(self):
+    def _enable_file_log(self) -> None:
         # enable file log
         self.getLogger().warning("enabling logging to file")
         self.log_to_file = True
         self._root_logger.addHandler(self._file_log_handler)
 
-    def _disable_file_log(self):
+    def _disable_file_log(self) -> None:
         # disable file log
         self.getLogger().warning("disabling logging to file")
         self.log_to_file = False
         self._root_logger.removeHandler(self._file_log_handler)
 
-    def _enable_cluster_log(self):
+    def _enable_cluster_log(self) -> None:
         # enable cluster log
         self.getLogger().warning("enabling logging to cluster")
         self.log_to_cluster = True
         self._root_logger.addHandler(self._cluster_log_handler)
 
-    def _disable_cluster_log(self):
+    def _disable_cluster_log(self) -> None:
         # disable cluster log
         self.getLogger().warning("disabling logging to cluster")
         self.log_to_cluster = False
         self._root_logger.removeHandler(self._cluster_log_handler)
 
-    def _ceph_log_level_to_python(self, ceph_log_level):
-        if ceph_log_level:
+    def _ceph_log_level_to_python(self, log_level: str) -> str:
+        if log_level:
             try:
-                ceph_log_level = int(ceph_log_level.split("/", 1)[0])
+                ceph_log_level = int(log_level.split("/", 1)[0])
             except ValueError:
                 ceph_log_level = 0
         else:
@@ -585,7 +678,7 @@ class MgrModuleLoggingMixin(object):
             log_level = "INFO"
         return log_level
 
-    def getLogger(self, name=None):
+    def getLogger(self, name: Optional[str] = None) -> logging.Logger:
         return logging.getLogger(name)
 
 
@@ -599,10 +692,10 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
     from their active peer), and to configuration settings (read only).
     """
 
-    MODULE_OPTIONS = []  # type: List[Dict[str, Any]]
+    MODULE_OPTIONS: List[Option] = []
     MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]
 
-    def __init__(self, module_name, capsule):
+    def __init__(self, module_name: str, capsule: Any):
         super(MgrStandbyModule, self).__init__(capsule)
         self.module_name = module_name
 
@@ -614,24 +707,25 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
                 else:
                     self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
 
-        mgr_level = self.get_ceph_option("debug_mgr")
-        log_level = self.get_module_option("log_level")
-        cluster_level = self.get_module_option('log_to_cluster_level')
+        # mock does not return a str
+        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+        log_level = cast(str, self.get_module_option("log_level"))
+        cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
         self._configure_logging(mgr_level, log_level, cluster_level,
                                 False, False)
 
         # for backwards compatibility
         self._logger = self.getLogger()
 
-    def __del__(self):
+    def __del__(self) -> None:
         self._cleanup()
         self._unconfigure_logging()
 
-    def _cleanup(self):
+    def _cleanup(self) -> None:
         pass
 
     @classmethod
-    def _register_options(cls, module_name):
+    def _register_options(cls, module_name: str) -> None:
         cls.MODULE_OPTIONS.append(
             Option(name='log_level', type='str', default="", runtime=True,
                    enum_allowed=['info', 'debug', 'critical', 'error',
@@ -649,26 +743,24 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
                                  'warning', '']))
 
     @property
-    def log(self):
+    def log(self) -> logging.Logger:
         return self._logger
 
-    def serve(self):
+    def serve(self) -> None:
         """
         The serve method is mandatory for standby modules.
         :return:
         """
         raise NotImplementedError()
 
-    def get_mgr_id(self):
+    def get_mgr_id(self) -> str:
         return self._ceph_get_mgr_id()
 
-    def get_module_option(self, key, default=None):
+    def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
         """
         Retrieve the value of a persistent configuration setting
 
-        :param str key:
         :param default: the default value of the config if it is not found
-        :return: str
         """
         r = self._ceph_get_module_option(key)
         if r is None:
@@ -676,10 +768,10 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
         else:
             return r
 
-    def get_ceph_option(self, key):
+    def get_ceph_option(self, key: str) -> OptionValue:
         return self._ceph_get_option(key)
 
-    def get_store(self, key):
+    def get_store(self, key: str) -> Optional[str]:
         """
         Retrieve the value of a persistent KV store entry
 
@@ -688,10 +780,18 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_store(key)
 
-    def get_active_uri(self):
+    def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
+        r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
+        if r is None:
+            r = self._ceph_get_store(key)
+            if r is None:
+                r = default
+        return r
+
+    def get_active_uri(self) -> str:
         return self._ceph_get_active_uri()
 
-    def get_localized_module_option(self, key, default=None):
+    def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
         r = self._ceph_get_module_option(key, self.get_mgr_id())
         if r is None:
             return self.MODULE_OPTION_DEFAULTS.get(key, default)
@@ -699,9 +799,19 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
             return r
 
 
+HealthChecksT = Mapping[str, Mapping[str, Union[int, str, Sequence[str]]]]
+# {"type": service_type, "id": service_id}
+ServiceInfoT = Dict[str, str]
+# {"hostname": hostname,
+#  "ceph_version": version,
+#  "services": [service_info, ..]}
+ServerInfoT = Dict[str, Union[str, List[ServiceInfoT]]]
+PerfCounterT = Dict[str, Any]
+
+
 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
     COMMANDS = []  # type: List[Any]
-    MODULE_OPTIONS = []  # type: List[dict]
+    MODULE_OPTIONS: List[Option] = []
     MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]
 
     # Priority definitions for perf counters
@@ -726,13 +836,14 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
     NONE = 1
 
     # Cluster log priorities
-    CLUSTER_LOG_PRIO_DEBUG = 0
-    CLUSTER_LOG_PRIO_INFO = 1
-    CLUSTER_LOG_PRIO_SEC = 2
-    CLUSTER_LOG_PRIO_WARN = 3
-    CLUSTER_LOG_PRIO_ERROR = 4
-
-    def __init__(self, module_name, py_modules_ptr, this_ptr):
+    class ClusterLogPrio(IntEnum):
+        DEBUG = 0
+        INFO = 1
+        SEC = 2
+        WARN = 3
+        ERROR = 4
+
+    def __init__(self, module_name: str, py_modules_ptr: object, this_ptr: object):
         self.module_name = module_name
         super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
 
@@ -748,11 +859,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
                     # with default and user-supplied option values.
                     self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
 
-        mgr_level = self.get_ceph_option("debug_mgr")
-        log_level = self.get_module_option("log_level")
-        cluster_level = self.get_module_option('log_to_cluster_level')
+        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+        log_level = cast(str, self.get_module_option("log_level"))
+        cluster_level = cast(str, self.get_module_option('log_to_cluster_level'))
         log_to_file = self.get_module_option("log_to_file")
+        assert isinstance(log_to_file, bool)
         log_to_cluster = self.get_module_option("log_to_cluster")
+        assert isinstance(log_to_cluster, bool)
         self._configure_logging(mgr_level, log_level, cluster_level,
                                 log_to_file, log_to_cluster)
 
@@ -764,14 +877,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         self._perf_schema_cache = None
 
         # Keep a librados instance for those that need it.
-        self._rados = None
+        self._rados: Optional[rados.Rados] = None
 
-
-    def __del__(self):
+    def __del__(self) -> None:
         self._unconfigure_logging()
 
     @classmethod
-    def _register_options(cls, module_name):
+    def _register_options(cls, module_name: str) -> None:
         cls.MODULE_OPTIONS.append(
             Option(name='log_level', type='str', default="", runtime=True,
                    enum_allowed=['info', 'debug', 'critical', 'error',
@@ -789,33 +901,27 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
                                  'warning', '']))
 
     @classmethod
-    def _register_commands(cls, module_name):
+    def _register_commands(cls, module_name: str) -> None:
         cls.COMMANDS.extend(CLICommand.dump_cmd_list())
 
     @property
-    def log(self):
+    def log(self) -> logging.Logger:
         return self._logger
 
-    def cluster_log(self, channel, priority, message):
+    def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
         """
         :param channel: The log channel. This can be 'cluster', 'audit', ...
-        :type channel: str
-        :param priority: The log message priority. This can be
-                         CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
-                         CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
-                         CLUSTER_LOG_PRIO_ERROR.
-        :type priority: int
+        :param priority: The log message priority.
         :param message: The message to log.
-        :type message: str
         """
-        self._ceph_cluster_log(channel, priority, message)
+        self._ceph_cluster_log(channel, priority.value, message)
 
     @property
-    def version(self):
+    def version(self) -> str:
         return self._version
 
     @property
-    def release_name(self):
+    def release_name(self) -> str:
         """
         Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
         :return: Returns the release name of the Ceph version in lower case.
@@ -823,13 +929,16 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_release_name()
 
-    def get_context(self):
+    def lookup_release_name(self, major: int) -> str:
+        return self._ceph_lookup_release_name(major)
+
+    def get_context(self) -> object:
         """
         :return: a Python capsule containing a C++ CephContext pointer
         """
         return self._ceph_get_context()
 
-    def notify(self, notify_type, notify_id):
+    def notify(self, notify_type: str, notify_id: str) -> None:
         """
         Called by the ceph-mgr service to notify the Python plugin
         that new state is available.
@@ -844,14 +953,16 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         pass
 
-    def _config_notify(self):
+    def _config_notify(self) -> None:
         # check logging options for changes
-        mgr_level = self.get_ceph_option("debug_mgr")
-        module_level = self.get_module_option("log_level")
-        cluster_level = self.get_module_option("log_to_cluster_level")
+        mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
+        module_level = cast(str, self.get_module_option("log_level"))
+        cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
+        assert isinstance(cluster_level, str)
         log_to_file = self.get_module_option("log_to_file", False)
+        assert isinstance(log_to_file, bool)
         log_to_cluster = self.get_module_option("log_to_cluster", False)
-
+        assert isinstance(log_to_cluster, bool)
         self._set_log_level(mgr_level, module_level, cluster_level)
 
         if log_to_file != self.log_to_file:
@@ -868,7 +979,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         # call module subclass implementations
         self.config_notify()
 
-    def config_notify(self):
+    def config_notify(self) -> None:
         """
         Called by the ceph-mgr service to notify the Python plugin
         that the configuration may have changed.  Modules will want to
@@ -876,7 +987,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         pass
 
-    def serve(self):
+    def serve(self) -> None:
         """
         Called by the ceph-mgr service to start any server that
         is provided by this Python plugin.  The implementation
@@ -886,7 +997,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         pass
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         """
         Called by the ceph-mgr service to request that this
         module drop out of its serve() function.  You do not
@@ -899,7 +1010,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             self._rados.shutdown()
             self._ceph_unregister_client(addrs)
 
-    def get(self, data_name):
+    def get(self, data_name: str):
         """
         Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
 
@@ -915,7 +1026,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get(data_name)
 
-    def _stattype_to_str(self, stattype):
+    def _stattype_to_str(self, stattype: int) -> str:
 
         typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
         if typeonly == 0:
@@ -930,8 +1041,8 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return ''
 
-    def _perfpath_to_path_labels(self, daemon, path):
-        # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
+    def _perfpath_to_path_labels(self, daemon: str,
+                                 path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
         label_names = ("ceph_daemon",)  # type: Tuple[str, ...]
         labels = (daemon,)  # type: Tuple[str, ...]
 
@@ -950,21 +1061,23 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return path, label_names, labels,
 
-    def _perfvalue_to_value(self, stattype, value):
+    def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
         if stattype & self.PERFCOUNTER_TIME:
             # Convert from ns to seconds
             return value / 1000000000.0
         else:
             return value
 
-    def _unit_to_str(self, unit):
+    def _unit_to_str(self, unit: int) -> str:
         if unit == self.NONE:
             return "/s"
         elif unit == self.BYTES:
             return "B/s"
+        else:
+            raise ValueError(f'bad unit "{unit}"')
 
     @staticmethod
-    def to_pretty_iec(n):
+    def to_pretty_iec(n: int) -> str:
         for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
                              (20, 'Mi'), (10, 'Ki')]:
             if n > 10 << bits:
@@ -972,7 +1085,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         return str(n) + ' '
 
     @staticmethod
-    def get_pretty_row(elems, width):
+    def get_pretty_row(elems: Sequence[str], width: int) -> str:
         """
         Takes an array of elements and returns a string with those elements
         formatted as a table row. Useful for polling modules.
@@ -989,7 +1102,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return ret
 
-    def get_pretty_header(self, elems, width):
+    def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
         """
         Like ``get_pretty_row`` but adds dashes, to be used as a table title.
 
@@ -1017,7 +1130,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return ret
 
-    def get_server(self, hostname):
+    def get_server(self, hostname) -> ServerInfoT:
         """
         Called by the plugin to fetch metadata about a particular hostname from
         ceph-mgr.
@@ -1027,9 +1140,12 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         :param hostname: a hostname
         """
-        return self._ceph_get_server(hostname)
+        return cast(ServerInfoT, self._ceph_get_server(hostname))
 
-    def get_perf_schema(self, svc_type, svc_name):
+    def get_perf_schema(self,
+                        svc_type: str,
+                        svc_name: str) -> Dict[str,
+                                               Dict[str, Dict[str, Union[str, int]]]]:
         """
         Called by the plugin to fetch perf counter schema info.
         svc_name can be nullptr, as can svc_type, in which case
@@ -1041,7 +1157,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_perf_schema(svc_type, svc_name)
 
-    def get_counter(self, svc_type, svc_name, path):
+    def get_counter(self,
+                    svc_type: str,
+                    svc_name: str,
+                    path: str) -> Dict[str, List[Tuple[float, int]]]:
         """
         Called by the plugin to fetch the latest performance counter data for a
         particular counter on a particular service.
@@ -1050,26 +1169,32 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         :param str svc_name:
         :param str path: a period-separated concatenation of the subsystem and the
             counter name, for example "mds.inodes".
-        :return: A list of two-tuples of (timestamp, value) is returned.  This may be
-            empty if no data is available.
+        :return: A dict of counter names to their values. each value is a list of
+            of two-tuples of (timestamp, value).  This may be empty if no data is
+            available.
         """
         return self._ceph_get_counter(svc_type, svc_name, path)
 
-    def get_latest_counter(self, svc_type, svc_name, path):
+    def get_latest_counter(self,
+                           svc_type: str,
+                           svc_name: str,
+                           path: str) -> Dict[str, Union[Tuple[float, int],
+                                                         Tuple[float, int, int]]]:
         """
         Called by the plugin to fetch only the newest performance counter data
-        pointfor a particular counter on a particular service.
+        point for a particular counter on a particular service.
 
         :param str svc_type:
         :param str svc_name:
         :param str path: a period-separated concatenation of the subsystem and the
             counter name, for example "mds.inodes".
-        :return: A list of two-tuples of (timestamp, value) is returned.  This may be
-            empty if no data is available.
+        :return: A list of two-tuples of (timestamp, value) or three-tuple of
+            (timestamp, value, count) is returned.  This may be empty if no
+            data is available.
         """
         return self._ceph_get_latest_counter(svc_type, svc_name, path)
 
-    def list_servers(self):
+    def list_servers(self) -> List[ServerInfoT]:
         """
         Like ``get_server``, but gives information about all servers (i.e. all
         unique hostnames that have been mentioned in daemon metadata)
@@ -1077,9 +1202,12 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         :return: a list of information about all servers
         :rtype: list
         """
-        return self._ceph_get_server(None)
+        return cast(List[ServerInfoT], self._ceph_get_server(None))
 
-    def get_metadata(self, svc_type, svc_id, default=None):
+    def get_metadata(self,
+                     svc_type: str,
+                     svc_id: str,
+                     default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
         """
         Fetch the daemon metadata for a particular service.
 
@@ -1097,7 +1225,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             return default
         return metadata
 
-    def get_daemon_status(self, svc_type, svc_id):
+    def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
         """
         Fetch the latest status for a particular service daemon.
 
@@ -1110,7 +1238,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_daemon_status(svc_type, svc_id)
 
-    def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str]=None) -> HandleCommandResult:
+    def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
         """
         Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
         if ``retval != 0``.
@@ -1121,7 +1249,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
         return r
 
-    def mon_command(self, cmd_dict: dict, inbuf: Optional[str]=None):
+    def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
         """
         Helper for modules that do simple, synchronous mon command
         execution.
@@ -1150,7 +1278,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             svc_id: str,
             command: str,
             tag: str,
-            inbuf: Optional[str]=None):
+            inbuf: Optional[str] = None) -> None:
         """
         Called by the plugin to send a command to the mon
         cluster.
@@ -1174,7 +1302,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
 
-    def set_health_checks(self, checks):
+    def set_health_checks(self, checks: HealthChecksT) -> None:
         """
         Set the module's current map of health checks.  Argument is a
         dict of check names to info, in this form:
@@ -1199,13 +1327,19 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         self._ceph_set_health_checks(checks)
 
-    def _handle_command(self, inbuf, cmd):
+    def _handle_command(self,
+                        inbuf: str,
+                        cmd: Dict[str, Any]) -> Union[HandleCommandResult,
+                                                      Tuple[int, str, str]]:
         if cmd['prefix'] not in CLICommand.COMMANDS:
             return self.handle_command(inbuf, cmd)
 
         return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
 
-    def handle_command(self, inbuf, cmd):
+    def handle_command(self,
+                       inbuf: str,
+                       cmd: Dict[str, Any]) -> Union[HandleCommandResult,
+                                                     Tuple[int, str, str]]:
         """
         Called by ceph-mgr to request the plugin to handle one
         of the commands that it declared in self.COMMANDS
@@ -1226,7 +1360,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         # any ``COMMANDS``
         raise NotImplementedError()
 
-    def get_mgr_id(self):
+    def get_mgr_id(self) -> str:
         """
         Retrieve the name of the manager daemon where this plugin
         is currently being executed (i.e. the active manager).
@@ -1235,10 +1369,13 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_mgr_id()
 
-    def get_ceph_option(self, key):
+    def get_ceph_option(self, key: str) -> OptionValue:
         return self._ceph_get_option(key)
 
-    def _validate_module_option(self, key):
+    def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
+        return self._ceph_get_foreign_option(entity, key)
+
+    def _validate_module_option(self, key: str) -> None:
         """
         Helper: don't allow get/set config callers to
         access config options that they didn't declare
@@ -1248,7 +1385,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
                                format(key, self.__class__.__name__))
 
-    def _get_module_option(self, key, default, localized_prefix=""):
+    def _get_module_option(self,
+                           key: str,
+                           default: OptionValue,
+                           localized_prefix: str = "") -> OptionValue:
         r = self._ceph_get_module_option(self.module_name, key,
                                          localized_prefix)
         if r is None:
@@ -1256,35 +1396,32 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         else:
             return r
 
-    def get_module_option(self, key, default=None):
+    def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
         """
         Retrieve the value of a persistent configuration setting
-
-        :param str key:
-        :param str default:
-        :return: str
         """
         self._validate_module_option(key)
         return self._get_module_option(key, default)
 
-    def get_module_option_ex(self, module, key, default=None):
+    def get_module_option_ex(self, module: str,
+                             key: str,
+                             default: OptionValue = None) -> OptionValue:
         """
         Retrieve the value of a persistent configuration setting
         for the specified module.
 
-        :param str module: The name of the module, e.g. 'dashboard'
+        :param module: The name of the module, e.g. 'dashboard'
             or 'telemetry'.
-        :param str key: The configuration key, e.g. 'server_addr'.
-        :param str,None default: The default value to use when the
+        :param key: The configuration key, e.g. 'server_addr'.
+        :param default: The default value to use when the
             returned value is ``None``. Defaults to ``None``.
-        :return: str,int,bool,float,None
         """
         if module == self.module_name:
             self._validate_module_option(key)
         r = self._ceph_get_module_option(module, key)
         return default if r is None else r
 
-    def get_store_prefix(self, key_prefix):
+    def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
         """
         Retrieve a dict of KV store keys to values, where the keys
         have the given prefix
@@ -1294,24 +1431,24 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_store_prefix(key_prefix)
 
-    def _set_localized(self, key, val, setter):
+    def _set_localized(self,
+                       key: str,
+                       val: Optional[str],
+                       setter: Callable[[str, Optional[str]], None]) -> None:
         return setter(_get_localized_key(self.get_mgr_id(), key), val)
 
-    def get_localized_module_option(self, key, default=None):
+    def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
         """
         Retrieve localized configuration for this ceph-mgr instance
-        :param str key:
-        :param str default:
-        :return: str
         """
         self._validate_module_option(key)
         return self._get_module_option(key, default, self.get_mgr_id())
 
-    def _set_module_option(self, key, val):
+    def _set_module_option(self, key: str, val: Any) -> None:
         return self._ceph_set_module_option(self.module_name, key,
                                             None if val is None else str(val))
 
-    def set_module_option(self, key, val):
+    def set_module_option(self, key: str, val: Any) -> None:
         """
         Set the value of a persistent configuration setting
 
@@ -1321,7 +1458,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         self._validate_module_option(key)
         return self._set_module_option(key, val)
 
-    def set_module_option_ex(self, module, key, val):
+    def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
         """
         Set the value of a persistent configuration setting
         for the specified module.
@@ -1334,7 +1471,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             self._validate_module_option(key)
         return self._ceph_set_module_option(module, key, str(val))
 
-    def set_localized_module_option(self, key, val):
+    def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
         """
         Set localized configuration for this ceph-mgr instance
         :param str key:
@@ -1344,17 +1481,14 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         self._validate_module_option(key)
         return self._set_localized(key, val, self._set_module_option)
 
-    def set_store(self, key, val):
+    def set_store(self, key: str, val: Optional[str]) -> None:
         """
         Set a value in this module's persistent key value store.
         If val is None, remove key from store
-
-        :param str key:
-        :param str val:
         """
         self._ceph_set_store(key, val)
 
-    def get_store(self, key, default=None):
+    def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
         """
         Get a value from this module's persistent key value store
         """
@@ -1364,7 +1498,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         else:
             return r
 
-    def get_localized_store(self, key, default=None):
+    def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
         r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
         if r is None:
             r = self._ceph_get_store(key)
@@ -1372,10 +1506,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
                 r = default
         return r
 
-    def set_localized_store(self, key, val):
+    def set_localized_store(self, key: str, val: Optional[str]) -> None:
         return self._set_localized(key, val, self.set_store)
 
-    def self_test(self):
+    def self_test(self) -> None:
         """
         Run a self-test on the module. Override this function and implement
         a best as possible self-test for (automated) testing of the module
@@ -1389,15 +1523,15 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         pass
 
-    def get_osdmap(self):
+    def get_osdmap(self) -> OSDMap:
         """
         Get a handle to an OSDMap.  If epoch==0, get a handle for the latest
         OSDMap.
         :return: OSDMap
         """
-        return self._ceph_get_osdmap()
+        return cast(OSDMap, self._ceph_get_osdmap())
 
-    def get_latest(self, daemon_type, daemon_name, counter):
+    def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
         data = self.get_latest_counter(
             daemon_type, daemon_name, counter)[counter]
         if data:
@@ -1405,18 +1539,21 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         else:
             return 0
 
-    def get_latest_avg(self, daemon_type, daemon_name, counter):
+    def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
         data = self.get_latest_counter(
             daemon_type, daemon_name, counter)[counter]
         if data:
-            return data[1], data[2]
+            # https://github.com/python/mypy/issues/1178
+            _, value, count = cast(Tuple[float, int, int], data)
+            return value, count
         else:
             return 0, 0
 
     @profile_method()
-    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
-                              services=("mds", "mon", "osd",
-                                        "rbd-mirror", "rgw", "tcmu-runner")):
+    def get_all_perf_counters(self, prio_limit: int = PRIO_USEFUL,
+                              services: Sequence[str] = ("mds", "mon", "osd",
+                                                         "rbd-mirror", "rgw",
+                                                         "tcmu-runner")) -> Dict[str, dict]:
         """
         Return the perf counters currently known to this ceph-mgr
         instance, filtered by priority equal to or greater than `prio_limit`.
@@ -1432,12 +1569,12 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         result = defaultdict(dict)  # type: Dict[str, dict]
 
         for server in self.list_servers():
-            for service in server['services']:
+            for service in cast(List[ServiceInfoT], server['services']):
                 if service['type'] not in services:
                     continue
 
-                schema = self.get_perf_schema(service['type'], service['id'])
-                if not schema:
+                schemas = self.get_perf_schema(service['type'], service['id'])
+                if not schemas:
                     self.log.warning("No perf counter schema for {0}.{1}".format(
                         service['type'], service['id']
                     ))
@@ -1447,20 +1584,23 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
                 # get just the service we're asking about
                 svc_full_name = "{0}.{1}".format(
                     service['type'], service['id'])
-                schema = schema[svc_full_name]
+                schema = schemas[svc_full_name]
 
                 # Populate latest values
                 for counter_path, counter_schema in schema.items():
                     # self.log.debug("{0}: {1}".format(
                     #     counter_path, json.dumps(counter_schema)
                     # ))
-                    if counter_schema['priority'] < prio_limit:
+                    priority = counter_schema['priority']
+                    assert isinstance(priority, int)
+                    if priority < prio_limit:
                         continue
 
+                    tp = counter_schema['type']
+                    assert isinstance(tp, int)
                     counter_info = dict(counter_schema)
-
                     # Also populate count for the long running avgs
-                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
+                    if tp & self.PERFCOUNTER_LONGRUNAVG:
                         v, c = self.get_latest_avg(
                             service['type'],
                             service['id'],
@@ -1481,7 +1621,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return result
 
-    def set_uri(self, uri):
+    def set_uri(self, uri: str) -> None:
         """
         If the module exposes a service, then call this to publish the
         address once it is available.
@@ -1490,7 +1630,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_set_uri(uri)
 
-    def have_mon_connection(self):
+    def set_device_wear_level(self, devid: str, wear_level: float) -> None:
+        return self._ceph_set_device_wear_level(devid, wear_level)
+
+    def have_mon_connection(self) -> bool:
         """
         Check whether this ceph-mgr daemon has an open connection
         to a monitor.  If it doesn't, then it's likely that the
@@ -1500,19 +1643,21 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return self._ceph_have_mon_connection()
 
-    def update_progress_event(self, evid, desc, progress):
-        return self._ceph_update_progress_event(str(evid),
-                                                str(desc),
-                                                float(progress))
+    def update_progress_event(self,
+                              evid: str,
+                              desc: str,
+                              progress: float,
+                              add_to_ceph_s: bool) -> None:
+        return self._ceph_update_progress_event(evid, desc, progress, add_to_ceph_s)
 
-    def complete_progress_event(self, evid):
-        return self._ceph_complete_progress_event(str(evid))
+    def complete_progress_event(self, evid: str) -> None:
+        return self._ceph_complete_progress_event(evid)
 
-    def clear_all_progress_events(self):
+    def clear_all_progress_events(self) -> None:
         return self._ceph_clear_all_progress_events()
 
     @property
-    def rados(self):
+    def rados(self) -> rados.Rados:
         """
         A librados instance to be shared by any classes within
         this mgr module that want one.
@@ -1527,7 +1672,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         return self._rados
 
     @staticmethod
-    def can_run():
+    def can_run() -> Tuple[bool, str]:
         """
         Implement this function to report whether the module's dependencies
         are met.  For example, if the module needs to import a particular
@@ -1542,7 +1687,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return True, ""
 
-    def remote(self, module_name, method_name, *args, **kwargs):
+    def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
         """
         Invoke a method on another module.  All arguments, and the return
         value from the other module must be serializable.
@@ -1566,7 +1711,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         return self._ceph_dispatch_remote(module_name, method_name,
                                           args, kwargs)
 
-    def add_osd_perf_query(self, query):
+    def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
         """
         Register an OSD perf query.  Argument is a
         dict of the query parameters, in this form:
@@ -1596,7 +1741,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_add_osd_perf_query(query)
 
-    def remove_osd_perf_query(self, query_id):
+    def remove_osd_perf_query(self, query_id: int) -> None:
         """
         Unregister an OSD perf query.
 
@@ -1604,7 +1749,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_remove_osd_perf_query(query_id)
 
-    def get_osd_perf_counters(self, query_id):
+    def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
         """
         Get stats collected for an OSD perf query.
 
@@ -1612,7 +1757,52 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         return self._ceph_get_osd_perf_counters(query_id)
 
-    def is_authorized(self, arguments):
+    def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
+        """
+        Register an MDS perf query.  Argument is a
+        dict of the query parameters, in this form:
+
+        ::
+
+           {
+             'key_descriptor': [
+               {'type': subkey_type, 'regex': regex_pattern},
+               ...
+             ],
+             'performance_counter_descriptors': [
+               list, of, descriptor, types
+             ],
+           }
+
+        NOTE: 'limit' and 'order_by' are not supported (yet).
+
+        Valid subkey types:
+           'mds_rank', 'client_id'
+        Valid performance counter types:
+           'cap_hit_metric'
+
+        :param object query: query
+        :rtype: int (query id)
+        """
+        return self._ceph_add_mds_perf_query(query)
+
+    def remove_mds_perf_query(self, query_id: int) -> None:
+        """
+        Unregister an MDS perf query.
+
+        :param int query_id: query ID
+        """
+        return self._ceph_remove_mds_perf_query(query_id)
+
+    def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
+        """
+        Get stats collected for an MDS perf query.
+
+        :param int query_id: query ID
+        """
+        return self._ceph_get_mds_perf_counters(query_id)
+
+    def is_authorized(self, arguments: Dict[str, str]) -> bool:
         """
         Verifies that the current session caps permit executing the py service
         or current module with the provided arguments. This provides a generic