]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/mgr_module.py
update sources to 12.2.8
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
index 9d3362d75e922ec9d72915e6817da91d4fafddc7..27fe50bdce45544617e08c5d99bd0af2586cfc70 100644 (file)
@@ -1,10 +1,55 @@
 
-import ceph_state  #noqa
+import ceph_module  # noqa
+#import ceph_osdmap  #noqa
+#import ceph_osdmap_incremental  #noqa
+#import ceph_crushmap  #noqa
+
 import json
 import logging
+import six
 import threading
+from collections import defaultdict
+
+
+class CPlusPlusHandler(logging.Handler):
+    def __init__(self, module_inst):
+        super(CPlusPlusHandler, self).__init__()
+        self._module = module_inst
+
+    def emit(self, record):
+        if record.levelno <= logging.DEBUG:
+            ceph_level = 20
+        elif record.levelno <= logging.INFO:
+            ceph_level = 4
+        elif record.levelno <= logging.WARNING:
+            ceph_level = 1
+        else:
+            ceph_level = 0
+
+        self._module._ceph_log(ceph_level, self.format(record))
+
+
+def configure_logger(module_inst, name):
+    logger = logging.getLogger(name)
 
 
+    # Don't filter any logs at the python level, leave it to C++
+    logger.setLevel(logging.DEBUG)
+
+    # FIXME: we should learn the log level from C++ land, and then
+    # avoid calling the C++ level log when we know a message is of
+    # an insufficient level to be ultimately output
+    logger.addHandler(CPlusPlusHandler(module_inst))
+
+    return logger
+
+
+def unconfigure_logger(module_inst, name):
+    logger = logging.getLogger(name)
+    rm_handlers = [h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
+    for h in rm_handlers:
+        logger.removeHandler(h)
+
 class CommandResult(object):
     """
     Use with MgrModule.send_command
@@ -30,36 +75,186 @@ class CommandResult(object):
         return self.r, self.outb, self.outs
 
 
-class MgrModule(object):
+class OSDMap(ceph_module.BasePyOSDMap):
+    def get_epoch(self):
+        return self._get_epoch()
+
+    def get_crush_version(self):
+        return self._get_crush_version()
+
+    def dump(self):
+        return self._dump()
+
+    def new_incremental(self):
+        return self._new_incremental()
+
+    def apply_incremental(self, inc):
+        return self._apply_incremental(inc)
+
+    def get_crush(self):
+        return self._get_crush()
+
+    def get_pools_by_take(self, take):
+        return self._get_pools_by_take(take).get('pools', [])
+
+    def calc_pg_upmaps(self, inc,
+                       max_deviation=.01, max_iterations=10, pools=[]):
+        return self._calc_pg_upmaps(
+            inc,
+            max_deviation, max_iterations, pools)
+
+    def map_pool_pgs_up(self, poolid):
+        return self._map_pool_pgs_up(poolid)
+
+class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
+    def get_epoch(self):
+        return self._get_epoch()
+
+    def dump(self):
+        return self._dump()
+
+    def set_osd_reweights(self, weightmap):
+        """
+        weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
+        """
+        return self._set_osd_reweights(weightmap)
+
+    def set_crush_compat_weight_set_weights(self, weightmap):
+        """
+        weightmap is a dict, int to float.  devices only.  e.g.,
+        { 0: 3.4, 1: 3.3, 2: 3.334 }
+        """
+        return self._set_crush_compat_weight_set_weights(weightmap)
+
+class CRUSHMap(ceph_module.BasePyCRUSH):
+    ITEM_NONE = 0x7fffffff
+
+    def dump(self):
+        return self._dump()
+
+    def get_item_weight(self, item):
+        return self._get_item_weight(item)
+
+    def get_item_name(self, item):
+        return self._get_item_name(item)
+
+    def find_takes(self):
+        return self._find_takes().get('takes', [])
+
+    def get_take_weight_osd_map(self, root):
+        uglymap = self._get_take_weight_osd_map(root)
+        return { int(k): v for k, v in six.iteritems(uglymap.get('weights', {})) }
+
+class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
+    """
+    Standby modules only implement a serve and shutdown method, they
+    are not permitted to implement commands and they do not receive
+    any notifications.
+
+    They only have access to the mgrmap (for accessing service URI info
+    from their active peer), and to configuration settings (read only).
+    """
+
+    def __init__(self, module_name, capsule):
+        super(MgrStandbyModule, self).__init__(capsule)
+        self.module_name = module_name
+        self._logger = configure_logger(self, module_name)
+
+    def __del__(self):
+        unconfigure_logger(self, self.module_name)
+
+    @property
+    def log(self):
+        return self._logger
+
+    def serve(self):
+        """
+        The serve method is mandatory for standby modules.
+        :return:
+        """
+        raise NotImplementedError()
+
+    def get_mgr_id(self):
+        return self._ceph_get_mgr_id()
+
+    def get_config(self, key, default=None):
+        """
+        Retrieve the value of a persistent configuration setting
+
+        :param str key:
+        :param default: the default value of the config if it is not found
+        :return: str
+        """
+        r = self._ceph_get_config(key)
+        if r is None:
+            return default
+        else:
+            return r
+
+
+    def get_active_uri(self):
+        return self._ceph_get_active_uri()
+
+    def get_localized_config(self, key, default=None):
+        r = self.get_config(self.get_mgr_id() + '/' + key)
+        if r is None:
+            r = self.get_config(key)
+
+        if r is None:
+            r = default
+        return r
+
+class MgrModule(ceph_module.BaseMgrModule):
     COMMANDS = []
 
-    def __init__(self, handle):
-        self._handle = handle
-        self._logger = logging.getLogger(handle)
+    # Priority definitions for perf counters
+    PRIO_CRITICAL = 10
+    PRIO_INTERESTING = 8
+    PRIO_USEFUL = 5
+    PRIO_UNINTERESTING = 2
+    PRIO_DEBUGONLY = 0
+
+    # counter value types
+    PERFCOUNTER_TIME = 1
+    PERFCOUNTER_U64 = 2
+
+    # counter types
+    PERFCOUNTER_LONGRUNAVG = 4
+    PERFCOUNTER_COUNTER = 8
+    PERFCOUNTER_HISTOGRAM = 0x10
+    PERFCOUNTER_TYPE_MASK = ~3
 
-        # Don't filter any logs at the python level, leave it to C++
-        self._logger.setLevel(logging.DEBUG)
+    # units supported
+    BYTES = 0
+    NONE = 1
+    
+    def __init__(self, module_name, py_modules_ptr, this_ptr):
+        self.module_name = module_name
 
-        # FIXME: we should learn the log level from C++ land, and then
-        # avoid calling ceph_state.log when we know a message is of
-        # an insufficient level to be ultimately output
+        # If we're taking over from a standby module, let's make sure
+        # its logger was unconfigured before we hook ours up
+        unconfigure_logger(self, self.module_name)
+        self._logger = configure_logger(self, module_name)
 
-        class CPlusPlusHandler(logging.Handler):
-            def emit(self, record):
-                if record.levelno <= logging.DEBUG:
-                    ceph_level = 20
-                elif record.levelno <= logging.INFO:
-                    ceph_level = 4
-                elif record.levelno <= logging.WARNING:
-                    ceph_level = 1
-                else:
-                    ceph_level = 0
+        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
 
-                ceph_state.log(handle, ceph_level, self.format(record))
+        self._version = self._ceph_get_version()
 
-        self._logger.addHandler(CPlusPlusHandler())
+        self._perf_schema_cache = None
 
-        self._version = ceph_state.get_version()
+    def __del__(self):
+        unconfigure_logger(self, self.module_name)
+
+    def update_perf_schema(self, daemon_type, daemon_name):
+        """
+        For plugins that use get_all_perf_counters, call this when
+        receiving a notification of type 'perf_schema_update', to
+        prompt MgrModule to update its cache of counter schemas.
+
+        :param daemon_type:
+        :param daemon_name:
+        :return:
+        """
 
     @property
     def log(self):
@@ -69,6 +264,12 @@ class MgrModule(object):
     def version(self):
         return self._version
 
+    def get_context(self):
+        """
+        :return: a Python capsule containing a C++ CephContext pointer
+        """
+        return self._ceph_get_context()
+
     def notify(self, notify_type, notify_id):
         """
         Called by the ceph-mgr service to notify the Python plugin
@@ -100,8 +301,36 @@ class MgrModule(object):
         """
         Called by the plugin to load some cluster state from ceph-mgr
         """
-        return ceph_state.get(self._handle, data_name)
-
+        return self._ceph_get(data_name)
+
+    def _stattype_to_str(self, stattype):
+        
+        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
+        if typeonly == 0:
+            return 'gauge'
+        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
+            # this lie matches the DaemonState decoding: only val, no counts
+            return 'counter'
+        if typeonly == self.PERFCOUNTER_COUNTER:
+            return 'counter'
+        if typeonly == self.PERFCOUNTER_HISTOGRAM:
+            return 'histogram'
+        
+        return ''
+
+    def _perfvalue_to_value(self, stattype, value):
+        if stattype & self.PERFCOUNTER_TIME:
+            # Convert from ns to seconds
+            return value / 1000000000.0
+        else:
+            return value
+
+    def _unit_to_str(self, unit):
+        if unit == self.NONE:
+            return "/s"
+        elif unit == self.BYTES:
+            return "B/s"  
+    
     def get_server(self, hostname):
         """
         Called by the plugin to load information about a particular
@@ -109,7 +338,19 @@ class MgrModule(object):
 
         :param hostname: a hostame
         """
-        return ceph_state.get_server(self._handle, hostname)
+        return self._ceph_get_server(hostname)
+
+    def get_perf_schema(self, svc_type, svc_name):
+        """
+        Called by the plugin to fetch perf counter schema info.
+        svc_name can be nullptr, as can svc_type, in which case
+        they are wildcards
+
+        :param svc_type:
+        :param svc_name:
+        :return: list of dicts describing the counters requested
+        """
+        return self._ceph_get_perf_schema(svc_type, svc_name)
 
     def get_counter(self, svc_type, svc_name, path):
         """
@@ -121,14 +362,14 @@ class MgrModule(object):
         :param path:
         :return: A list of two-element lists containing time and value
         """
-        return ceph_state.get_counter(self._handle, svc_type, svc_name, path)
+        return self._ceph_get_counter(svc_type, svc_name, path)
 
     def list_servers(self):
         """
         Like ``get_server``, but instead of returning information
         about just one node, return all the nodes in an array.
         """
-        return ceph_state.get_server(self._handle, None)
+        return self._ceph_get_server(None)
 
     def get_metadata(self, svc_type, svc_id):
         """
@@ -138,7 +379,7 @@ class MgrModule(object):
         :param svc_id: string
         :return: dict
         """
-        return ceph_state.get_metadata(self._handle, svc_type, svc_id)
+        return self._ceph_get_metadata(svc_type, svc_id)
 
     def get_daemon_status(self, svc_type, svc_id):
         """
@@ -148,14 +389,38 @@ class MgrModule(object):
         :param svc_id: string
         :return: dict
         """
-        return ceph_state.get_daemon_status(self._handle, svc_type, svc_id)
+        return self._ceph_get_daemon_status(svc_type, svc_id)
 
     def send_command(self, *args, **kwargs):
         """
         Called by the plugin to send a command to the mon
         cluster.
         """
-        ceph_state.send_command(self._handle, *args, **kwargs)
+        self._ceph_send_command(*args, **kwargs)
+
+    def set_health_checks(self, checks):
+        """
+        Set module's health checks
+
+        Set the module's current map of health checks.  Argument is a
+        dict of check names to info, in this form:
+
+           {
+             'CHECK_FOO': {
+               'severity': 'warning',           # or 'error'
+               'summary': 'summary string',
+               'detail': [ 'list', 'of', 'detail', 'strings' ],
+              },
+             'CHECK_BAR': {
+               'severity': 'error',
+               'summary': 'bars are bad',
+               'detail': [ 'too hard' ],
+             },
+           }
+
+        :param list: dict of health check dicts
+        """
+        self._ceph_set_health_checks(checks)
 
     def handle_command(self, cmd):
         """
@@ -181,16 +446,20 @@ class MgrModule(object):
 
         :return: str
         """
-        return ceph_state.get_mgr_id()
+        return self._ceph_get_mgr_id()
 
-    def get_config(self, key):
+    def get_config(self, key, default=None):
         """
         Retrieve the value of a persistent configuration setting
 
         :param key: str
         :return: str
         """
-        return ceph_state.get_config(self._handle, key)
+        r = self._ceph_get_config(key)
+        if r is None:
+            return default
+        else:
+            return r
 
     def get_config_prefix(self, key_prefix):
         """
@@ -199,7 +468,7 @@ class MgrModule(object):
         :param key_prefix: str
         :return: str
         """
-        return ceph_state.get_config_prefix(self._handle, key_prefix)
+        return self._ceph_get_config_prefix(key_prefix)
 
     def get_localized_config(self, key, default=None):
         """
@@ -223,7 +492,7 @@ class MgrModule(object):
         :param key: str
         :param val: str
         """
-        ceph_state.set_config(self._handle, key, val)
+        self._ceph_set_config(key, val)
 
     def set_localized_config(self, key, val):
         """
@@ -232,7 +501,7 @@ class MgrModule(object):
         :param default: str
         :return: str
         """
-        return self.set_config(self.get_mgr_id() + '/' + key, val)
+        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
 
     def set_config_json(self, key, val):
         """
@@ -241,7 +510,7 @@ class MgrModule(object):
         :param key: str
         :param val: json-serializable object
         """
-        self.set_config(key, json.dumps(val))
+        self._ceph_set_config(key, json.dumps(val))
 
     def get_config_json(self, key):
         """
@@ -263,3 +532,110 @@ class MgrModule(object):
         :return: bool
         """
         pass
+
+    def get_osdmap(self):
+        """
+        Get a handle to an OSDMap.  If epoch==0, get a handle for the latest
+        OSDMap.
+        :return: OSDMap
+        """
+        return self._ceph_get_osdmap()
+
+    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
+        """
+        Return the perf counters currently known to this ceph-mgr
+        instance, filtered by priority equal to or greater than `prio_limit`.
+
+        The result us a map of string to dict, associating services
+        (like "osd.123") with their counters.  The counter
+        dict for each service maps counter paths to a counter
+        info structure, which is the information from
+        the schema, plus an additional "value" member with the latest
+        value.
+        """
+
+        result = defaultdict(dict)
+
+        # TODO: improve C++->Python interface to return just
+        # the latest if that's all we want.
+        def get_latest(daemon_type, daemon_name, counter):
+            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
+            if data:
+                return data[-1][1]
+            else:
+                return 0
+
+        def get_latest_avg(daemon_type, daemon_name, counter):
+            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
+            if data:
+                return (data[-1][1], data[-1][2])
+            else:
+                return (0, 0)
+
+        for server in self.list_servers():
+            for service in server['services']:
+                if service['type'] not in ("rgw", "mds", "osd", "mon"):
+                    continue
+
+                schema = self.get_perf_schema(service['type'], service['id'])
+                if not schema:
+                    self.log.warn("No perf counter schema for {0}.{1}".format(
+                        service['type'], service['id']
+                    ))
+                    continue
+
+                # Value is returned in a potentially-multi-service format,
+                # get just the service we're asking about
+                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
+                schema = schema[svc_full_name]
+
+                # Populate latest values
+                for counter_path, counter_schema in schema.items():
+                    # self.log.debug("{0}: {1}".format(
+                    #     counter_path, json.dumps(counter_schema)
+                    # ))
+                    if counter_schema['priority'] < prio_limit:
+                        continue
+
+                    counter_info = dict(counter_schema)
+
+                    # Also populate count for the long running avgs
+                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
+                        v, c = get_latest_avg(
+                            service['type'],
+                            service['id'],
+                            counter_path
+                        )
+                        counter_info['value'], counter_info['count'] = v, c
+                        result[svc_full_name][counter_path] = counter_info
+                    else:
+                        counter_info['value'] = get_latest(
+                            service['type'],
+                            service['id'],
+                            counter_path
+                        )
+
+                    result[svc_full_name][counter_path] = counter_info
+
+        self.log.debug("returning {0} counter".format(len(result)))
+
+        return result
+
+    def set_uri(self, uri):
+        """
+        If the module exposes a service, then call this to publish the
+        address once it is available.
+
+        :return: a string
+        """
+        return self._ceph_set_uri(uri)
+
+    def have_mon_connection(self):
+        """
+        Check whether this ceph-mgr daemon has an open connection
+        to a monitor.  If it doesn't, then it's likely that the
+        information we have about the cluster is out of date,
+        and/or the monitor cluster is down.
+        """
+
+        return self._ceph_have_mon_connection()