]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/devicehealth/module.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / devicehealth / module.py
index c4e46854b9c09b95ca275e11498b4514bc9dfe86..54b1f5b0dfc16d16d75ad4847b2aa31bf6de8df5 100644 (file)
@@ -4,12 +4,12 @@ Device health monitoring
 
 import errno
 import json
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, CLICommand, Option
 import operator
 import rados
 from threading import Event
-from datetime import datetime, timedelta, date, time
-from six import iteritems
+from datetime import datetime, timedelta
+from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
 
 TIME_FORMAT = '%Y%m%d-%H%M%S'
 
@@ -22,120 +22,93 @@ HEALTH_MESSAGES = {
     DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
 }
 
-MAX_SAMPLES=500
+MAX_SAMPLES = 500
+
+
+def get_ata_wear_level(data: Dict[Any,Any]) -> Optional[float]:
+    """
+    Extract wear level (as float) from smartctl -x --json output for SATA SSD
+    """
+    for page in data.get("ata_device_statistics", {}).get("pages", []):
+        if page.get("number") != 7:
+            continue
+        for item in page.get("table", []):
+            if item["offset"] == 8:
+                return item["value"] / 100.0
+    return None
+
+
+def get_nvme_wear_level(data: Dict[Any,Any]) -> Optional[float]:
+    """
+    Extract wear level (as float) from smartctl -x --json output for NVME SSD
+    """
+    pct_used = data.get("nvme_smart_health_information_log", {}).get("percentage_used")
+    if pct_used is None:
+        return None
+    return pct_used / 100.0
 
 
 class Module(MgrModule):
     MODULE_OPTIONS = [
-        {
-            'name': 'enable_monitoring',
-            'default': True,
-            'type': 'bool',
-            'desc': 'monitor device health metrics',
-            'runtime': True,
-        },
-        {
-            'name': 'scrape_frequency',
-            'default': 86400,
-            'type': 'secs',
-            'desc': 'how frequently to scrape device health metrics',
-            'runtime': True,
-        },
-        {
-            'name': 'pool_name',
-            'default': 'device_health_metrics',
-            'type': 'str',
-            'desc': 'name of pool in which to store device health metrics',
-            'runtime': True,
-        },
-        {
-            'name': 'retention_period',
-            'default': (86400 * 180),
-            'type': 'secs',
-            'desc': 'how long to retain device health metrics',
-            'runtime': True,
-        },
-        {
-            'name': 'mark_out_threshold',
-            'default': (86400 * 14 * 2),
-            'type': 'secs',
-            'desc': 'automatically mark OSD if it may fail before this long',
-            'runtime': True,
-        },
-        {
-            'name': 'warn_threshold',
-            'default': (86400 * 14 * 6),
-            'type': 'secs',
-            'desc': 'raise health warning if OSD may fail before this long',
-            'runtime': True,
-        },
-        {
-            'name': 'self_heal',
-            'default': True,
-            'type': 'bool',
-            'desc': 'preemptively heal cluster around devices that may fail',
-            'runtime': True,
-        },
-        {
-            'name': 'sleep_interval',
-            'default': 600,
-            'type': 'secs',
-            'desc': 'how frequently to wake up and check device health',
-            'runtime': True,
-        },
+        Option(
+            name='enable_monitoring',
+            default=True,
+            type='bool',
+            desc='monitor device health metrics',
+            runtime=True,
+        ),
+        Option(
+            name='scrape_frequency',
+            default=86400,
+            type='secs',
+            desc='how frequently to scrape device health metrics',
+            runtime=True,
+        ),
+        Option(
+            name='pool_name',
+            default='device_health_metrics',
+            type='str',
+            desc='name of pool in which to store device health metrics',
+            runtime=True,
+        ),
+        Option(
+            name='retention_period',
+            default=(86400 * 180),
+            type='secs',
+            desc='how long to retain device health metrics',
+            runtime=True,
+        ),
+        Option(
+            name='mark_out_threshold',
+            default=(86400 * 14 * 2),
+            type='secs',
+            desc='automatically mark OSD if it may fail before this long',
+            runtime=True,
+        ),
+        Option(
+            name='warn_threshold',
+            default=(86400 * 14 * 6),
+            type='secs',
+            desc='raise health warning if OSD may fail before this long',
+            runtime=True,
+        ),
+        Option(
+            name='self_heal',
+            default=True,
+            type='bool',
+            desc='preemptively heal cluster around devices that may fail',
+            runtime=True,
+        ),
+        Option(
+            name='sleep_interval',
+            default=600,
+            type='secs',
+            desc='how frequently to wake up and check device health',
+            runtime=True,
+        ),
     ]
 
-    COMMANDS = [
-        {
-            "cmd": "device query-daemon-health-metrics "
-                   "name=who,type=CephString",
-            "desc": "Get device health metrics for a given daemon",
-            "perm": "r"
-        },
-        {
-            "cmd": "device scrape-daemon-health-metrics "
-                   "name=who,type=CephString",
-            "desc": "Scrape and store device health metrics "
-                    "for a given daemon",
-            "perm": "r"
-        },
-        {
-            "cmd": "device scrape-health-metrics "
-                   "name=devid,type=CephString,req=False",
-            "desc": "Scrape and store health metrics",
-            "perm": "r"
-        },
-        {
-            "cmd": "device get-health-metrics "
-                   "name=devid,type=CephString "
-                   "name=sample,type=CephString,req=False",
-            "desc": "Show stored device metrics for the device",
-            "perm": "r"
-        },
-        {
-            "cmd": "device check-health",
-            "desc": "Check life expectancy of devices",
-            "perm": "rw",
-        },
-        {
-            "cmd": "device monitoring on",
-            "desc": "Enable device health monitoring",
-            "perm": "rw",
-        },
-        {
-            "cmd": "device monitoring off",
-            "desc": "Disable device health monitoring",
-            "perm": "rw",
-        },
-        {
-            'cmd': 'device predict-life-expectancy '
-                   'name=devid,type=CephString,req=true',
-            'desc': 'Predict life expectancy with local predictor',
-            'perm': 'r'
-        },
-    ]
-
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super(Module, self).__init__(*args, **kwargs)
 
         # populate options (just until serve() runs)
@@ -147,59 +120,107 @@ class Module(MgrModule):
         self.event = Event()
         self.has_device_pool = False
 
-    def is_valid_daemon_name(self, who):
-        l = who.split('.')
-        if len(l) != 2:
+        # for mypy which does not run the code
+        if TYPE_CHECKING:
+            self.enable_monitoring = True
+            self.scrape_frequency = 0.0
+            self.pool_name = ''
+            self.device_health_metrics = ''
+            self.retention_period = 0.0
+            self.mark_out_threshold = 0.0
+            self.warn_threshold = 0.0
+            self.self_heal = True
+            self.sleep_interval = 0.0
+
+    def is_valid_daemon_name(self, who: str) -> bool:
+        parts = who.split('.')
+        if len(parts) != 2:
             return False
-        if l[0] not in ('osd', 'mon'):
-            return False;
-        return True;
-
-    def handle_command(self, _, cmd):
-        self.log.error("handle_command")
-
-        if cmd['prefix'] == 'device query-daemon-health-metrics':
-            who = cmd.get('who', '')
-            if not self.is_valid_daemon_name(who):
-                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
-            (daemon_type, daemon_id) = cmd.get('who', '').split('.')
-            result = CommandResult('')
-            self.send_command(result, daemon_type, daemon_id, json.dumps({
-                'prefix': 'smart',
-                'format': 'json',
-            }), '')
-            r, outb, outs = result.wait()
-            return r, outb, outs
-        elif cmd['prefix'] == 'device scrape-daemon-health-metrics':
-            who = cmd.get('who', '')
-            if not self.is_valid_daemon_name(who):
-                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
-            (daemon_type, daemon_id) = cmd.get('who', '').split('.')
-            return self.scrape_daemon(daemon_type, daemon_id)
-        elif cmd['prefix'] == 'device scrape-health-metrics':
-            if 'devid' in cmd:
-                return self.scrape_device(cmd['devid'])
+        return parts[0] in ('osd', 'mon')
+
+    @CLICommand('device query-daemon-health-metrics',
+                perm='r')
+    def do_query_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
+        '''
+        Get device health metrics for a given daemon
+        '''
+        if not self.is_valid_daemon_name(who):
+            return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+        (daemon_type, daemon_id) = who.split('.')
+        result = CommandResult('')
+        self.send_command(result, daemon_type, daemon_id, json.dumps({
+            'prefix': 'smart',
+            'format': 'json',
+        }), '')
+        return result.wait()
+
+    @CLICommand('device scrape-daemon-health-metrics',
+                perm='r')
+    def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
+        '''
+        Scrape and store device health metrics for a given daemon
+        '''
+        if not self.is_valid_daemon_name(who):
+            return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+        (daemon_type, daemon_id) = who.split('.')
+        return self.scrape_daemon(daemon_type, daemon_id)
+
+    @CLICommand('device scrape-daemon-health-metrics',
+                perm='r')
+    def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
+        '''
+        Scrape and store device health metrics
+        '''
+        if devid is None:
             return self.scrape_all()
-        elif cmd['prefix'] == 'device get-health-metrics':
-            return self.show_device_metrics(cmd['devid'], cmd.get('sample'))
-        elif cmd['prefix'] == 'device check-health':
-            return self.check_health()
-        elif cmd['prefix'] == 'device monitoring on':
-            self.set_module_option('enable_monitoring', True)
-            self.event.set()
-            return 0, '', ''
-        elif cmd['prefix'] == 'device monitoring off':
-            self.set_module_option('enable_monitoring', False)
-            self.set_health_checks({})  # avoid stuck health alerts
-            return 0, '', ''
-        elif cmd['prefix'] == 'device predict-life-expectancy':
-            return self.predict_lift_expectancy(cmd['devid'])
         else:
-            # mgr should respect our self.COMMANDS and not call us for
-            # any prefix we don't advertise
-            raise NotImplementedError(cmd['prefix'])
-
-    def self_test(self):
+            return self.scrape_device(devid)
+
+    @CLICommand('device get-health-metrics',
+                perm='r')
+    def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
+        '''
+        Show stored device metrics for the device
+        '''
+        return self.show_device_metrics(devid, sample)
+
+    @CLICommand('device check-health',
+                perm='rw')
+    def do_check_health(self) -> Tuple[int, str, str]:
+        '''
+        Check life expectancy of devices
+        '''
+        return self.check_health()
+
+    @CLICommand('device monitoring on',
+                perm='rw')
+    def do_monitoring_on(self) -> Tuple[int, str, str]:
+        '''
+        Enable device health monitoring
+        '''
+        self.set_module_option('enable_monitoring', True)
+        self.event.set()
+        return 0, '', ''
+
+    @CLICommand('device monitoring off',
+                perm='rw')
+    def do_monitoring_off(self) -> Tuple[int, str, str]:
+        '''
+        Disable device health monitoring
+        '''
+        self.set_module_option('enable_monitoring', False)
+        self.set_health_checks({})  # avoid stuck health alerts
+        return 0, '', ''
+
+    @CLICommand('device predict-life-expectancy',
+                perm='r')
+    def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
+        '''
+        Predict life expectancy with local predictor
+        '''
+        return self.predict_lift_expectancy(devid)
+
+    def self_test(self) -> None:
         self.config_notify()
         osdmap = self.get('osd_map')
         osd_id = osdmap['osds'][0]['osd']
@@ -215,21 +236,38 @@ class Module(MgrModule):
             assert r == 0
             assert before != after
 
-    def config_notify(self):
+    def config_notify(self) -> None:
         for opt in self.MODULE_OPTIONS:
             setattr(self,
                     opt['name'],
                     self.get_module_option(opt['name']))
             self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
 
-    def notify(self, notify_type, notify_id):
-       # create device_health_metrics pool if it doesn't exist
-       if notify_type == "osd_map" and self.enable_monitoring:
-            if not self.has_device_pool:
-                self.create_device_pool()
-                self.has_device_pool = True
+    def notify(self, notify_type: str, notify_id: str) -> None:
+        if notify_type == "osd_map" and self.enable_monitoring:
+            # create device_health_metrics pool if it doesn't exist
+            self.maybe_create_device_pool()
+
+    def have_enough_osds(self) -> bool:
+        # wait until we have enough OSDs to allow the pool to be healthy
+        up = 0
+        for osd in self.get("osd_map")["osds"]:
+            if osd["up"]:
+                up += 1
 
-    def create_device_pool(self):
+        need = cast(int, self.get_ceph_option("osd_pool_default_size"))
+        return up >= need
+
+    def maybe_create_device_pool(self) -> bool:
+        if not self.has_device_pool:
+            if not self.have_enough_osds():
+                self.log.warning("Not enough OSDs yet to create monitoring pool")
+                return False
+            self.create_device_pool()
+            self.has_device_pool = True
+        return True
+
+    def create_device_pool(self) -> None:
         self.log.debug('create %s pool' % self.pool_name)
         # create pool
         result = CommandResult('')
@@ -253,7 +291,7 @@ class Module(MgrModule):
         r, outb, outs = result.wait()
         assert r == 0
 
-    def serve(self):
+    def serve(self) -> None:
         self.log.info("Starting")
         self.config_notify()
 
@@ -262,7 +300,7 @@ class Module(MgrModule):
         if ls:
             try:
                 last_scrape = datetime.strptime(ls, TIME_FORMAT)
-            except ValueError as e:
+            except ValueError:
                 pass
         self.log.debug('Last scrape %s', last_scrape)
 
@@ -276,7 +314,7 @@ class Module(MgrModule):
                     next_scrape = now
                 else:
                     # align to scrape interval
-                    scrape_frequency = int(self.scrape_frequency) or 86400
+                    scrape_frequency = self.scrape_frequency or 86400
                     seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
                     seconds -= seconds % scrape_frequency
                     seconds += scrape_frequency
@@ -295,34 +333,27 @@ class Module(MgrModule):
                     self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))
 
             # sleep
-            sleep_interval = int(self.sleep_interval) or 60
+            sleep_interval = self.sleep_interval or 60
             self.log.debug('Sleeping for %d seconds', sleep_interval)
             ret = self.event.wait(sleep_interval)
             self.event.clear()
 
-    def shutdown(self):
+    def shutdown(self) -> None:
         self.log.info('Stopping')
         self.run = False
         self.event.set()
 
-    def open_connection(self, create_if_missing=True):
-        osdmap = self.get("osd_map")
-        assert osdmap is not None
-        if len(osdmap['osds']) == 0:
-            return None
-        if not self.has_device_pool:
-            if not create_if_missing:
+    def open_connection(self, create_if_missing: bool = True) -> rados.Ioctx:
+        if create_if_missing:
+            if not self.maybe_create_device_pool():
                 return None
-            if self.enable_monitoring:
-                self.create_device_pool()
-                self.has_device_pool = True
         ioctx = self.rados.open_ioctx(self.pool_name)
         return ioctx
 
-    def scrape_daemon(self, daemon_type, daemon_id):
+    def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]:
         ioctx = self.open_connection()
         if not ioctx:
-            return 0, "", ""
+            return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
         raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
         if raw_smart_data:
             for device, raw_data in raw_smart_data.items():
@@ -332,12 +363,12 @@ class Module(MgrModule):
         ioctx.close()
         return 0, "", ""
 
-    def scrape_all(self):
+    def scrape_all(self) -> Tuple[int, str, str]:
         osdmap = self.get("osd_map")
         assert osdmap is not None
         ioctx = self.open_connection()
         if not ioctx:
-            return 0, "", ""
+            return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
         did_device = {}
         ids = []
         for osd in osdmap['osds']:
@@ -360,7 +391,7 @@ class Module(MgrModule):
         ioctx.close()
         return 0, "", ""
 
-    def scrape_device(self, devid):
+    def scrape_device(self, devid: str) -> Tuple[int, str, str]:
         r = self.get("device " + devid)
         if not r or 'device' not in r.keys():
             return -errno.ENOENT, '', 'device ' + devid + ' not found'
@@ -371,7 +402,7 @@ class Module(MgrModule):
         (daemon_type, daemon_id) = daemons[0].split('.')
         ioctx = self.open_connection()
         if not ioctx:
-            return 0, "", ""
+            return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
         raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
                                                devid=devid)
         if raw_smart_data:
@@ -382,7 +413,10 @@ class Module(MgrModule):
         ioctx.close()
         return 0, "", ""
 
-    def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
+    def do_scrape_daemon(self,
+                         daemon_type: str,
+                         daemon_id: str,
+                         devid: str = '') -> Optional[Dict[str, Any]]:
         """
         :return: a dict, or None if the scrape failed.
         """
@@ -401,18 +435,20 @@ class Module(MgrModule):
             self.log.error(
                 "Fail to parse JSON result from daemon {0}.{1} ({2})".format(
                     daemon_type, daemon_id, outb))
+            return None
 
-    def put_device_metrics(self, ioctx, devid, data):
+    def put_device_metrics(self, ioctx: rados.Ioctx, devid: str, data: Any) -> None:
         assert devid
         old_key = datetime.utcnow() - timedelta(
-            seconds=int(self.retention_period))
+            seconds=self.retention_period)
         prune = old_key.strftime(TIME_FORMAT)
         self.log.debug('put_device_metrics device %s prune %s' %
                        (devid, prune))
         erase = []
         try:
             with rados.ReadOpCtx() as op:
-                omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)  # fixme
+                # FIXME
+                omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)
                 assert ret == 0
                 ioctx.operate_read_op(op, devid)
                 for key, _ in list(omap_iter):
@@ -437,7 +473,25 @@ class Module(MgrModule):
                 ioctx.remove_omap_keys(op, tuple(erase))
             ioctx.operate_write_op(op, devid)
 
-    def _get_device_metrics(self, devid, sample=None, min_sample=None):
+        # extract wear level?
+        wear_level = get_ata_wear_level(data)
+        if wear_level is None:
+            wear_level = get_nvme_wear_level(data)
+        dev_data = self.get(f"device {devid}") or {}
+        if wear_level is not None:
+            if dev_data.get(wear_level) != str(wear_level):
+                dev_data["wear_level"] = str(wear_level)
+                self.log.debug(f"updating {devid} wear level to {wear_level}")
+                self.set_device_wear_level(devid, wear_level)
+        else:
+            if "wear_level" in dev_data:
+                del dev_data["wear_level"]
+                self.log.debug(f"removing {devid} wear level")
+                self.set_device_wear_level(devid, -1.0)
+
+    def _get_device_metrics(self, devid: str,
+                            sample: Optional[str] = None,
+                            min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
         res = {}
         ioctx = self.open_connection(create_if_missing=False)
         if not ioctx:
@@ -468,7 +522,7 @@ class Module(MgrModule):
                     raise
         return res
 
-    def show_device_metrics(self, devid, sample):
+    def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]:
         # verify device exists
         r = self.get("device " + devid)
         if not r or 'device' not in r.keys():
@@ -477,14 +531,14 @@ class Module(MgrModule):
         res = self._get_device_metrics(devid, sample=sample)
         return 0, json.dumps(res, indent=4, sort_keys=True), ''
 
-    def check_health(self):
+    def check_health(self) -> Tuple[int, str, str]:
         self.log.info('Check health')
         config = self.get('config')
         min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
-        mark_out_threshold_td = timedelta(seconds=int(self.mark_out_threshold))
-        warn_threshold_td = timedelta(seconds=int(self.warn_threshold))
-        checks = {}
-        health_warnings = {
+        mark_out_threshold_td = timedelta(seconds=self.mark_out_threshold)
+        warn_threshold_td = timedelta(seconds=self.warn_threshold)
+        checks: Dict[str, Dict[str, Union[int, str, Sequence[str]]]] = {}
+        health_warnings: Dict[str, List[str]] = {
             DEVICE_HEALTH: [],
             DEVICE_HEALTH_IN_USE: [],
             }
@@ -574,7 +628,7 @@ class Module(MgrModule):
                 did += 1
             if to_mark_out:
                 self.mark_out_etc(to_mark_out)
-        for warning, ls in iteritems(health_warnings):
+        for warning, ls in health_warnings.items():
             n = len(ls)
             if n:
                 checks[warning] = {
@@ -586,21 +640,21 @@ class Module(MgrModule):
         self.set_health_checks(checks)
         return 0, "", ""
 
-    def is_osd_in(self, osdmap, osd_id):
+    def is_osd_in(self, osdmap: Dict[str, Any], osd_id: str) -> bool:
         for osd in osdmap['osds']:
-            if str(osd_id) == str(osd['osd']):
+            if osd_id == str(osd['osd']):
                 return bool(osd['in'])
         return False
 
-    def get_osd_num_pgs(self, osd_id):
+    def get_osd_num_pgs(self, osd_id: str) -> int:
         stats = self.get('osd_stats')
         assert stats is not None
         for stat in stats['osd_stats']:
-            if str(osd_id) == str(stat['osd']):
+            if osd_id == str(stat['osd']):
                 return stat['num_pgs']
         return -1
 
-    def mark_out_etc(self, osd_ids):
+    def mark_out_etc(self, osd_ids: List[str]) -> None:
         self.log.info('Marking out OSDs: %s' % osd_ids)
         result = CommandResult('')
         self.send_command(result, 'mon', '', json.dumps({
@@ -610,7 +664,8 @@ class Module(MgrModule):
         }), '')
         r, outb, outs = result.wait()
         if r != 0:
-            self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]' % (osd_ids, r, outb, outs))
+            self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]',
+                             osd_ids, r, outb, outs)
         for osd_id in osd_ids:
             result = CommandResult('')
             self.send_command(result, 'mon', '', json.dumps({
@@ -621,19 +676,19 @@ class Module(MgrModule):
             }), '')
             r, outb, outs = result.wait()
             if r != 0:
-                self.log.warning('Could not set osd.%s primary-affinity, r: [%s], outs: [%s]' % (osd_id, r, outb, outs))
+                self.log.warning('Could not set osd.%s primary-affinity, '
+                                 'r: [%s], outb: [%s], outs: [%s]',
+                                 osd_id, r, outb, outs)
 
-    def extract_smart_features(self, raw):
+    def extract_smart_features(self, raw: Any) -> Any:
         # FIXME: extract and normalize raw smartctl --json output and
         # generate a dict of the fields we care about.
         return raw
 
-    def predict_lift_expectancy(self, devid):
+    def predict_lift_expectancy(self, devid: str) -> Tuple[int, str, str]:
         plugin_name = ''
         model = self.get_ceph_option('device_failure_prediction_mode')
-        if model and model.lower() == 'cloud':
-            plugin_name = 'diskprediction_cloud'
-        elif model and model.lower() == 'local':
+        if cast(str, model).lower() == 'local':
             plugin_name = 'diskprediction_local'
         else:
             return -1, '', 'unable to enable any disk prediction model[local/cloud]'
@@ -641,15 +696,15 @@ class Module(MgrModule):
             can_run, _ = self.remote(plugin_name, 'can_run')
             if can_run:
                 return self.remote(plugin_name, 'predict_life_expectancy', devid=devid)
+            else:
+                return -1, '', f'{plugin_name} is not available'
         except:
             return -1, '', 'unable to invoke diskprediction local or remote plugin'
 
-    def predict_all_devices(self):
+    def predict_all_devices(self) -> Tuple[int, str, str]:
         plugin_name = ''
         model = self.get_ceph_option('device_failure_prediction_mode')
-        if model and model.lower() == 'cloud':
-            plugin_name = 'diskprediction_cloud'
-        elif model and model.lower() == 'local':
+        if cast(str, model).lower() == 'local':
             plugin_name = 'diskprediction_local'
         else:
             return -1, '', 'unable to enable any disk prediction model[local/cloud]'
@@ -657,11 +712,13 @@ class Module(MgrModule):
             can_run, _ = self.remote(plugin_name, 'can_run')
             if can_run:
                 return self.remote(plugin_name, 'predict_all_devices')
+            else:
+                return -1, '', f'{plugin_name} is not available'
         except:
             return -1, '', 'unable to invoke diskprediction local or remote plugin'
 
-    def get_recent_device_metrics(self, devid, min_sample):
+    def get_recent_device_metrics(self, devid: str, min_sample: str) -> Dict[str, Dict[str, Any]]:
         return self._get_device_metrics(devid, min_sample=min_sample)
 
-    def get_time_format(self):
+    def get_time_format(self) -> str:
         return TIME_FORMAT