]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/influx/module.py
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / mgr / influx / module.py
index adeb452701d81394df5fc2a5e540de4a98f7d0d5..b4039ac70806576dc1ea2b4997272a9ffa95eade 100644 (file)
@@ -1,53 +1,90 @@
-
 from datetime import datetime
 from threading import Event
 import json
 import errno
+import time
 
 from mgr_module import MgrModule
 
 try:
     from influxdb import InfluxDBClient
     from influxdb.exceptions import InfluxDBClientError
+    from requests.exceptions import ConnectionError
 except ImportError:
     InfluxDBClient = None
 
+
 class Module(MgrModule):
     COMMANDS = [
+        {
+            "cmd": "influx config-set name=key,type=CephString "
+                   "name=value,type=CephString",
+            "desc": "Set a configuration value",
+            "perm": "rw"
+        },
+        {
+            "cmd": "influx config-show",
+            "desc": "Show current configuration",
+            "perm": "r"
+        },
+        {
+            "cmd": "influx send",
+            "desc": "Force sending data to Influx",
+            "perm": "rw"
+        },
         {
             "cmd": "influx self-test",
             "desc": "debug the module",
-            "perm": "rw"  
+            "perm": "rw"
         },
     ]
 
+    config_keys = {
+        'hostname': None,
+        'port': 8086,
+        'database': 'ceph',
+        'username': None,
+        'password': None,
+        'interval': 5,
+        'ssl': 'false',
+        'verify_ssl': 'true'
+    }
 
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
         self.event = Event()
-        self.run = True 
+        self.run = True
+        self.config = dict()
 
+    def get_fsid(self):
+        return self.get('mon_map')['fsid']
 
     def get_latest(self, daemon_type, daemon_name, stat):
         data = self.get_counter(daemon_type, daemon_name, stat)[stat]
         if data:
             return data[-1][1]
-        else:
-            return 0
 
+        return 0
 
     def get_df_stats(self):
         df = self.get("df")
         data = []
 
+        now = datetime.utcnow().isoformat() + 'Z'
+
         df_types = [
             'bytes_used',
+            'kb_used',
             'dirty',
+            'rd',
             'rd_bytes',
             'raw_bytes_used',
+            'wr',
             'wr_bytes',
             'objects',
-            'max_avail'
+            'max_avail',
+            'quota_objects',
+            'quota_bytes'
         ]
 
         for df_type in df_types:
@@ -55,15 +92,15 @@ class Module(MgrModule):
                 point = {
                     "measurement": "ceph_pool_stats",
                     "tags": {
-                        "pool_name" : pool['name'],
-                        "pool_id" : pool['id'],
-                        "type_instance" : df_type,
-                        "mgr_id" : self.get_mgr_id(),
+                        "pool_name": pool['name'],
+                        "pool_id": pool['id'],
+                        "type_instance": df_type,
+                        "fsid": self.get_fsid()
                     },
-                        "time" : datetime.utcnow().isoformat() + 'Z',
-                        "fields": {
-                            "value" : pool['stats'][df_type],
-                        }
+                    "time": now,
+                    "fields": {
+                        "value": pool['stats'][df_type],
+                    }
                 }
                 data.append(point)
         return data
@@ -71,8 +108,10 @@ class Module(MgrModule):
     def get_daemon_stats(self):
         data = []
 
+        now = datetime.utcnow().isoformat() + 'Z'
+
         for daemon, counters in self.get_all_perf_counters().iteritems():
-            svc_type, svc_id = daemon.split(".")
+            svc_type, svc_id = daemon.split(".", 1)
             metadata = self.get_metadata(svc_type, svc_id)
 
             for path, counter_info in counters.items():
@@ -86,9 +125,10 @@ class Module(MgrModule):
                     "tags": {
                         "ceph_daemon": daemon,
                         "type_instance": path,
-                        "host": metadata['hostname']
+                        "host": metadata['hostname'],
+                        "fsid": self.get_fsid()
                     },
-                    "time": datetime.utcnow().isoformat() + 'Z',
+                    "time": now,
                     "fields": {
                         "value": value
                     }
@@ -96,32 +136,105 @@ class Module(MgrModule):
 
         return data
 
+    def set_config_option(self, option, value):
+        if option not in self.config_keys.keys():
+            raise RuntimeError('{0} is a unknown configuration '
+                               'option'.format(option))
+
+        if option in ['port', 'interval']:
+            try:
+                value = int(value)
+            except (ValueError, TypeError):
+                raise RuntimeError('invalid {0} configured. Please specify '
+                                   'a valid integer'.format(option))
+
+        if option == 'interval' and value < 5:
+            raise RuntimeError('interval should be set to at least 5 seconds')
+
+        if option in ['ssl', 'verify_ssl']:
+            value = value.lower() == 'true'
+
+        self.config[option] = value
+
+    def init_module_config(self):
+        self.config['hostname'] = \
+            self.get_config("hostname", default=self.config_keys['hostname'])
+        self.config['port'] = \
+            int(self.get_config("port", default=self.config_keys['port']))
+        self.config['database'] = \
+            self.get_config("database", default=self.config_keys['database'])
+        self.config['username'] = \
+            self.get_config("username", default=self.config_keys['username'])
+        self.config['password'] = \
+            self.get_config("password", default=self.config_keys['password'])
+        self.config['interval'] = \
+            int(self.get_config("interval",
+                                default=self.config_keys['interval']))
+        ssl = self.get_config("ssl", default=self.config_keys['ssl'])
+        self.config['ssl'] = ssl.lower() == 'true'
+        verify_ssl = \
+            self.get_config("verify_ssl", default=self.config_keys['verify_ssl'])
+        self.config['verify_ssl'] = verify_ssl.lower() == 'true'
+
     def send_to_influx(self):
-        host = self.get_config("hostname")
-        if not host:
-            self.log.error("No InfluxDB server configured, please set"
-                           "`hostname` configuration key.")
+        if not self.config['hostname']:
+            self.log.error("No Influx server configured, please set one using: "
+                           "ceph influx config-set hostname <hostname>")
+            self.set_health_checks({
+                'MGR_INFLUX_NO_SERVER': {
+                    'severity': 'warning',
+                    'summary': 'No InfluxDB server configured',
+                    'detail': ['Configuration option hostname not set']
+                }
+            })
             return
 
-        port = int(self.get_config("port", default="8086"))
-        database = self.get_config("database", default="ceph")
-
         # If influx server has authentication turned off then
         # missing username/password is valid.
-        username = self.get_config("username", default="")
-        password = self.get_config("password", default="")
-
-        client = InfluxDBClient(host, port, username, password, database)
+        self.log.debug("Sending data to Influx host: %s",
+                       self.config['hostname'])
+        client = InfluxDBClient(self.config['hostname'], self.config['port'],
+                                self.config['username'],
+                                self.config['password'],
+                                self.config['database'],
+                                self.config['ssl'],
+                                self.config['verify_ssl'])
 
-        # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
+        # using influx client get_list_database requires admin privs,
+        # instead we'll catch the not found exception and inform the user if
+        # db can not be created
         try:
             client.write_points(self.get_df_stats(), 'ms')
             client.write_points(self.get_daemon_stats(), 'ms')
+            self.set_health_checks(dict())
+        except ConnectionError as e:
+            self.log.exception("Failed to connect to Influx host %s:%d",
+                               self.config['hostname'], self.config['port'])
+            self.set_health_checks({
+                'MGR_INFLUX_SEND_FAILED': {
+                    'severity': 'warning',
+                    'summary': 'Failed to send data to InfluxDB server at %s:%d'
+                               ' due to an connection error'
+                               % (self.config['hostname'], self.config['port']),
+                    'detail': [str(e)]
+                }
+            })
         except InfluxDBClientError as e:
             if e.code == 404:
-                self.log.info("Database '{0}' not found, trying to create (requires admin privs).  You can also create manually and grant write privs to user '{1}'".format(database,username))
-                client.create_database(database)
+                self.log.info("Database '%s' not found, trying to create "
+                              "(requires admin privs).  You can also create "
+                              "manually and grant write privs to user "
+                              "'%s'", self.config['database'],
+                              self.config['username'])
+                client.create_database(self.config['database'])
             else:
+                self.set_health_checks({
+                    'MGR_INFLUX_SEND_FAILED': {
+                        'severity': 'warning',
+                        'summary': 'Failed to send data to InfluxDB',
+                        'detail': [str(e)]
+                    }
+                })
                 raise
 
     def shutdown(self):
@@ -130,18 +243,35 @@ class Module(MgrModule):
         self.event.set()
 
     def handle_command(self, cmd):
+        if cmd['prefix'] == 'influx config-show':
+            return 0, json.dumps(self.config), ''
+        elif cmd['prefix'] == 'influx config-set':
+            key = cmd['key']
+            value = cmd['value']
+            if not value:
+                return -errno.EINVAL, '', 'Value should not be empty or None'
+
+            self.log.debug('Setting configuration option %s to %s', key, value)
+            self.set_config_option(key, value)
+            self.set_config(key, value)
+            return 0, 'Configuration option {0} updated'.format(key), ''
+        elif cmd['prefix'] == 'influx send':
+            self.send_to_influx()
+            return 0, 'Sending data to Influx', ''
         if cmd['prefix'] == 'influx self-test':
             daemon_stats = self.get_daemon_stats()
             assert len(daemon_stats)
             df_stats = self.get_df_stats()
+
             result = {
                 'daemon_stats': daemon_stats,
                 'df_stats': df_stats
             }
+
             return 0, json.dumps(result, indent=2), 'Self-test OK'
-        else:
-            return (-errno.EINVAL, '',
-                    "Command not found '{0}'".format(cmd['prefix']))
+
+        return (-errno.EINVAL, '',
+                "Command not found '{0}'".format(cmd['prefix']))
 
     def serve(self):
         if InfluxDBClient is None:
@@ -150,13 +280,14 @@ class Module(MgrModule):
             return
 
         self.log.info('Starting influx module')
+        self.init_module_config()
         self.run = True
+
         while self.run:
+            start = time.time()
             self.send_to_influx()
-            self.log.debug("Running interval loop")
-            interval = self.get_config("interval")
-            if interval is None:
-                interval = 5
-            self.log.debug("sleeping for %d seconds",interval)
-            self.event.wait(interval)
-            
+            runtime = time.time() - start
+            self.log.debug('Finished sending data in Influx in %.3f seconds',
+                           runtime)
+            self.log.debug("Sleeping for %d seconds", self.config['interval'])
+            self.event.wait(self.config['interval'])