]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/telegraf/module.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / telegraf / module.py
index ca25fce7c73815677723f3f4422701971580df1d..f640f1d3a0fd82f5ec1768472bb36e0155bde7ce 100644 (file)
@@ -7,66 +7,40 @@ from threading import Event
 
 from telegraf.basesocket import BaseSocket
 from telegraf.protocol import Line
-from mgr_module import MgrModule, PG_STATES
+from mgr_module import CLICommand, CLIReadCommand, MgrModule, Option, OptionValue, PG_STATES
 
-try:
-    from urllib.parse import urlparse
-except ImportError:
-    from urlparse import urlparse
+from typing import cast, Any, Dict, Iterable, Optional, Tuple
+from urllib.parse import urlparse
 
 
 class Module(MgrModule):
-    COMMANDS = [
-        {
-            "cmd": "telegraf config-set name=key,type=CephString "
-                   "name=value,type=CephString",
-            "desc": "Set a configuration value",
-            "perm": "rw"
-        },
-        {
-            "cmd": "telegraf config-show",
-            "desc": "Show current configuration",
-            "perm": "r"
-        },
-        {
-            "cmd": "telegraf send",
-            "desc": "Force sending data to Telegraf",
-            "perm": "rw"
-        },
-    ]
-
     MODULE_OPTIONS = [
-        {
-            'name': 'address',
-            'default': 'unixgram:///tmp/telegraf.sock',
-        },
-        {
-            'name': 'interval',
-            'type': 'secs',
-            'default': 15
-        }
-    ]
+        Option(name='address',
+               default='unixgram:///tmp/telegraf.sock'),
+        Option(name='interval',
+               type='secs',
+               default=15)]
 
     ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
 
     @property
-    def config_keys(self):
+    def config_keys(self) -> Dict[str, OptionValue]:
         return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
         self.run = True
-        self.fsid = None
-        self.config = dict()
+        self.fsid: Optional[str] = None
+        self.config: Dict[str, OptionValue] = dict()
 
-    def get_fsid(self):
+    def get_fsid(self) -> str:
         if not self.fsid:
             self.fsid = self.get('mon_map')['fsid']
-
+        assert self.fsid is not None
         return self.fsid
 
-    def get_pool_stats(self):
+    def get_pool_stats(self) -> Iterable[Dict[str, Any]]:
         df = self.get('df')
 
         df_types = [
@@ -97,7 +71,7 @@ class Module(MgrModule):
                     'value': pool['stats'][df_type],
                 }
 
-    def get_daemon_stats(self):
+    def get_daemon_stats(self) -> Iterable[Dict[str, Any]]:
         for daemon, counters in self.get_all_perf_counters().items():
             svc_type, svc_id = daemon.split('.', 1)
             metadata = self.get_metadata(svc_type, svc_id)
@@ -119,7 +93,7 @@ class Module(MgrModule):
                     'value': counter_info['value']
                 }
 
-    def get_pg_stats(self):
+    def get_pg_stats(self) -> Dict[str, int]:
         stats = dict()
 
         pg_status = self.get('pg_status')
@@ -140,7 +114,7 @@ class Module(MgrModule):
 
         return stats
 
-    def get_cluster_stats(self):
+    def get_cluster_stats(self) -> Iterable[Dict[str, Any]]:
         stats = dict()
 
         health = json.loads(self.get('health')['json'])
@@ -185,11 +159,12 @@ class Module(MgrModule):
             num_mds_up += len(fs['mdsmap']['up'])
 
         stats['num_mds_up'] = num_mds_up
-        stats['num_mds'] = num_mds_up + stats['num_mds_standby']
+        stats['num_mds'] = num_mds_up + cast(int, stats['num_mds_standby'])
 
         stats.update(self.get_pg_stats())
 
         for key, value in stats.items():
+            assert value is not None
             yield {
                 'measurement': 'ceph_cluster_stats',
                 'tags': {
@@ -199,42 +174,43 @@ class Module(MgrModule):
                 'value': int(value)
             }
 
-    def set_config_option(self, option, value):
+    def set_config_option(self, option: str, value: str) -> None:
         if option not in self.config_keys.keys():
             raise RuntimeError('{0} is a unknown configuration '
                                'option'.format(option))
 
-        if option in ['interval']:
+        if option == 'interval':
             try:
-                value = int(value)
+                interval = int(value)
             except (ValueError, TypeError):
                 raise RuntimeError('invalid {0} configured. Please specify '
                                    'a valid integer'.format(option))
+            if interval < 5:
+                raise RuntimeError('interval should be set to at least 5 seconds')
+            self.config[option] = interval
+        else:
+            self.config[option] = value
 
-        if option == 'interval' and value < 5:
-            raise RuntimeError('interval should be set to at least 5 seconds')
-
-        self.config[option] = value
-
-    def init_module_config(self):
+    def init_module_config(self) -> None:
         self.config['address'] = \
             self.get_module_option("address", default=self.config_keys['address'])
-        self.config['interval'] = \
-            int(self.get_module_option("interval",
-                                default=self.config_keys['interval']))
+        interval = self.get_module_option("interval",
+                                          default=self.config_keys['interval'])
+        assert interval
+        self.config['interval'] = int(interval)
 
-    def now(self):
+    def now(self) -> int:
         return int(round(time.time() * 1000000000))
 
-    def gather_measurements(self):
+    def gather_measurements(self) -> Iterable[Dict[str, Any]]:
         return itertools.chain(
             self.get_pool_stats(),
             self.get_daemon_stats(),
             self.get_cluster_stats()
         )
 
-    def send_to_telegraf(self):
-        url = urlparse(self.config['address'])
+    def send_to_telegraf(self) -> None:
+        url = urlparse(cast(str, self.config['address']))
 
         sock = BaseSocket(url)
         self.log.debug('Sending data to Telegraf at %s', sock.address)
@@ -253,37 +229,44 @@ class Module(MgrModule):
         except FileNotFoundError:
             self.log.exception('Failed to open Telegraf at: %s', url.geturl())
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.log.info('Stopping Telegraf module')
         self.run = False
         self.event.set()
 
-    def handle_command(self, inbuf, cmd):
-        if cmd['prefix'] == 'telegraf config-show':
-            return 0, json.dumps(self.config), ''
-        elif cmd['prefix'] == 'telegraf config-set':
-            key = cmd['key']
-            value = cmd['value']
-            if not value:
-                return -errno.EINVAL, '', 'Value should not be empty or None'
-
-            self.log.debug('Setting configuration option %s to %s', key, value)
-            self.set_config_option(key, value)
-            self.set_module_option(key, value)
-            return 0, 'Configuration option {0} updated'.format(key), ''
-        elif cmd['prefix'] == 'telegraf send':
-            self.send_to_telegraf()
-            return 0, 'Sending data to Telegraf', ''
-
-        return (-errno.EINVAL, '',
-                "Command not found '{0}'".format(cmd['prefix']))
-
-    def self_test(self):
+    @CLIReadCommand('telegraf config-show')
+    def config_show(self) -> Tuple[int, str, str]:
+        """
+        Show current configuration
+        """
+        return 0, json.dumps(self.config), ''
+
+    @CLICommand('telegraf config-set')
+    def config_set(self, key: str, value: str) -> Tuple[int, str, str]:
+        """
+        Set a configuration value
+        """
+        if not value:
+            return -errno.EINVAL, '', 'Value should not be empty or None'
+        self.log.debug('Setting configuration option %s to %s', key, value)
+        self.set_config_option(key, value)
+        self.set_module_option(key, value)
+        return 0, 'Configuration option {0} updated'.format(key), ''
+
+    @CLICommand('telegraf send')
+    def send(self) -> Tuple[int, str, str]:
+        """
+        Force sending data to Telegraf
+        """
+        self.send_to_telegraf()
+        return 0, 'Sending data to Telegraf', ''
+
+    def self_test(self) -> None:
         measurements = list(self.gather_measurements())
         if len(measurements) == 0:
             raise RuntimeError('No measurements found')
 
-    def serve(self):
+    def serve(self) -> None:
         self.log.info('Starting Telegraf module')
         self.init_module_config()
         self.run = True
@@ -297,4 +280,4 @@ class Module(MgrModule):
             runtime = (self.now() - start) / 1000000
             self.log.debug('Sending data to Telegraf took %d ms', runtime)
             self.log.debug("Sleeping for %d seconds", self.config['interval'])
-            self.event.wait(self.config['interval'])
+            self.event.wait(cast(int, self.config['interval']))