]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/mgr_module.py
update sources to 12.2.2
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
index 2463bafe75d8a11e73e6d9f012cfbfc277b2cc01..1abbcc5cd4fbe02c4fc635c3586ff6ed393b5972 100644 (file)
@@ -1,10 +1,54 @@
 
-import ceph_state  #noqa
+import ceph_module  # noqa
+#import ceph_osdmap  #noqa
+#import ceph_osdmap_incremental  #noqa
+#import ceph_crushmap  #noqa
+
 import json
 import logging
 import threading
+from collections import defaultdict
+
+
+class CPlusPlusHandler(logging.Handler):
+    def __init__(self, module_inst):
+        super(CPlusPlusHandler, self).__init__()
+        self._module = module_inst
+
+    def emit(self, record):
+        if record.levelno <= logging.DEBUG:
+            ceph_level = 20
+        elif record.levelno <= logging.INFO:
+            ceph_level = 4
+        elif record.levelno <= logging.WARNING:
+            ceph_level = 1
+        else:
+            ceph_level = 0
+
+        self._module._ceph_log(ceph_level, self.format(record))
+
+
+def configure_logger(module_inst, name):
+    logger = logging.getLogger(name)
+
+
+    # Don't filter any logs at the python level, leave it to C++
+    logger.setLevel(logging.DEBUG)
+
+    # FIXME: we should learn the log level from C++ land, and then
+    # avoid calling the C++ level log when we know a message is of
+    # an insufficient level to be ultimately output
+    logger.addHandler(CPlusPlusHandler(module_inst))
+
+    return logger
 
 
+def unconfigure_logger(module_inst, name):
+    logger = logging.getLogger(name)
+    rm_handlers = [h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
+    for h in rm_handlers:
+        logger.removeHandler(h)
+
 class CommandResult(object):
     """
     Use with MgrModule.send_command
@@ -30,36 +74,168 @@ class CommandResult(object):
         return self.r, self.outb, self.outs
 
 
-class MgrModule(object):
+class OSDMap(ceph_module.BasePyOSDMap):
+    def get_epoch(self):
+        return self._get_epoch()
+
+    def get_crush_version(self):
+        return self._get_crush_version()
+
+    def dump(self):
+        return self._dump()
+
+    def new_incremental(self):
+        return self._new_incremental()
+
+    def apply_incremental(self, inc):
+        return self._apply_incremental(inc)
+
+    def get_crush(self):
+        return self._get_crush()
+
+    def get_pools_by_take(self, take):
+        return self._get_pools_by_take(take).get('pools', [])
+
+    def calc_pg_upmaps(self, inc,
+                       max_deviation=.01, max_iterations=10, pools=[]):
+        return self._calc_pg_upmaps(
+            inc,
+            max_deviation, max_iterations, pools)
+
+    def map_pool_pgs_up(self, poolid):
+        return self._map_pool_pgs_up(poolid)
+
+class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
+    def get_epoch(self):
+        return self._get_epoch()
+
+    def dump(self):
+        return self._dump()
+
+    def set_osd_reweights(self, weightmap):
+        """
+        weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
+        """
+        return self._set_osd_reweights(weightmap)
+
+    def set_crush_compat_weight_set_weights(self, weightmap):
+        """
+        weightmap is a dict, int to float.  devices only.  e.g.,
+        { 0: 3.4, 1: 3.3, 2: 3.334 }
+        """
+        return self._set_crush_compat_weight_set_weights(weightmap)
+
+class CRUSHMap(ceph_module.BasePyCRUSH):
+    def dump(self):
+        return self._dump()
+
+    def get_item_weight(self, item):
+        return self._get_item_weight(item)
+
+    def get_item_name(self, item):
+        return self._get_item_name(item)
+
+    def find_takes(self):
+        return self._find_takes().get('takes', [])
+
+    def get_take_weight_osd_map(self, root):
+        uglymap = self._get_take_weight_osd_map(root)
+        return { int(k): v for k, v in uglymap.get('weights', {}).iteritems() }
+
+class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
+    """
+    Standby modules only implement a serve and shutdown method, they
+    are not permitted to implement commands and they do not receive
+    any notifications.
+
+    They only have access to the mgrmap (for acecssing service URI info
+    from their active peer), and to configuration settings (read only).
+    """
+
+    def __init__(self, module_name, capsule):
+        super(MgrStandbyModule, self).__init__(capsule)
+        self.module_name = module_name
+        self._logger = configure_logger(self, module_name)
+
+    def __del__(self):
+        unconfigure_logger(self, self.module_name)
+
+    @property
+    def log(self):
+        return self._logger
+
+    def serve(self):
+        """
+        The serve method is mandatory for standby modules.
+        :return:
+        """
+        raise NotImplementedError()
+
+    def get_mgr_id(self):
+        return self._ceph_get_mgr_id()
+
+    def get_config(self, key):
+        return self._ceph_get_config(key)
+
+    def get_active_uri(self):
+        return self._ceph_get_active_uri()
+
+    def get_localized_config(self, key, default=None):
+        r = self.get_config(self.get_mgr_id() + '/' + key)
+        if r is None:
+            r = self.get_config(key)
+
+        if r is None:
+            r = default
+        return r
+
+class MgrModule(ceph_module.BaseMgrModule):
     COMMANDS = []
 
-    def __init__(self, handle):
-        self._handle = handle
-        self._logger = logging.getLogger(handle)
+    # Priority definitions for perf counters
+    PRIO_CRITICAL = 10
+    PRIO_INTERESTING = 8
+    PRIO_USEFUL = 5
+    PRIO_UNINTERESTING = 2
+    PRIO_DEBUGONLY = 0
+
+    # counter value types
+    PERFCOUNTER_TIME = 1
+    PERFCOUNTER_U64 = 2
+
+    # counter types
+    PERFCOUNTER_LONGRUNAVG = 4
+    PERFCOUNTER_COUNTER = 8
+    PERFCOUNTER_HISTOGRAM = 0x10
+    PERFCOUNTER_TYPE_MASK = ~2
 
-        # Don't filter any logs at the python level, leave it to C++
-        self._logger.setLevel(logging.DEBUG)
+    def __init__(self, module_name, py_modules_ptr, this_ptr):
+        self.module_name = module_name
 
-        # FIXME: we should learn the log level from C++ land, and then
-        # avoid calling ceph_state.log when we know a message is of
-        # an insufficient level to be ultimately output
+        # If we're taking over from a standby module, let's make sure
+        # its logger was unconfigured before we hook ours up
+        unconfigure_logger(self, self.module_name)
+        self._logger = configure_logger(self, module_name)
 
-        class CPlusPlusHandler(logging.Handler):
-            def emit(self, record):
-                if record.levelno <= logging.DEBUG:
-                    ceph_level = 20
-                elif record.levelno <= logging.INFO:
-                    ceph_level = 4
-                elif record.levelno <= logging.WARNING:
-                    ceph_level = 1
-                else:
-                    ceph_level = 0
+        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
 
-                ceph_state.log(handle, ceph_level, self.format(record))
+        self._version = self._ceph_get_version()
 
-        self._logger.addHandler(CPlusPlusHandler())
+        self._perf_schema_cache = None
 
-        self._version = ceph_state.get_version()
+    def __del__(self):
+        unconfigure_logger(self, self.module_name)
+
+    def update_perf_schema(self, daemon_type, daemon_name):
+        """
+        For plugins that use get_all_perf_counters, call this when
+        receiving a notification of type 'perf_schema_update', to
+        prompt MgrModule to update its cache of counter schemas.
+
+        :param daemon_type:
+        :param daemon_name:
+        :return:
+        """
 
     @property
     def log(self):
@@ -69,6 +245,12 @@ class MgrModule(object):
     def version(self):
         return self._version
 
+    def get_context(self):
+        """
+        :return: a Python capsule containing a C++ CephContext pointer
+        """
+        return self._ceph_get_context()
+
     def notify(self, notify_type, notify_id):
         """
         Called by the ceph-mgr service to notify the Python plugin
@@ -100,7 +282,7 @@ class MgrModule(object):
         """
         Called by the plugin to load some cluster state from ceph-mgr
         """
-        return ceph_state.get(self._handle, data_name)
+        return self._ceph_get(data_name)
 
     def get_server(self, hostname):
         """
@@ -109,7 +291,7 @@ class MgrModule(object):
 
         :param hostname: a hostame
         """
-        return ceph_state.get_server(self._handle, hostname)
+        return self._ceph_get_server(hostname)
 
     def get_perf_schema(self, svc_type, svc_name):
         """
@@ -121,7 +303,7 @@ class MgrModule(object):
         :param svc_name:
         :return: list of dicts describing the counters requested
         """
-        return ceph_state.get_perf_schema(self._handle, svc_type, svc_name)
+        return self._ceph_get_perf_schema(svc_type, svc_name)
 
     def get_counter(self, svc_type, svc_name, path):
         """
@@ -133,14 +315,14 @@ class MgrModule(object):
         :param path:
         :return: A list of two-element lists containing time and value
         """
-        return ceph_state.get_counter(self._handle, svc_type, svc_name, path)
+        return self._ceph_get_counter(svc_type, svc_name, path)
 
     def list_servers(self):
         """
         Like ``get_server``, but instead of returning information
         about just one node, return all the nodes in an array.
         """
-        return ceph_state.get_server(self._handle, None)
+        return self._ceph_get_server(None)
 
     def get_metadata(self, svc_type, svc_id):
         """
@@ -150,7 +332,7 @@ class MgrModule(object):
         :param svc_id: string
         :return: dict
         """
-        return ceph_state.get_metadata(self._handle, svc_type, svc_id)
+        return self._ceph_get_metadata(svc_type, svc_id)
 
     def get_daemon_status(self, svc_type, svc_id):
         """
@@ -160,14 +342,14 @@ class MgrModule(object):
         :param svc_id: string
         :return: dict
         """
-        return ceph_state.get_daemon_status(self._handle, svc_type, svc_id)
+        return self._ceph_get_daemon_status(svc_type, svc_id)
 
     def send_command(self, *args, **kwargs):
         """
         Called by the plugin to send a command to the mon
         cluster.
         """
-        ceph_state.send_command(self._handle, *args, **kwargs)
+        self._ceph_send_command(*args, **kwargs)
 
     def set_health_checks(self, checks):
         """
@@ -191,7 +373,7 @@ class MgrModule(object):
 
         :param list: dict of health check dicts
         """
-        ceph_state.set_health_checks(self._handle, checks)
+        self._ceph_set_health_checks(checks)
 
     def handle_command(self, cmd):
         """
@@ -217,16 +399,20 @@ class MgrModule(object):
 
         :return: str
         """
-        return ceph_state.get_mgr_id()
+        return self._ceph_get_mgr_id()
 
-    def get_config(self, key):
+    def get_config(self, key, default=None):
         """
         Retrieve the value of a persistent configuration setting
 
         :param key: str
         :return: str
         """
-        return ceph_state.get_config(self._handle, key)
+        r = self._ceph_get_config(key)
+        if r is None:
+            return default
+        else:
+            return r
 
     def get_config_prefix(self, key_prefix):
         """
@@ -235,7 +421,7 @@ class MgrModule(object):
         :param key_prefix: str
         :return: str
         """
-        return ceph_state.get_config_prefix(self._handle, key_prefix)
+        return self._ceph_get_config_prefix(key_prefix)
 
     def get_localized_config(self, key, default=None):
         """
@@ -259,7 +445,7 @@ class MgrModule(object):
         :param key: str
         :param val: str
         """
-        ceph_state.set_config(self._handle, key, val)
+        self._ceph_set_config(key, val)
 
     def set_localized_config(self, key, val):
         """
@@ -268,7 +454,7 @@ class MgrModule(object):
         :param default: str
         :return: str
         """
-        return self.set_config(self.get_mgr_id() + '/' + key, val)
+        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)
 
     def set_config_json(self, key, val):
         """
@@ -277,7 +463,7 @@ class MgrModule(object):
         :param key: str
         :param val: json-serializable object
         """
-        self.set_config(key, json.dumps(val))
+        self._ceph_set_config(key, json.dumps(val))
 
     def get_config_json(self, key):
         """
@@ -299,3 +485,77 @@ class MgrModule(object):
         :return: bool
         """
         pass
+
+    def get_osdmap(self):
+        """
+        Get a handle to an OSDMap.  If epoch==0, get a handle for the latest
+        OSDMap.
+        :return: OSDMap
+        """
+        return self._ceph_get_osdmap()
+
+    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
+        """
+        Return the perf counters currently known to this ceph-mgr
+        instance, filtered by priority equal to or greater than `prio_limit`.
+
+        The result us a map of string to dict, associating services
+        (like "osd.123") with their counters.  The counter
+        dict for each service maps counter paths to a counter
+        info structure, which is the information from
+        the schema, plus an additional "value" member with the latest
+        value.
+        """
+
+        result = defaultdict(dict)
+
+        # TODO: improve C++->Python interface to return just
+        # the latest if that's all we want.
+        def get_latest(daemon_type, daemon_name, counter):
+            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
+            if data:
+                return data[-1][1]
+            else:
+                return 0
+
+        for server in self.list_servers():
+            for service in server['services']:
+                if service['type'] not in ("mds", "osd", "mon"):
+                    continue
+
+                schema = self.get_perf_schema(service['type'], service['id'])
+                if not schema:
+                    self.log.warn("No perf counter schema for {0}.{1}".format(
+                        service['type'], service['id']
+                    ))
+                    continue
+
+                # Value is returned in a potentially-multi-service format,
+                # get just the service we're asking about
+                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
+                schema = schema[svc_full_name]
+
+                # Populate latest values
+                for counter_path, counter_schema in schema.items():
+                    # self.log.debug("{0}: {1}".format(
+                    #     counter_path, json.dumps(counter_schema)
+                    # ))
+                    if counter_schema['priority'] < prio_limit:
+                        continue
+
+                    counter_info = counter_schema
+                    counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
+                    result[svc_full_name][counter_path] = counter_info
+
+        self.log.debug("returning {0} counter".format(len(result)))
+
+        return result
+
+    def set_uri(self, uri):
+        """
+        If the module exposes a service, then call this to publish the
+        address once it is available.
+
+        :return: a string
+        """
+        return self._ceph_set_uri(uri)