git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/mgr_module.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
index 40fc8433dfe0ba7fd60b98e7c94d110131730b5e..d6ba31f72086247711e4b14b375b53a66856047d 100644 (file)
@@ -1,11 +1,12 @@
 import ceph_module  # noqa
 
 try:
-    from typing import Set, Tuple, Iterator, Any
+    from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
 except ImportError:
     # just for type checking
     pass
 import logging
+import errno
 import json
 import six
 import threading
@@ -47,72 +48,6 @@ PG_STATES = [
     "unknown"]
 
 
-class CPlusPlusHandler(logging.Handler):
-    def __init__(self, module_inst):
-        super(CPlusPlusHandler, self).__init__()
-        self._module = module_inst
-
-    def emit(self, record):
-        if record.levelno <= logging.DEBUG:
-            ceph_level = 20
-        elif record.levelno <= logging.INFO:
-            ceph_level = 4
-        elif record.levelno <= logging.WARNING:
-            ceph_level = 1
-        else:
-            ceph_level = 0
-
-        self._module._ceph_log(ceph_level, self.format(record))
-
-
-def configure_logger(module_inst, module_name):
-    """
-    Create and configure the logger with the specified module.
-
-    A handler will be added to the root logger which will redirect
-    the messages from all loggers (incl. 3rd party libraries) to the
-    Ceph log.
-
-    :param module_inst: The module instance.
-    :type module_inst: instance
-    :param module_name: The module name.
-    :type module_name: str
-    :return: Return the logger with the specified name.
-    """
-    logger = logging.getLogger(module_name)
-    # Don't filter any logs at the python level, leave it to C++.
-    # FIXME: We should learn the log level from C++ land, and then
-    #        avoid calling the C++ level log when we know a message
-    #        is of an insufficient level to be ultimately output.
-    logger.setLevel(logging.DEBUG)  # Don't use NOTSET
-
-    root_logger = logging.getLogger()
-    # Add handler to the root logger, thus this module and all
-    # 3rd party libraries will log their messages to the Ceph log.
-    root_logger.addHandler(CPlusPlusHandler(module_inst))
-    # Set the log level to ``ERROR`` to ensure that we only get
-    # those message from 3rd party libraries (only effective if
-    # they use the default log level ``NOTSET``).
-    # Check https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
-    # for more information about how the effective log level is
-    # determined.
-    root_logger.setLevel(logging.ERROR)
-
-    return logger
-
-
-def unconfigure_logger(module_name=None):
-    """
-    :param module_name: The module name. Defaults to ``None``.
-    :type module_name: str or None
-    """
-    logger = logging.getLogger(module_name)
-    rm_handlers = [
-        h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
-    for h in rm_handlers:
-        logger.removeHandler(h)
-
-
 class CommandResult(object):
     """
     Use with MgrModule.send_command
@@ -215,6 +150,10 @@ class OSDMap(ceph_module.BasePyOSDMap):
         d = self._dump()
         return d['erasure_code_profiles'].get(name, None)
 
+    def get_require_osd_release(self):
+        """Return the 'require_osd_release' entry of this map's dump."""
+        return self._dump()['require_osd_release']
+
 
 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
     def get_epoch(self):
@@ -288,7 +227,7 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
         try:
             first_take = [s for s in rule['steps'] if s['op'] == 'take'][0]
         except IndexError:
-            self.log.warn("CRUSH rule '{0}' has no 'take' step".format(
+            logging.warning("CRUSH rule '{0}' has no 'take' step".format(
                 rule_name))
             return None
         else:
@@ -316,7 +255,7 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
         return osd_list
 
     def device_class_counts(self):
-        result = defaultdict(int)
+        result = defaultdict(int)  # type: Dict[str, int]
         # TODO don't abuse dump like this
         d = self.dump()
         for device in d['devices']:
@@ -327,7 +266,7 @@ class CRUSHMap(ceph_module.BasePyCRUSH):
 
 
 class CLICommand(object):
-    COMMANDS = {}
+    COMMANDS = {}  # type: Dict[str, CLICommand]
 
     def __init__(self, prefix, args="", desc="", perm="rw"):
         self.prefix = prefix
@@ -335,7 +274,7 @@ class CLICommand(object):
         self.args_dict = {}
         self.desc = desc
         self.perm = perm
-        self.func = None
+        self.func = None  # type: Optional[Callable]
         self._parse_args()
 
     def _parse_args(self):
@@ -365,15 +304,19 @@ class CLICommand(object):
             kwargs[a.replace("-", "_")] = cmd_dict[a]
         if inbuf:
             kwargs['inbuf'] = inbuf
+        assert self.func
         return self.func(mgr, **kwargs)
 
+    def dump_cmd(self):
+        """Describe this command as a dict with 'cmd', 'desc' and 'perm' keys."""
+        return dict(
+            cmd='{} {}'.format(self.prefix, self.args),
+            desc=self.desc,
+            perm=self.perm)
+
     @classmethod
     def dump_cmd_list(cls):
-        return [{
-            'cmd': '{} {}'.format(cmd.prefix, cmd.args),
-            'desc': cmd.desc,
-            'perm': cmd.perm
-        } for _, cmd in cls.COMMANDS.items()]
+        return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
 
 
 def CLIReadCommand(prefix, args="", desc=""):
@@ -465,7 +408,163 @@ class Command(dict):
         )
 
 
-class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
+class CPlusPlusHandler(logging.Handler):
+    def __init__(self, module_inst):
+        logging.Handler.__init__(self)
+        self._module = module_inst
+        layout = "[{} %(levelname)-4s %(name)s] %(message)s"  # {} = module name
+        self.setFormatter(logging.Formatter(layout.format(module_inst.module_name)))
+
+    def emit(self, record):
+        if self.level <= record.levelno:
+            self._module._ceph_log(self.format(record))
+
+class ClusterLogHandler(logging.Handler):
+    def __init__(self, module_inst):
+        super(ClusterLogHandler, self).__init__()
+        self._module = module_inst
+        self.setFormatter(logging.Formatter("%(message)s"))
+
+    def emit(self, record):
+        if record.levelno < self.level:
+            return
+        # map by numeric threshold so custom level names cannot raise KeyError
+        if record.levelno >= logging.ERROR:
+            level = MgrModule.CLUSTER_LOG_PRIO_ERROR
+        elif record.levelno >= logging.WARNING:
+            level = MgrModule.CLUSTER_LOG_PRIO_WARN
+        elif record.levelno >= logging.INFO:
+            level = MgrModule.CLUSTER_LOG_PRIO_INFO
+        else:
+            level = MgrModule.CLUSTER_LOG_PRIO_DEBUG
+        self._module.cluster_log(self._module.module_name, level, self.format(record))
+
+class FileHandler(logging.FileHandler):
+    def __init__(self, module_inst):
+        path = module_inst.get_ceph_option("log_file")
+        # only a trailing ".log" is an extension; rfind() also hit ".log" mid-path
+        if path.endswith(".log"):
+            self.path = "{}.{}.log".format(path[:-4], module_inst.module_name)
+        else:
+            self.path = "{}.{}".format(path, module_inst.module_name)
+        super(FileHandler, self).__init__(self.path, delay=True)
+        self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
+
+
+class MgrModuleLoggingMixin(object):
+    def _configure_logging(self, mgr_level, module_level, cluster_level,
+                           log_to_file, log_to_cluster):
+        self._mgr_level = None
+        self._module_level = None
+        self._root_logger = logging.getLogger()
+
+        self._unconfigure_logging()
+
+        # previously installed handlers were just removed; create a fresh set
+        self._mgr_log_handler = CPlusPlusHandler(self)
+        self._cluster_log_handler = ClusterLogHandler(self)
+        self._file_log_handler = FileHandler(self)
+
+        self.log_to_file = log_to_file
+        self.log_to_cluster = log_to_cluster
+
+        self._root_logger.addHandler(self._mgr_log_handler)
+        if log_to_file:
+            self._root_logger.addHandler(self._file_log_handler)
+        if log_to_cluster:
+            self._root_logger.addHandler(self._cluster_log_handler)
+
+        self._root_logger.setLevel(logging.NOTSET)
+        self._set_log_level(mgr_level, module_level, cluster_level)
+
+
+    def _unconfigure_logging(self):
+        # look the root logger up again: __del__ may run before _configure_logging
+        root = logging.getLogger()
+        ours = (CPlusPlusHandler, FileHandler, ClusterLogHandler)
+        for h in [h for h in root.handlers if isinstance(h, ours)]:
+            root.removeHandler(h)
+        self.log_to_file = False
+        self.log_to_cluster = False
+
+    def _set_log_level(self, mgr_level, module_level, cluster_level):
+        # '' is an allowed option value but setLevel('') raises: use the 'info' default
+        self._cluster_log_handler.setLevel((cluster_level or 'info').upper())
+        module_level = module_level.upper() if module_level else ''
+        if not self._module_level:
+            # using debug_mgr level
+            if not module_level and self._mgr_level == mgr_level:
+                # neither the module level nor debug_mgr has changed
+                return
+        else:
+            if self._module_level == module_level:
+                # no change in module level
+                return
+
+        if not self._module_level and not module_level:
+            level = self._ceph_log_level_to_python(mgr_level)
+            self.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level, mgr_level)
+        elif self._module_level and not module_level:
+            level = self._ceph_log_level_to_python(mgr_level)
+            self.getLogger().warning("unsetting module log level, falling back to "
+                                     "debug_mgr level: %s (%s)", level, mgr_level)
+        elif module_level:
+            level = module_level
+            self.getLogger().debug("setting log level: %s", level)
+
+        self._module_level = module_level
+        self._mgr_level = mgr_level
+
+        self._mgr_log_handler.setLevel(level)
+        self._file_log_handler.setLevel(level)
+
+    def _enable_file_log(self):
+        # enable file log
+        self.getLogger().warning("enabling logging to file")
+        self.log_to_file = True
+        self._root_logger.addHandler(self._file_log_handler)
+
+    def _disable_file_log(self):
+        # disable file log
+        self.getLogger().warning("disabling logging to file")
+        self.log_to_file = False
+        self._root_logger.removeHandler(self._file_log_handler)
+
+    def _enable_cluster_log(self):
+        # enable cluster log
+        self.getLogger().warning("enabling logging to cluster")
+        self.log_to_cluster = True
+        self._root_logger.addHandler(self._cluster_log_handler)
+
+    def _disable_cluster_log(self):
+        # disable cluster log
+        self.getLogger().warning("disabling logging to cluster")
+        self.log_to_cluster = False
+        self._root_logger.removeHandler(self._cluster_log_handler)
+
+    def _ceph_log_level_to_python(self, ceph_log_level):
+        if ceph_log_level:
+            try:
+                ceph_log_level = int(ceph_log_level.split("/", 1)[0])
+            except ValueError:
+                ceph_log_level = 0
+        else:
+            ceph_log_level = 0
+
+        log_level = "DEBUG"
+        if ceph_log_level <= 0:
+            log_level = "CRITICAL"
+        elif ceph_log_level <= 1:
+            log_level = "WARNING"
+        elif ceph_log_level <= 4:
+            log_level = "INFO"
+        return log_level
+
+    def getLogger(self, name=None):
+        return logging.getLogger(name)
+
+
+class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
     """
     Standby modules only implement a serve and shutdown method, they
     are not permitted to implement commands and they do not receive
@@ -475,13 +574,13 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
     from their active peer), and to configuration settings (read only).
     """
 
-    MODULE_OPTIONS = []
-    MODULE_OPTION_DEFAULTS = {}
+    MODULE_OPTIONS = []  # type: List[Dict[str, Any]]
+    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]
 
     def __init__(self, module_name, capsule):
         super(MgrStandbyModule, self).__init__(capsule)
         self.module_name = module_name
-        self._logger = configure_logger(self, module_name)
+
         # see also MgrModule.__init__()
         for o in self.MODULE_OPTIONS:
             if 'default' in o:
@@ -490,8 +589,35 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
                 else:
                     self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
 
+        mgr_level = self.get_ceph_option("debug_mgr")
+        log_level = self.get_module_option("log_level")
+        cluster_level = self.get_module_option('log_to_cluster_level')
+        self._configure_logging(mgr_level, log_level, cluster_level,
+                                False, False)
+
+        # for backwards compatibility
+        self._logger = self.getLogger()
+
     def __del__(self):
-        unconfigure_logger()
+        self._unconfigure_logging()
+
+    @classmethod
+    def _register_options(cls, module_name):
+        """Append the common logging options to cls.MODULE_OPTIONS."""
+        # values accepted by both level options; '' is allowed too
+        levels = ['info', 'debug', 'critical', 'error', 'warning', '']
+        opts = cls.MODULE_OPTIONS
+        opts.append(Option(name='log_level', type='str', default="",
+                           runtime=True, enum_allowed=list(levels)))
+        opts.append(Option(name='log_to_file', type='bool', default=False,
+                           runtime=True))
+        # log_to_cluster is only added when no entry of that name exists yet
+        if 'log_to_cluster' not in [o['name'] for o in opts]:
+            opts.append(Option(name='log_to_cluster', type='bool',
+                               default=False, runtime=True))
+        opts.append(Option(name='log_to_cluster_level', type='str',
+                           default='info', runtime=True,
+                           enum_allowed=list(levels)))
 
     @property
     def log(self):
@@ -544,10 +670,10 @@ class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
             return r
 
 
-class MgrModule(ceph_module.BaseMgrModule):
-    COMMANDS = []
-    MODULE_OPTIONS = []
-    MODULE_OPTION_DEFAULTS = {}
+class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
+    COMMANDS = []  # type: List[Any]
+    MODULE_OPTIONS = []  # type: List[dict]
+    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]
 
     # Priority definitions for perf counters
     PRIO_CRITICAL = 10
@@ -579,21 +705,8 @@ class MgrModule(ceph_module.BaseMgrModule):
 
     def __init__(self, module_name, py_modules_ptr, this_ptr):
         self.module_name = module_name
-
-        # If we're taking over from a standby module, let's make sure
-        # its logger was unconfigured before we hook ours up
-        unconfigure_logger()
-        self._logger = configure_logger(self, module_name)
-
         super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
 
-        self._version = self._ceph_get_version()
-
-        self._perf_schema_cache = None
-
-        # Keep a librados instance for those that need it.
-        self._rados = None
-
         for o in self.MODULE_OPTIONS:
             if 'default' in o:
                 if 'type' in o:
@@ -606,11 +719,48 @@ class MgrModule(ceph_module.BaseMgrModule):
                     # with default and user-supplied option values.
                     self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
 
+        mgr_level = self.get_ceph_option("debug_mgr")
+        log_level = self.get_module_option("log_level")
+        cluster_level = self.get_module_option('log_to_cluster_level')
+        log_to_file = self.get_module_option("log_to_file")
+        log_to_cluster = self.get_module_option("log_to_cluster")
+        self._configure_logging(mgr_level, log_level, cluster_level,
+                                log_to_file, log_to_cluster)
+
+        # for backwards compatibility
+        self._logger = self.getLogger()
+
+        self._version = self._ceph_get_version()
+
+        self._perf_schema_cache = None
+
+        # Keep a librados instance for those that need it.
+        self._rados = None
+
+
     def __del__(self):
-        unconfigure_logger()
+        self._unconfigure_logging()
+
+    @classmethod
+    def _register_options(cls, module_name):
+        """Append the common logging options to cls.MODULE_OPTIONS."""
+        # values accepted by both level options; '' is allowed too
+        levels = ['info', 'debug', 'critical', 'error', 'warning', '']
+        opts = cls.MODULE_OPTIONS
+        opts.append(Option(name='log_level', type='str', default="",
+                           runtime=True, enum_allowed=list(levels)))
+        opts.append(Option(name='log_to_file', type='bool', default=False,
+                           runtime=True))
+        # log_to_cluster is only added when no entry of that name exists yet
+        if 'log_to_cluster' not in [o['name'] for o in opts]:
+            opts.append(Option(name='log_to_cluster', type='bool',
+                               default=False, runtime=True))
+        opts.append(Option(name='log_to_cluster_level', type='str',
+                           default='info', runtime=True,
+                           enum_allowed=list(levels)))
 
     @classmethod
-    def _register_commands(cls):
+    def _register_commands(cls, module_name):
         cls.COMMANDS.extend(CLICommand.dump_cmd_list())
 
     @property
@@ -665,6 +815,30 @@ class MgrModule(ceph_module.BaseMgrModule):
         """
         pass
 
+    def _config_notify(self):
+        """Apply changed logging options, then run the module's config_notify()."""
+        debug_mgr = self.get_ceph_option("debug_mgr")
+        log_level = self.get_module_option("log_level")
+        to_cluster_level = self.get_module_option("log_to_cluster_level")
+        want_file = self.get_module_option("log_to_file", False)
+        want_cluster = self.get_module_option("log_to_cluster", False)
+
+        self._set_log_level(debug_mgr, log_level, to_cluster_level)
+
+        # each sink is toggled only when its option differs from the current state
+        if want_file != self.log_to_file:
+            toggle_file = (self._enable_file_log if want_file
+                           else self._disable_file_log)
+            toggle_file()
+
+        if want_cluster != self.log_to_cluster:
+            toggle_cluster = (self._enable_cluster_log if want_cluster
+                              else self._disable_cluster_log)
+            toggle_cluster()
+
+        # call module subclass implementations
+        self.config_notify()
+
     def config_notify(self):
         """
         Called by the ceph-mgr service to notify the Python plugin
@@ -692,7 +866,9 @@ class MgrModule(ceph_module.BaseMgrModule):
         :return: None
         """
         if self._rados:
+            addrs = self._rados.get_addrs()
             self._rados.shutdown()
+            self._ceph_unregister_client(addrs)
 
     def get(self, data_name):
         """
@@ -701,7 +877,8 @@ class MgrModule(ceph_module.BaseMgrModule):
         :param str data_name: Valid things to fetch are osd_crush_map_text,
                 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
                 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
-                health, mon_status, devices, device <devid>.
+                health, mon_status, devices, device <devid>, pg_stats,
+                pool_stats, pg_ready, osd_ping_times.
 
         Note:
             All these structures have their own JSON representations: experiment
@@ -725,16 +902,17 @@ class MgrModule(ceph_module.BaseMgrModule):
         return ''
 
     def _perfpath_to_path_labels(self, daemon, path):
-        label_names = ("ceph_daemon",)
-        labels = (daemon,)
+        # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
+        label_names = ("ceph_daemon",)  # type: Tuple[str, ...]
+        labels = (daemon,)  # type: Tuple[str, ...]
 
         if daemon.startswith('rbd-mirror.'):
             match = re.match(
-                r'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
+                r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
                 path
             )
             if match:
-                path = 'rbd_mirror_' + match.group(4)
+                path = 'rbd_mirror_image_' + match.group(4)
                 pool = match.group(1)
                 namespace = match.group(2) or ''
                 image = match.group(3)
@@ -956,6 +1134,7 @@ class MgrModule(ceph_module.BaseMgrModule):
              'CHECK_FOO': {
                'severity': 'warning',           # or 'error'
                'summary': 'summary string',
+               'count': 4,                      # quantify badness
                'detail': [ 'list', 'of', 'detail', 'strings' ],
               },
              'CHECK_BAR': {
@@ -972,6 +1151,7 @@ class MgrModule(ceph_module.BaseMgrModule):
     def _handle_command(self, inbuf, cmd):
         if cmd['prefix'] not in CLICommand.COMMANDS:
             return self.handle_command(inbuf, cmd)
+
         return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
 
     def handle_command(self, inbuf, cmd):
@@ -1197,7 +1377,7 @@ class MgrModule(ceph_module.BaseMgrModule):
         value.
         """
 
-        result = defaultdict(dict)
+        result = defaultdict(dict)  # type: Dict[str, dict]
 
         for server in self.list_servers():
             for service in server['services']:
@@ -1269,7 +1449,9 @@ class MgrModule(ceph_module.BaseMgrModule):
         return self._ceph_have_mon_connection()
 
     def update_progress_event(self, evid, desc, progress):
-        return self._ceph_update_progress_event(str(evid), str(desc), float(progress))
+        return self._ceph_update_progress_event(str(evid),
+                                                str(desc),
+                                                float(progress))
 
     def complete_progress_event(self, evid):
         return self._ceph_complete_progress_event(str(evid))
@@ -1289,7 +1471,7 @@ class MgrModule(ceph_module.BaseMgrModule):
         ctx_capsule = self.get_context()
         self._rados = rados.Rados(context=ctx_capsule)
         self._rados.connect()
-
+        self._ceph_register_client(self._rados.get_addrs())
         return self._rados
 
     @staticmethod
@@ -1388,69 +1570,3 @@ class MgrModule(ceph_module.BaseMgrModule):
         :param arguments: dict of key/value arguments to test
         """
         return self._ceph_is_authorized(arguments)
-
-
-class PersistentStoreDict(object):
-    def __init__(self, mgr, prefix):
-        # type: (MgrModule, str) -> None
-        self.mgr = mgr
-        self.prefix = prefix + '.'
-
-    def _mk_store_key(self, key):
-        return self.prefix + key
-
-    def __missing__(self, key):
-        # KeyError won't work for the `in` operator.
-        # https://docs.python.org/3/reference/expressions.html#membership-test-details
-        raise IndexError('PersistentStoreDict: "{}" not found'.format(key))
-
-    def clear(self):
-        # Don't make any assumptions about the content of the values.
-        for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
-            k, _ = item
-            self.mgr.set_store(k, None)
-
-    def __getitem__(self, item):
-        # type: (str) -> Any
-        key = self._mk_store_key(item)
-        try:
-            val = self.mgr.get_store(key)
-            if val is None:
-                self.__missing__(key)
-            return json.loads(val)
-        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
-            logging.getLogger(__name__).exception('failed to deserialize')
-            self.mgr.set_store(key, None)
-            raise
-
-    def __setitem__(self, item, value):
-        # type: (str, Any) -> None
-        """
-        value=None is not allowed, as it will remove the key.
-        """
-        key = self._mk_store_key(item)
-        self.mgr.set_store(key, json.dumps(value) if value is not None else None)
-
-    def __delitem__(self, item):
-        self[item] = None
-
-    def __len__(self):
-        return len(self.keys())
-
-    def items(self):
-        # type: () -> Iterator[Tuple[str, Any]]
-        prefix_len = len(self.prefix)
-        try:
-            for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
-                k, v = item
-                yield k[prefix_len:], json.loads(v)
-        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
-            logging.getLogger(__name__).exception('failed to deserialize')
-            self.clear()
-
-    def keys(self):
-        # type: () -> Set[str]
-        return {item[0] for item in self.items()}
-
-    def __iter__(self):
-        return iter(self.keys())