import ceph_module # noqa
try:
- from typing import Set, Tuple, Iterator, Any
+ from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
except ImportError:
# just for type checking
pass
import logging
+import errno
import json
import six
import threading
"unknown"]
-class CPlusPlusHandler(logging.Handler):
- def __init__(self, module_inst):
- super(CPlusPlusHandler, self).__init__()
- self._module = module_inst
-
- def emit(self, record):
- if record.levelno <= logging.DEBUG:
- ceph_level = 20
- elif record.levelno <= logging.INFO:
- ceph_level = 4
- elif record.levelno <= logging.WARNING:
- ceph_level = 1
- else:
- ceph_level = 0
-
- self._module._ceph_log(ceph_level, self.format(record))
-
-
-def configure_logger(module_inst, module_name):
- """
- Create and configure the logger with the specified module.
-
- A handler will be added to the root logger which will redirect
- the messages from all loggers (incl. 3rd party libraries) to the
- Ceph log.
-
- :param module_inst: The module instance.
- :type module_inst: instance
- :param module_name: The module name.
- :type module_name: str
- :return: Return the logger with the specified name.
- """
- logger = logging.getLogger(module_name)
- # Don't filter any logs at the python level, leave it to C++.
- # FIXME: We should learn the log level from C++ land, and then
- # avoid calling the C++ level log when we know a message
- # is of an insufficient level to be ultimately output.
- logger.setLevel(logging.DEBUG) # Don't use NOTSET
-
- root_logger = logging.getLogger()
- # Add handler to the root logger, thus this module and all
- # 3rd party libraries will log their messages to the Ceph log.
- root_logger.addHandler(CPlusPlusHandler(module_inst))
- # Set the log level to ``ERROR`` to ensure that we only get
- # those message from 3rd party libraries (only effective if
- # they use the default log level ``NOTSET``).
- # Check https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
- # for more information about how the effective log level is
- # determined.
- root_logger.setLevel(logging.ERROR)
-
- return logger
-
-
-def unconfigure_logger(module_name=None):
- """
- :param module_name: The module name. Defaults to ``None``.
- :type module_name: str or None
- """
- logger = logging.getLogger(module_name)
- rm_handlers = [
- h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
- for h in rm_handlers:
- logger.removeHandler(h)
-
-
class CommandResult(object):
"""
Use with MgrModule.send_command
d = self._dump()
return d['erasure_code_profiles'].get(name, None)
+    def get_require_osd_release(self):
+        # Return the 'require_osd_release' field from the osdmap dump.
+        d = self._dump()
+        return d['require_osd_release']
+
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
def get_epoch(self):
try:
first_take = [s for s in rule['steps'] if s['op'] == 'take'][0]
except IndexError:
- self.log.warn("CRUSH rule '{0}' has no 'take' step".format(
+ logging.warning("CRUSH rule '{0}' has no 'take' step".format(
rule_name))
return None
else:
return osd_list
def device_class_counts(self):
- result = defaultdict(int)
+ result = defaultdict(int) # type: Dict[str, int]
# TODO don't abuse dump like this
d = self.dump()
for device in d['devices']:
class CLICommand(object):
- COMMANDS = {}
+ COMMANDS = {} # type: Dict[str, CLICommand]
def __init__(self, prefix, args="", desc="", perm="rw"):
self.prefix = prefix
self.args_dict = {}
self.desc = desc
self.perm = perm
- self.func = None
+ self.func = None # type: Optional[Callable]
self._parse_args()
def _parse_args(self):
kwargs[a.replace("-", "_")] = cmd_dict[a]
if inbuf:
kwargs['inbuf'] = inbuf
+ assert self.func
return self.func(mgr, **kwargs)
+    def dump_cmd(self):
+        # Describe this single command as a dict (cmd/desc/perm), the
+        # per-command shape used by COMMANDS listings.
+        return {
+            'cmd': '{} {}'.format(self.prefix, self.args),
+            'desc': self.desc,
+            'perm': self.perm
+        }
+
@classmethod
def dump_cmd_list(cls):
- return [{
- 'cmd': '{} {}'.format(cmd.prefix, cmd.args),
- 'desc': cmd.desc,
- 'perm': cmd.perm
- } for _, cmd in cls.COMMANDS.items()]
+ return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
def CLIReadCommand(prefix, args="", desc=""):
)
-class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
+class CPlusPlusHandler(logging.Handler):
+    """Log handler that forwards python log records to the C++ side
+    via the module's _ceph_log hook.  Each message is tagged with the
+    module name so it can be attributed in the combined mgr log.
+    """
+    def __init__(self, module_inst):
+        super(CPlusPlusHandler, self).__init__()
+        self._module = module_inst
+        self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
+                          .format(module_inst.module_name)))
+
+    def emit(self, record):
+        # The handler's level is managed by
+        # MgrModuleLoggingMixin._set_log_level().  NOTE(review): the
+        # logging framework already filters on handler level before
+        # calling emit(), so this check is defensive.
+        if record.levelno >= self.level:
+            self._module._ceph_log(self.format(record))
+
+class ClusterLogHandler(logging.Handler):
+    """Log handler that forwards python log records to the Ceph
+    cluster log via the module's cluster_log hook."""
+    def __init__(self, module_inst):
+        super().__init__()
+        self._module = module_inst
+        self.setFormatter(logging.Formatter("%(message)s"))
+
+    def emit(self, record):
+        levelmap = {
+            'DEBUG': MgrModule.CLUSTER_LOG_PRIO_DEBUG,
+            'INFO': MgrModule.CLUSTER_LOG_PRIO_INFO,
+            'WARNING': MgrModule.CLUSTER_LOG_PRIO_WARN,
+            'ERROR': MgrModule.CLUSTER_LOG_PRIO_ERROR,
+            'CRITICAL': MgrModule.CLUSTER_LOG_PRIO_ERROR,
+        }
+        if record.levelno >= self.level:
+            # Map the level name only for records that pass the level
+            # filter, and fall back to ERROR for custom level names
+            # instead of raising KeyError from inside a log handler.
+            level = levelmap.get(record.levelname,
+                                 MgrModule.CLUSTER_LOG_PRIO_ERROR)
+            self._module.cluster_log(self._module.module_name,
+                                     level,
+                                     self.format(record))
+
+class FileHandler(logging.FileHandler):
+    """Log handler writing to a per-module file derived from the
+    mgr's log_file option: "...ceph-mgr.x.log" becomes
+    "...ceph-mgr.x.<module>.log" (the ".log" suffix is re-appended
+    after the module name when present).
+    """
+    def __init__(self, module_inst):
+        path = module_inst.get_ceph_option("log_file")
+        # NOTE(review): assumes log_file is always a non-empty string;
+        # rfind() would fail if the option were unset -- confirm.
+        idx = path.rfind(".log")
+        if idx != -1:
+            self.path = "{}.{}.log".format(path[:idx], module_inst.module_name)
+        else:
+            self.path = "{}.{}".format(path, module_inst.module_name)
+        # delay=True: do not create/open the file until the first emit.
+        super(FileHandler, self).__init__(self.path, delay=True)
+        self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
+
+
+class MgrModuleLoggingMixin(object):
+    """Shared logging setup for MgrModule and MgrStandbyModule.
+
+    Installs the mgr, cluster and file log handlers on the *root*
+    logger, so messages from this module and any 3rd party library
+    are captured.
+    """
+    def _configure_logging(self, mgr_level, module_level, cluster_level,
+                           log_to_file, log_to_cluster):
+        self._mgr_level = None
+        self._module_level = None
+        self._root_logger = logging.getLogger()
+
+        # drop any handlers left behind by a previous incarnation
+        # (e.g. when taking over from a standby module)
+        self._unconfigure_logging()
+
+        # the ceph log handler is initialized only once
+        self._mgr_log_handler = CPlusPlusHandler(self)
+        self._cluster_log_handler = ClusterLogHandler(self)
+        self._file_log_handler = FileHandler(self)
+
+        self.log_to_file = log_to_file
+        self.log_to_cluster = log_to_cluster
+
+        self._root_logger.addHandler(self._mgr_log_handler)
+        if log_to_file:
+            self._root_logger.addHandler(self._file_log_handler)
+        if log_to_cluster:
+            self._root_logger.addHandler(self._cluster_log_handler)
+
+        # don't filter at the logger level: each handler filters itself
+        self._root_logger.setLevel(logging.NOTSET)
+        self._set_log_level(mgr_level, module_level, cluster_level)
+
+
+    def _unconfigure_logging(self):
+        # remove existing handlers:
+        rm_handlers = [
+            h for h in self._root_logger.handlers if isinstance(h, CPlusPlusHandler) or isinstance(h, FileHandler) or isinstance(h, ClusterLogHandler)]
+        for h in rm_handlers:
+            self._root_logger.removeHandler(h)
+        self.log_to_file = False
+        self.log_to_cluster = False
+
+    def _set_log_level(self, mgr_level, module_level, cluster_level):
+        """Apply log levels to the installed handlers.
+
+        :param mgr_level: debug_mgr ceph option value (e.g. "4/5")
+        :param module_level: per-module log_level option ('' = unset,
+            meaning: follow debug_mgr)
+        :param cluster_level: level name for the cluster log handler
+        """
+        self._cluster_log_handler.setLevel(cluster_level.upper())
+
+        module_level = module_level.upper() if module_level else ''
+        if not self._module_level:
+            # using debug_mgr level
+            if not module_level and self._mgr_level == mgr_level:
+                # no change in module level neither in debug_mgr
+                return
+        else:
+            if self._module_level == module_level:
+                # no change in module level
+                return
+
+        # exactly one of the three branches below runs (module_level is
+        # either truthy or falsy), so 'level' is always bound here
+        if not self._module_level and not module_level:
+            level = self._ceph_log_level_to_python(mgr_level)
+            self.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level, mgr_level)
+        elif self._module_level and not module_level:
+            level = self._ceph_log_level_to_python(mgr_level)
+            self.getLogger().warning("unsetting module log level, falling back to "
+                                     "debug_mgr level: %s (%s)", level, mgr_level)
+        elif module_level:
+            level = module_level
+            self.getLogger().debug("setting log level: %s", level)
+
+        # remember current levels so the next call can detect changes
+        self._module_level = module_level
+        self._mgr_level = mgr_level
+
+        self._mgr_log_handler.setLevel(level)
+        self._file_log_handler.setLevel(level)
+
+    def _enable_file_log(self):
+        # enable file log
+        self.getLogger().warning("enabling logging to file")
+        self.log_to_file = True
+        self._root_logger.addHandler(self._file_log_handler)
+
+    def _disable_file_log(self):
+        # disable file log
+        self.getLogger().warning("disabling logging to file")
+        self.log_to_file = False
+        self._root_logger.removeHandler(self._file_log_handler)
+
+    def _enable_cluster_log(self):
+        # enable cluster log
+        self.getLogger().warning("enabling logging to cluster")
+        self.log_to_cluster = True
+        self._root_logger.addHandler(self._cluster_log_handler)
+
+    def _disable_cluster_log(self):
+        # disable cluster log
+        self.getLogger().warning("disabling logging to cluster")
+        self.log_to_cluster = False
+        self._root_logger.removeHandler(self._cluster_log_handler)
+
+    def _ceph_log_level_to_python(self, ceph_log_level):
+        # Translate a ceph debug level string ("N" or "N/M") into a
+        # python logging level name; only the first number is used.
+        if ceph_log_level:
+            try:
+                ceph_log_level = int(ceph_log_level.split("/", 1)[0])
+            except ValueError:
+                ceph_log_level = 0
+        else:
+            ceph_log_level = 0
+
+        # 0 -> CRITICAL, 1 -> WARNING, 2..4 -> INFO, >4 -> DEBUG
+        log_level = "DEBUG"
+        if ceph_log_level <= 0:
+            log_level = "CRITICAL"
+        elif ceph_log_level <= 1:
+            log_level = "WARNING"
+        elif ceph_log_level <= 4:
+            log_level = "INFO"
+        return log_level
+
+    def getLogger(self, name=None):
+        # thin wrapper kept for symmetry with the logging module API
+        return logging.getLogger(name)
+
+
+class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
"""
Standby modules only implement a serve and shutdown method, they
are not permitted to implement commands and they do not receive
from their active peer), and to configuration settings (read only).
"""
- MODULE_OPTIONS = []
- MODULE_OPTION_DEFAULTS = {}
+ MODULE_OPTIONS = [] # type: List[Dict[str, Any]]
+ MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
def __init__(self, module_name, capsule):
super(MgrStandbyModule, self).__init__(capsule)
self.module_name = module_name
- self._logger = configure_logger(self, module_name)
+
# see also MgrModule.__init__()
for o in self.MODULE_OPTIONS:
if 'default' in o:
else:
self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
+ mgr_level = self.get_ceph_option("debug_mgr")
+ log_level = self.get_module_option("log_level")
+ cluster_level = self.get_module_option('log_to_cluster_level')
+ self._configure_logging(mgr_level, log_level, cluster_level,
+ False, False)
+
+ # for backwards compatibility
+ self._logger = self.getLogger()
+
def __del__(self):
- unconfigure_logger()
+ self._unconfigure_logging()
+
+    @classmethod
+    def _register_options(cls, module_name):
+        # Register the standard logging options; 'log_to_cluster' is
+        # only appended if the module did not already declare an
+        # option of that name.
+        cls.MODULE_OPTIONS.append(
+            Option(name='log_level', type='str', default="", runtime=True,
+                   enum_allowed=['info', 'debug', 'critical', 'error',
+                                 'warning', '']))
+        cls.MODULE_OPTIONS.append(
+            Option(name='log_to_file', type='bool', default=False, runtime=True))
+        if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
+            cls.MODULE_OPTIONS.append(
+                Option(name='log_to_cluster', type='bool', default=False,
+                       runtime=True))
+        cls.MODULE_OPTIONS.append(
+            Option(name='log_to_cluster_level', type='str', default='info',
+                   runtime=True,
+                   enum_allowed=['info', 'debug', 'critical', 'error',
+                                 'warning', '']))
@property
def log(self):
return r
-class MgrModule(ceph_module.BaseMgrModule):
- COMMANDS = []
- MODULE_OPTIONS = []
- MODULE_OPTION_DEFAULTS = {}
+class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
+ COMMANDS = [] # type: List[Any]
+ MODULE_OPTIONS = [] # type: List[dict]
+ MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
# Priority definitions for perf counters
PRIO_CRITICAL = 10
def __init__(self, module_name, py_modules_ptr, this_ptr):
self.module_name = module_name
-
- # If we're taking over from a standby module, let's make sure
- # its logger was unconfigured before we hook ours up
- unconfigure_logger()
- self._logger = configure_logger(self, module_name)
-
super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
- self._version = self._ceph_get_version()
-
- self._perf_schema_cache = None
-
- # Keep a librados instance for those that need it.
- self._rados = None
-
for o in self.MODULE_OPTIONS:
if 'default' in o:
if 'type' in o:
# with default and user-supplied option values.
self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
+ mgr_level = self.get_ceph_option("debug_mgr")
+ log_level = self.get_module_option("log_level")
+ cluster_level = self.get_module_option('log_to_cluster_level')
+ log_to_file = self.get_module_option("log_to_file")
+ log_to_cluster = self.get_module_option("log_to_cluster")
+ self._configure_logging(mgr_level, log_level, cluster_level,
+ log_to_file, log_to_cluster)
+
+ # for backwards compatibility
+ self._logger = self.getLogger()
+
+ self._version = self._ceph_get_version()
+
+ self._perf_schema_cache = None
+
+ # Keep a librados instance for those that need it.
+ self._rados = None
+
+
def __del__(self):
- unconfigure_logger()
+ self._unconfigure_logging()
+
+    @classmethod
+    def _register_options(cls, module_name):
+        # Register the standard logging options (mirrors
+        # MgrStandbyModule); 'log_to_cluster' is only appended if the
+        # module did not already declare an option of that name.
+        cls.MODULE_OPTIONS.append(
+            Option(name='log_level', type='str', default="", runtime=True,
+                   enum_allowed=['info', 'debug', 'critical', 'error',
+                                 'warning', '']))
+        cls.MODULE_OPTIONS.append(
+            Option(name='log_to_file', type='bool', default=False, runtime=True))
+        if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
+            cls.MODULE_OPTIONS.append(
+                Option(name='log_to_cluster', type='bool', default=False,
+                       runtime=True))
+        cls.MODULE_OPTIONS.append(
+            Option(name='log_to_cluster_level', type='str', default='info',
+                   runtime=True,
+                   enum_allowed=['info', 'debug', 'critical', 'error',
+                                 'warning', '']))
@classmethod
- def _register_commands(cls):
+ def _register_commands(cls, module_name):
cls.COMMANDS.extend(CLICommand.dump_cmd_list())
@property
"""
pass
+    def _config_notify(self):
+        # Internal hook run on configuration changes: re-apply the
+        # logging options first, then hand off to the module
+        # subclass's config_notify().
+        # check logging options for changes
+        mgr_level = self.get_ceph_option("debug_mgr")
+        module_level = self.get_module_option("log_level")
+        cluster_level = self.get_module_option("log_to_cluster_level")
+        log_to_file = self.get_module_option("log_to_file", False)
+        log_to_cluster = self.get_module_option("log_to_cluster", False)
+
+        self._set_log_level(mgr_level, module_level, cluster_level)
+
+        # add/remove the optional handlers only when the option flipped
+        if log_to_file != self.log_to_file:
+            if log_to_file:
+                self._enable_file_log()
+            else:
+                self._disable_file_log()
+        if log_to_cluster != self.log_to_cluster:
+            if log_to_cluster:
+                self._enable_cluster_log()
+            else:
+                self._disable_cluster_log()
+
+        # call module subclass implementations
+        self.config_notify()
+
def config_notify(self):
"""
Called by the ceph-mgr service to notify the Python plugin
:return: None
"""
if self._rados:
+ addrs = self._rados.get_addrs()
self._rados.shutdown()
+ self._ceph_unregister_client(addrs)
def get(self, data_name):
"""
:param str data_name: Valid things to fetch are osd_crush_map_text,
osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
- health, mon_status, devices, device <devid>.
+ health, mon_status, devices, device <devid>, pg_stats,
+ pool_stats, pg_ready, osd_ping_times.
Note:
All these structures have their own JSON representations: experiment
return ''
def _perfpath_to_path_labels(self, daemon, path):
- label_names = ("ceph_daemon",)
- labels = (daemon,)
+ # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
+ label_names = ("ceph_daemon",) # type: Tuple[str, ...]
+ labels = (daemon,) # type: Tuple[str, ...]
if daemon.startswith('rbd-mirror.'):
match = re.match(
- r'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
+ r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
path
)
if match:
- path = 'rbd_mirror_' + match.group(4)
+ path = 'rbd_mirror_image_' + match.group(4)
pool = match.group(1)
namespace = match.group(2) or ''
image = match.group(3)
'CHECK_FOO': {
'severity': 'warning', # or 'error'
'summary': 'summary string',
+ 'count': 4, # quantify badness
'detail': [ 'list', 'of', 'detail', 'strings' ],
},
'CHECK_BAR': {
def _handle_command(self, inbuf, cmd):
if cmd['prefix'] not in CLICommand.COMMANDS:
return self.handle_command(inbuf, cmd)
+
return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
def handle_command(self, inbuf, cmd):
value.
"""
- result = defaultdict(dict)
+ result = defaultdict(dict) # type: Dict[str, dict]
for server in self.list_servers():
for service in server['services']:
return self._ceph_have_mon_connection()
def update_progress_event(self, evid, desc, progress):
- return self._ceph_update_progress_event(str(evid), str(desc), float(progress))
+ return self._ceph_update_progress_event(str(evid),
+ str(desc),
+ float(progress))
def complete_progress_event(self, evid):
return self._ceph_complete_progress_event(str(evid))
ctx_capsule = self.get_context()
self._rados = rados.Rados(context=ctx_capsule)
self._rados.connect()
-
+ self._ceph_register_client(self._rados.get_addrs())
return self._rados
@staticmethod
:param arguments: dict of key/value arguments to test
"""
return self._ceph_is_authorized(arguments)
-
-
-class PersistentStoreDict(object):
- def __init__(self, mgr, prefix):
- # type: (MgrModule, str) -> None
- self.mgr = mgr
- self.prefix = prefix + '.'
-
- def _mk_store_key(self, key):
- return self.prefix + key
-
- def __missing__(self, key):
- # KeyError won't work for the `in` operator.
- # https://docs.python.org/3/reference/expressions.html#membership-test-details
- raise IndexError('PersistentStoreDict: "{}" not found'.format(key))
-
- def clear(self):
- # Don't make any assumptions about the content of the values.
- for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
- k, _ = item
- self.mgr.set_store(k, None)
-
- def __getitem__(self, item):
- # type: (str) -> Any
- key = self._mk_store_key(item)
- try:
- val = self.mgr.get_store(key)
- if val is None:
- self.__missing__(key)
- return json.loads(val)
- except (KeyError, AttributeError, IndexError, ValueError, TypeError):
- logging.getLogger(__name__).exception('failed to deserialize')
- self.mgr.set_store(key, None)
- raise
-
- def __setitem__(self, item, value):
- # type: (str, Any) -> None
- """
- value=None is not allowed, as it will remove the key.
- """
- key = self._mk_store_key(item)
- self.mgr.set_store(key, json.dumps(value) if value is not None else None)
-
- def __delitem__(self, item):
- self[item] = None
-
- def __len__(self):
- return len(self.keys())
-
- def items(self):
- # type: () -> Iterator[Tuple[str, Any]]
- prefix_len = len(self.prefix)
- try:
- for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)):
- k, v = item
- yield k[prefix_len:], json.loads(v)
- except (KeyError, AttributeError, IndexError, ValueError, TypeError):
- logging.getLogger(__name__).exception('failed to deserialize')
- self.clear()
-
- def keys(self):
- # type: () -> Set[str]
- return {item[0] for item in self.items()}
-
- def __iter__(self):
- return iter(self.keys())