import errno
import json
-from mgr_module import MgrModule, CommandResult
+from mgr_module import MgrModule, CommandResult, CLICommand, Option
import operator
import rados
from threading import Event
-from datetime import datetime, timedelta, date, time
-from six import iteritems
+from datetime import datetime, timedelta
+from typing import cast, Any, Dict, List, Optional, Sequence, Tuple, TYPE_CHECKING, Union
TIME_FORMAT = '%Y%m%d-%H%M%S'
DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
}
-MAX_SAMPLES=500
+MAX_SAMPLES = 500
+
+
+def get_ata_wear_level(data: Dict[Any,Any]) -> Optional[float]:
+ """
+ Extract wear level (as float) from smartctl -x --json output for SATA SSD
+ """
+ for page in data.get("ata_device_statistics", {}).get("pages", []):
+ if page.get("number") != 7:
+ continue
+ for item in page.get("table", []):
+ if item["offset"] == 8:
+ return item["value"] / 100.0
+ return None
+
+
+def get_nvme_wear_level(data: Dict[Any,Any]) -> Optional[float]:
+ """
+ Extract wear level (as float) from smartctl -x --json output for NVME SSD
+ """
+ pct_used = data.get("nvme_smart_health_information_log", {}).get("percentage_used")
+ if pct_used is None:
+ return None
+ return pct_used / 100.0
class Module(MgrModule):
MODULE_OPTIONS = [
- {
- 'name': 'enable_monitoring',
- 'default': True,
- 'type': 'bool',
- 'desc': 'monitor device health metrics',
- 'runtime': True,
- },
- {
- 'name': 'scrape_frequency',
- 'default': 86400,
- 'type': 'secs',
- 'desc': 'how frequently to scrape device health metrics',
- 'runtime': True,
- },
- {
- 'name': 'pool_name',
- 'default': 'device_health_metrics',
- 'type': 'str',
- 'desc': 'name of pool in which to store device health metrics',
- 'runtime': True,
- },
- {
- 'name': 'retention_period',
- 'default': (86400 * 180),
- 'type': 'secs',
- 'desc': 'how long to retain device health metrics',
- 'runtime': True,
- },
- {
- 'name': 'mark_out_threshold',
- 'default': (86400 * 14 * 2),
- 'type': 'secs',
- 'desc': 'automatically mark OSD if it may fail before this long',
- 'runtime': True,
- },
- {
- 'name': 'warn_threshold',
- 'default': (86400 * 14 * 6),
- 'type': 'secs',
- 'desc': 'raise health warning if OSD may fail before this long',
- 'runtime': True,
- },
- {
- 'name': 'self_heal',
- 'default': True,
- 'type': 'bool',
- 'desc': 'preemptively heal cluster around devices that may fail',
- 'runtime': True,
- },
- {
- 'name': 'sleep_interval',
- 'default': 600,
- 'type': 'secs',
- 'desc': 'how frequently to wake up and check device health',
- 'runtime': True,
- },
+ Option(
+ name='enable_monitoring',
+ default=True,
+ type='bool',
+ desc='monitor device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='scrape_frequency',
+ default=86400,
+ type='secs',
+ desc='how frequently to scrape device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='pool_name',
+ default='device_health_metrics',
+ type='str',
+ desc='name of pool in which to store device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='retention_period',
+ default=(86400 * 180),
+ type='secs',
+ desc='how long to retain device health metrics',
+ runtime=True,
+ ),
+ Option(
+ name='mark_out_threshold',
+ default=(86400 * 14 * 2),
+ type='secs',
+ desc='automatically mark OSD if it may fail before this long',
+ runtime=True,
+ ),
+ Option(
+ name='warn_threshold',
+ default=(86400 * 14 * 6),
+ type='secs',
+ desc='raise health warning if OSD may fail before this long',
+ runtime=True,
+ ),
+ Option(
+ name='self_heal',
+ default=True,
+ type='bool',
+ desc='preemptively heal cluster around devices that may fail',
+ runtime=True,
+ ),
+ Option(
+ name='sleep_interval',
+ default=600,
+ type='secs',
+ desc='how frequently to wake up and check device health',
+ runtime=True,
+ ),
]
- COMMANDS = [
- {
- "cmd": "device query-daemon-health-metrics "
- "name=who,type=CephString",
- "desc": "Get device health metrics for a given daemon",
- "perm": "r"
- },
- {
- "cmd": "device scrape-daemon-health-metrics "
- "name=who,type=CephString",
- "desc": "Scrape and store device health metrics "
- "for a given daemon",
- "perm": "r"
- },
- {
- "cmd": "device scrape-health-metrics "
- "name=devid,type=CephString,req=False",
- "desc": "Scrape and store health metrics",
- "perm": "r"
- },
- {
- "cmd": "device get-health-metrics "
- "name=devid,type=CephString "
- "name=sample,type=CephString,req=False",
- "desc": "Show stored device metrics for the device",
- "perm": "r"
- },
- {
- "cmd": "device check-health",
- "desc": "Check life expectancy of devices",
- "perm": "rw",
- },
- {
- "cmd": "device monitoring on",
- "desc": "Enable device health monitoring",
- "perm": "rw",
- },
- {
- "cmd": "device monitoring off",
- "desc": "Disable device health monitoring",
- "perm": "rw",
- },
- {
- 'cmd': 'device predict-life-expectancy '
- 'name=devid,type=CephString,req=true',
- 'desc': 'Predict life expectancy with local predictor',
- 'perm': 'r'
- },
- ]
-
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
# populate options (just until serve() runs)
self.event = Event()
self.has_device_pool = False
- def is_valid_daemon_name(self, who):
- l = who.split('.')
- if len(l) != 2:
+ # for mypy which does not run the code
+ if TYPE_CHECKING:
+ self.enable_monitoring = True
+ self.scrape_frequency = 0.0
+ self.pool_name = ''
+ self.device_health_metrics = ''
+ self.retention_period = 0.0
+ self.mark_out_threshold = 0.0
+ self.warn_threshold = 0.0
+ self.self_heal = True
+ self.sleep_interval = 0.0
+
+ def is_valid_daemon_name(self, who: str) -> bool:
+ parts = who.split('.')
+ if len(parts) != 2:
return False
- if l[0] not in ('osd', 'mon'):
- return False;
- return True;
-
- def handle_command(self, _, cmd):
- self.log.error("handle_command")
-
- if cmd['prefix'] == 'device query-daemon-health-metrics':
- who = cmd.get('who', '')
- if not self.is_valid_daemon_name(who):
- return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
- (daemon_type, daemon_id) = cmd.get('who', '').split('.')
- result = CommandResult('')
- self.send_command(result, daemon_type, daemon_id, json.dumps({
- 'prefix': 'smart',
- 'format': 'json',
- }), '')
- r, outb, outs = result.wait()
- return r, outb, outs
- elif cmd['prefix'] == 'device scrape-daemon-health-metrics':
- who = cmd.get('who', '')
- if not self.is_valid_daemon_name(who):
- return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
- (daemon_type, daemon_id) = cmd.get('who', '').split('.')
- return self.scrape_daemon(daemon_type, daemon_id)
- elif cmd['prefix'] == 'device scrape-health-metrics':
- if 'devid' in cmd:
- return self.scrape_device(cmd['devid'])
+ return parts[0] in ('osd', 'mon')
+
+ @CLICommand('device query-daemon-health-metrics',
+ perm='r')
+ def do_query_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
+ '''
+ Get device health metrics for a given daemon
+ '''
+ if not self.is_valid_daemon_name(who):
+ return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+ (daemon_type, daemon_id) = who.split('.')
+ result = CommandResult('')
+ self.send_command(result, daemon_type, daemon_id, json.dumps({
+ 'prefix': 'smart',
+ 'format': 'json',
+ }), '')
+ return result.wait()
+
+ @CLICommand('device scrape-daemon-health-metrics',
+ perm='r')
+ def do_scrape_daemon_health_metrics(self, who: str) -> Tuple[int, str, str]:
+ '''
+ Scrape and store device health metrics for a given daemon
+ '''
+ if not self.is_valid_daemon_name(who):
+ return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
+ (daemon_type, daemon_id) = who.split('.')
+ return self.scrape_daemon(daemon_type, daemon_id)
+
+ @CLICommand('device scrape-daemon-health-metrics',
+ perm='r')
+ def do_scrape_health_metrics(self, devid: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Scrape and store device health metrics
+ '''
+ if devid is None:
return self.scrape_all()
- elif cmd['prefix'] == 'device get-health-metrics':
- return self.show_device_metrics(cmd['devid'], cmd.get('sample'))
- elif cmd['prefix'] == 'device check-health':
- return self.check_health()
- elif cmd['prefix'] == 'device monitoring on':
- self.set_module_option('enable_monitoring', True)
- self.event.set()
- return 0, '', ''
- elif cmd['prefix'] == 'device monitoring off':
- self.set_module_option('enable_monitoring', False)
- self.set_health_checks({}) # avoid stuck health alerts
- return 0, '', ''
- elif cmd['prefix'] == 'device predict-life-expectancy':
- return self.predict_lift_expectancy(cmd['devid'])
else:
- # mgr should respect our self.COMMANDS and not call us for
- # any prefix we don't advertise
- raise NotImplementedError(cmd['prefix'])
-
- def self_test(self):
+ return self.scrape_device(devid)
+
+ @CLICommand('device get-health-metrics',
+ perm='r')
+ def do_get_health_metrics(self, devid: str, sample: Optional[str] = None) -> Tuple[int, str, str]:
+ '''
+ Show stored device metrics for the device
+ '''
+ return self.show_device_metrics(devid, sample)
+
+ @CLICommand('device check-health',
+ perm='rw')
+ def do_check_health(self) -> Tuple[int, str, str]:
+ '''
+ Check life expectancy of devices
+ '''
+ return self.check_health()
+
+ @CLICommand('device monitoring on',
+ perm='rw')
+ def do_monitoring_on(self) -> Tuple[int, str, str]:
+ '''
+ Enable device health monitoring
+ '''
+ self.set_module_option('enable_monitoring', True)
+ self.event.set()
+ return 0, '', ''
+
+ @CLICommand('device monitoring off',
+ perm='rw')
+ def do_monitoring_off(self) -> Tuple[int, str, str]:
+ '''
+ Disable device health monitoring
+ '''
+ self.set_module_option('enable_monitoring', False)
+ self.set_health_checks({}) # avoid stuck health alerts
+ return 0, '', ''
+
+ @CLICommand('device predict-life-expectancy',
+ perm='r')
+ def do_predict_life_expectancy(self, devid: str) -> Tuple[int, str, str]:
+ '''
+ Predict life expectancy with local predictor
+ '''
+ return self.predict_lift_expectancy(devid)
+
+ def self_test(self) -> None:
self.config_notify()
osdmap = self.get('osd_map')
osd_id = osdmap['osds'][0]['osd']
assert r == 0
assert before != after
- def config_notify(self):
+ def config_notify(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
self.get_module_option(opt['name']))
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
- def notify(self, notify_type, notify_id):
- # create device_health_metrics pool if it doesn't exist
- if notify_type == "osd_map" and self.enable_monitoring:
- if not self.has_device_pool:
- self.create_device_pool()
- self.has_device_pool = True
+ def notify(self, notify_type: str, notify_id: str) -> None:
+ if notify_type == "osd_map" and self.enable_monitoring:
+ # create device_health_metrics pool if it doesn't exist
+ self.maybe_create_device_pool()
+
+ def have_enough_osds(self) -> bool:
+ # wait until we have enough OSDs to allow the pool to be healthy
+ up = 0
+ for osd in self.get("osd_map")["osds"]:
+ if osd["up"]:
+ up += 1
- def create_device_pool(self):
+ need = cast(int, self.get_ceph_option("osd_pool_default_size"))
+ return up >= need
+
+ def maybe_create_device_pool(self) -> bool:
+ if not self.has_device_pool:
+ if not self.have_enough_osds():
+ self.log.warning("Not enough OSDs yet to create monitoring pool")
+ return False
+ self.create_device_pool()
+ self.has_device_pool = True
+ return True
+
+ def create_device_pool(self) -> None:
self.log.debug('create %s pool' % self.pool_name)
# create pool
result = CommandResult('')
r, outb, outs = result.wait()
assert r == 0
- def serve(self):
+ def serve(self) -> None:
self.log.info("Starting")
self.config_notify()
if ls:
try:
last_scrape = datetime.strptime(ls, TIME_FORMAT)
- except ValueError as e:
+ except ValueError:
pass
self.log.debug('Last scrape %s', last_scrape)
next_scrape = now
else:
# align to scrape interval
- scrape_frequency = int(self.scrape_frequency) or 86400
+ scrape_frequency = self.scrape_frequency or 86400
seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
seconds -= seconds % scrape_frequency
seconds += scrape_frequency
self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))
# sleep
- sleep_interval = int(self.sleep_interval) or 60
+ sleep_interval = self.sleep_interval or 60
self.log.debug('Sleeping for %d seconds', sleep_interval)
ret = self.event.wait(sleep_interval)
self.event.clear()
- def shutdown(self):
+ def shutdown(self) -> None:
self.log.info('Stopping')
self.run = False
self.event.set()
- def open_connection(self, create_if_missing=True):
- osdmap = self.get("osd_map")
- assert osdmap is not None
- if len(osdmap['osds']) == 0:
- return None
- if not self.has_device_pool:
- if not create_if_missing:
+ def open_connection(self, create_if_missing: bool = True) -> rados.Ioctx:
+ if create_if_missing:
+ if not self.maybe_create_device_pool():
return None
- if self.enable_monitoring:
- self.create_device_pool()
- self.has_device_pool = True
ioctx = self.rados.open_ioctx(self.pool_name)
return ioctx
- def scrape_daemon(self, daemon_type, daemon_id):
+ def scrape_daemon(self, daemon_type: str, daemon_id: str) -> Tuple[int, str, str]:
ioctx = self.open_connection()
if not ioctx:
- return 0, "", ""
+ return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
if raw_smart_data:
for device, raw_data in raw_smart_data.items():
ioctx.close()
return 0, "", ""
- def scrape_all(self):
+ def scrape_all(self) -> Tuple[int, str, str]:
osdmap = self.get("osd_map")
assert osdmap is not None
ioctx = self.open_connection()
if not ioctx:
- return 0, "", ""
+ return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
did_device = {}
ids = []
for osd in osdmap['osds']:
ioctx.close()
return 0, "", ""
- def scrape_device(self, devid):
+ def scrape_device(self, devid: str) -> Tuple[int, str, str]:
r = self.get("device " + devid)
if not r or 'device' not in r.keys():
return -errno.ENOENT, '', 'device ' + devid + ' not found'
(daemon_type, daemon_id) = daemons[0].split('.')
ioctx = self.open_connection()
if not ioctx:
- return 0, "", ""
+ return -errno.EAGAIN, "", "device_health_metrics pool not yet available"
raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
devid=devid)
if raw_smart_data:
ioctx.close()
return 0, "", ""
- def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
+ def do_scrape_daemon(self,
+ daemon_type: str,
+ daemon_id: str,
+ devid: str = '') -> Optional[Dict[str, Any]]:
"""
:return: a dict, or None if the scrape failed.
"""
self.log.error(
"Fail to parse JSON result from daemon {0}.{1} ({2})".format(
daemon_type, daemon_id, outb))
+ return None
- def put_device_metrics(self, ioctx, devid, data):
+ def put_device_metrics(self, ioctx: rados.Ioctx, devid: str, data: Any) -> None:
assert devid
old_key = datetime.utcnow() - timedelta(
- seconds=int(self.retention_period))
+ seconds=self.retention_period)
prune = old_key.strftime(TIME_FORMAT)
self.log.debug('put_device_metrics device %s prune %s' %
(devid, prune))
erase = []
try:
with rados.ReadOpCtx() as op:
- omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES) # fixme
+ # FIXME
+ omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)
assert ret == 0
ioctx.operate_read_op(op, devid)
for key, _ in list(omap_iter):
ioctx.remove_omap_keys(op, tuple(erase))
ioctx.operate_write_op(op, devid)
- def _get_device_metrics(self, devid, sample=None, min_sample=None):
+ # extract wear level?
+ wear_level = get_ata_wear_level(data)
+ if wear_level is None:
+ wear_level = get_nvme_wear_level(data)
+ dev_data = self.get(f"device {devid}") or {}
+ if wear_level is not None:
+ if dev_data.get(wear_level) != str(wear_level):
+ dev_data["wear_level"] = str(wear_level)
+ self.log.debug(f"updating {devid} wear level to {wear_level}")
+ self.set_device_wear_level(devid, wear_level)
+ else:
+ if "wear_level" in dev_data:
+ del dev_data["wear_level"]
+ self.log.debug(f"removing {devid} wear level")
+ self.set_device_wear_level(devid, -1.0)
+
+ def _get_device_metrics(self, devid: str,
+ sample: Optional[str] = None,
+ min_sample: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
res = {}
ioctx = self.open_connection(create_if_missing=False)
if not ioctx:
raise
return res
- def show_device_metrics(self, devid, sample):
+ def show_device_metrics(self, devid: str, sample: Optional[str]) -> Tuple[int, str, str]:
# verify device exists
r = self.get("device " + devid)
if not r or 'device' not in r.keys():
res = self._get_device_metrics(devid, sample=sample)
return 0, json.dumps(res, indent=4, sort_keys=True), ''
- def check_health(self):
+ def check_health(self) -> Tuple[int, str, str]:
self.log.info('Check health')
config = self.get('config')
min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
- mark_out_threshold_td = timedelta(seconds=int(self.mark_out_threshold))
- warn_threshold_td = timedelta(seconds=int(self.warn_threshold))
- checks = {}
- health_warnings = {
+ mark_out_threshold_td = timedelta(seconds=self.mark_out_threshold)
+ warn_threshold_td = timedelta(seconds=self.warn_threshold)
+ checks: Dict[str, Dict[str, Union[int, str, Sequence[str]]]] = {}
+ health_warnings: Dict[str, List[str]] = {
DEVICE_HEALTH: [],
DEVICE_HEALTH_IN_USE: [],
}
did += 1
if to_mark_out:
self.mark_out_etc(to_mark_out)
- for warning, ls in iteritems(health_warnings):
+ for warning, ls in health_warnings.items():
n = len(ls)
if n:
checks[warning] = {
self.set_health_checks(checks)
return 0, "", ""
- def is_osd_in(self, osdmap, osd_id):
+ def is_osd_in(self, osdmap: Dict[str, Any], osd_id: str) -> bool:
for osd in osdmap['osds']:
- if str(osd_id) == str(osd['osd']):
+ if osd_id == str(osd['osd']):
return bool(osd['in'])
return False
- def get_osd_num_pgs(self, osd_id):
+ def get_osd_num_pgs(self, osd_id: str) -> int:
stats = self.get('osd_stats')
assert stats is not None
for stat in stats['osd_stats']:
- if str(osd_id) == str(stat['osd']):
+ if osd_id == str(stat['osd']):
return stat['num_pgs']
return -1
- def mark_out_etc(self, osd_ids):
+ def mark_out_etc(self, osd_ids: List[str]) -> None:
self.log.info('Marking out OSDs: %s' % osd_ids)
result = CommandResult('')
self.send_command(result, 'mon', '', json.dumps({
}), '')
r, outb, outs = result.wait()
if r != 0:
- self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]' % (osd_ids, r, outb, outs))
+ self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]',
+ osd_ids, r, outb, outs)
for osd_id in osd_ids:
result = CommandResult('')
self.send_command(result, 'mon', '', json.dumps({
}), '')
r, outb, outs = result.wait()
if r != 0:
- self.log.warning('Could not set osd.%s primary-affinity, r: [%s], outs: [%s]' % (osd_id, r, outb, outs))
+ self.log.warning('Could not set osd.%s primary-affinity, '
+ 'r: [%s], outb: [%s], outs: [%s]',
+ osd_id, r, outb, outs)
- def extract_smart_features(self, raw):
+ def extract_smart_features(self, raw: Any) -> Any:
# FIXME: extract and normalize raw smartctl --json output and
# generate a dict of the fields we care about.
return raw
- def predict_lift_expectancy(self, devid):
+ def predict_lift_expectancy(self, devid: str) -> Tuple[int, str, str]:
plugin_name = ''
model = self.get_ceph_option('device_failure_prediction_mode')
- if model and model.lower() == 'cloud':
- plugin_name = 'diskprediction_cloud'
- elif model and model.lower() == 'local':
+ if cast(str, model).lower() == 'local':
plugin_name = 'diskprediction_local'
else:
return -1, '', 'unable to enable any disk prediction model[local/cloud]'
can_run, _ = self.remote(plugin_name, 'can_run')
if can_run:
return self.remote(plugin_name, 'predict_life_expectancy', devid=devid)
+ else:
+ return -1, '', f'{plugin_name} is not available'
except:
return -1, '', 'unable to invoke diskprediction local or remote plugin'
- def predict_all_devices(self):
+ def predict_all_devices(self) -> Tuple[int, str, str]:
plugin_name = ''
model = self.get_ceph_option('device_failure_prediction_mode')
- if model and model.lower() == 'cloud':
- plugin_name = 'diskprediction_cloud'
- elif model and model.lower() == 'local':
+ if cast(str, model).lower() == 'local':
plugin_name = 'diskprediction_local'
else:
return -1, '', 'unable to enable any disk prediction model[local/cloud]'
can_run, _ = self.remote(plugin_name, 'can_run')
if can_run:
return self.remote(plugin_name, 'predict_all_devices')
+ else:
+ return -1, '', f'{plugin_name} is not available'
except:
return -1, '', 'unable to invoke diskprediction local or remote plugin'
- def get_recent_device_metrics(self, devid, min_sample):
+ def get_recent_device_metrics(self, devid: str, min_sample: str) -> Dict[str, Dict[str, Any]]:
return self._get_device_metrics(devid, min_sample=min_sample)
- def get_time_format(self):
+ def get_time_format(self) -> str:
return TIME_FORMAT