import hashlib
-from mgr_module import MgrModule
+from mgr_module import CLICommand, CLIReadCommand, CLIWriteCommand, MgrModule, Option
import datetime
import errno
+import functools
+import inspect
import json
from collections import defaultdict
from prettytable import PrettyTable
import re
from threading import Event, Lock
+from typing import cast, Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Tuple, TypeVar,\
+ Union, TYPE_CHECKING
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
MAX_WAIT = 600
MIN_WAIT = 60
+
FuncT = TypeVar('FuncT', bound=Callable)


def with_crashes(func: FuncT) -> FuncT:
    """Decorate a handler so it runs with the lock held and the cache loaded.

    The wrapper holds ``self.crashes_lock`` for the whole call and invokes
    ``self._load_crashes()`` first when ``self.crashes`` is still ``None``.

    :param func: handler whose first positional argument is ``self``
    :returns: the wrapped handler, advertising ``func``'s own signature
    """
    @functools.wraps(func)
    def wrapper(self: 'Module', *args: Any, **kwargs: Any) -> Tuple[int, str, str]:
        with self.crashes_lock:
            # Only None means "never loaded". An empty dict is a valid,
            # already-loaded state and must not trigger a re-read per call.
            if self.crashes is None:
                self._load_crashes()
            return func(self, *args, **kwargs)
    # Pin the advertised signature to func's rather than (*args, **kwargs).
    wrapper.__signature__ = inspect.signature(func)  # type: ignore[attr-defined]
    return cast(FuncT, wrapper)
+
+
+CrashT = Dict[str, Union[str, List[str]]]
+
+
class Module(MgrModule):
MODULE_OPTIONS = [
- {
- 'name': 'warn_recent_interval',
- 'type': 'secs',
- 'default': 60*60*24*14,
- 'desc': 'time interval in which to warn about recent crashes',
- 'runtime': True,
- },
- {
- 'name': 'retain_interval',
- 'type': 'secs',
- 'default': 60*60*24 * 365,
- 'desc': 'how long to retain crashes before pruning them',
- 'runtime': True,
- },
+ Option(
+ name='warn_recent_interval',
+ type='secs',
+ default=60 * 60 * 24 * 14,
+ desc='time interval in which to warn about recent crashes',
+ runtime=True),
+ Option(
+ name='retain_interval',
+ type='secs',
+ default=60 * 60 * 24 * 365,
+ desc='how long to retain crashes before pruning them',
+ runtime=True),
]
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super(Module, self).__init__(*args, **kwargs)
- self.crashes = None
+ self.crashes: Optional[Dict[str, CrashT]] = None
self.crashes_lock = Lock()
self.run = True
self.event = Event()
+ if TYPE_CHECKING:
+ self.warn_recent_interval = 0.0
+ self.retain_interval = 0.0
- def shutdown(self):
+ def shutdown(self) -> None:
self.run = False
self.event.set()
- def serve(self):
+ def serve(self) -> None:
self.config_notify()
while self.run:
with self.crashes_lock:
self.event.wait(wait)
self.event.clear()
- def config_notify(self):
+ def config_notify(self) -> None:
for opt in self.MODULE_OPTIONS:
setattr(self,
opt['name'],
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name']))
- def _load_crashes(self):
+ def _load_crashes(self) -> None:
raw = self.get_store_prefix('crash/')
self.crashes = {k[6:]: json.loads(m) for (k, m) in raw.items()}
- def _refresh_health_checks(self):
+ def _refresh_health_checks(self) -> None:
if not self.crashes:
self._load_crashes()
+ assert self.crashes is not None
cutoff = datetime.datetime.utcnow() - datetime.timedelta(
seconds=self.warn_recent_interval)
recent = {
crashid: crash for crashid, crash in self.crashes.items()
- if self.time_from_string(crash['timestamp']) > cutoff and 'archived' not in crash
+ if (self.time_from_string(cast(str, crash['timestamp'])) > cutoff
+ and 'archived' not in crash)
}
- num = len(recent)
- health_checks = {}
- if recent:
- detail = [
- '%s crashed on host %s at %s' % (
- crash.get('entity_name', 'unidentified daemon'),
- crash.get('utsname_hostname', '(unknown)'),
- crash.get('timestamp', 'unknown time'))
- for (_, crash) in recent.items()]
+
def prune_detail(ls: List[str]) -> int:
    """Cap ``ls`` at 30 entries in place and return its original length.

    When more than 30 entries are present, the surplus is dropped and a
    trailing 'and N more' line is appended, so the caller's list ends up
    holding 31 items.

    :param ls: detail lines; modified in place
    :returns: number of entries ``ls`` held before pruning
    """
    num = len(ls)
    if num > 30:
        # Mutate the caller's list. Rebinding ``ls`` to a slice would only
        # trim a local copy and leave the caller's detail untruncated.
        del ls[30:]
        ls.append('and %d more' % (num - 30))
    return num
+
+ daemon_crashes = []
+ module_crashes = []
+ for c in recent.values():
+ if 'mgr_module' in c:
+ module_crashes.append(c)
+ else:
+ daemon_crashes.append(c)
+ daemon_detail = [
+ '%s crashed on host %s at %s' % (
+ crash.get('entity_name', 'unidentified daemon'),
+ crash.get('utsname_hostname', '(unknown)'),
+ crash.get('timestamp', 'unknown time'))
+ for crash in daemon_crashes]
+ module_detail = [
+ 'mgr module %s crashed in daemon %s on host %s at %s' % (
+ crash.get('mgr_module', 'unidentified module'),
+ crash.get('entity_name', 'unidentified daemon'),
+ crash.get('utsname_hostname', '(unknown)'),
+ crash.get('timestamp', 'unknown time'))
+ for crash in module_crashes]
+ daemon_num = prune_detail(daemon_detail)
+ module_num = prune_detail(module_detail)
+
+ health_checks: Dict[str, Dict[str, Union[int, str, List[str]]]] = {}
+ if daemon_detail:
+ self.log.debug('daemon detail %s' % daemon_detail)
health_checks['RECENT_CRASH'] = {
'severity': 'warning',
- 'summary': '%d daemons have recently crashed' % (num),
- 'count': num,
- 'detail': detail,
+ 'summary': '%d daemons have recently crashed' % (daemon_num),
+ 'count': daemon_num,
+ 'detail': daemon_detail,
+ }
+ if module_detail:
+ self.log.debug('module detail %s' % module_detail)
+ health_checks['RECENT_MGR_MODULE_CRASH'] = {
+ 'severity': 'warning',
+ 'summary': '%d mgr modules have recently crashed' % (module_num),
+ 'count': module_num,
+ 'detail': module_detail,
}
- self.set_health_checks(health_checks)
-
- def handle_command(self, inbuf, command):
- with self.crashes_lock:
- if not self.crashes:
- self._load_crashes()
- for cmd in self.COMMANDS:
- if cmd['cmd'].startswith(command['prefix']):
- handler = cmd['handler']
- break
- if handler is None:
- return errno.EINVAL, '', 'unknown command %s' % command['prefix']
- return handler(self, command, inbuf)
+ self.set_health_checks(health_checks)
- def time_from_string(self, timestr):
+ def time_from_string(self, timestr: str) -> datetime.datetime:
# drop the 'Z' timezone indication, it's always UTC
timestr = timestr.rstrip('Z')
try:
except ValueError:
return datetime.datetime.strptime(timestr, OLD_DATEFMT)
- def validate_crash_metadata(self, inbuf):
+ def validate_crash_metadata(self, inbuf: str) -> Dict[str, Union[str, List[str]]]:
# raise any exceptions to caller
metadata = json.loads(inbuf)
for f in ['crash_id', 'timestamp']:
if f not in metadata:
raise AttributeError("missing '%s' field" % f)
- time = self.time_from_string(metadata['timestamp'])
+ _ = self.time_from_string(metadata['timestamp'])
return metadata
- def timestamp_filter(self, f):
+ def timestamp_filter(self, f: Callable[[datetime.datetime], bool]) -> Iterable[Tuple[str, CrashT]]:
"""
Filter crash reports by timestamp.
:param f: f(time) return true to keep crash report
:returns: crash reports for which f(time) returns true
"""
- def inner(pair):
+ def inner(pair: Tuple[str, CrashT]) -> bool:
_, crash = pair
- time = self.time_from_string(crash["timestamp"])
+ time = self.time_from_string(cast(str, crash["timestamp"]))
return f(time)
+ assert self.crashes is not None
return filter(inner, self.crashes.items())
# stack signature helpers
- def sanitize_backtrace(self, bt):
+ def sanitize_backtrace(self, bt: List[str]) -> List[str]:
ret = list()
for func_record in bt:
# split into two fields on last space, take the first one,
ASSERT_MATCHEXPR = re.compile(r'(?s)(.*) thread .* time .*(: .*)\n')
- def sanitize_assert_msg(self, msg):
+ def sanitize_assert_msg(self, msg: str) -> str:
# (?s) allows matching newline. get everything up to "thread" and
# then after-and-including the last colon-space. This skips the
# thread id, timestamp, and file:lineno, because file is already in
# the beginning, and lineno may vary.
- return ''.join(self.ASSERT_MATCHEXPR.match(msg).groups())
+ matched = self.ASSERT_MATCHEXPR.match(msg)
+ assert matched
+ return ''.join(matched.groups())
- def calc_sig(self, bt, assert_msg):
+ def calc_sig(self, bt: List[str], assert_msg: Optional[str]) -> str:
sig = hashlib.sha256()
for func in self.sanitize_backtrace(bt):
sig.update(func.encode())
# command handlers
- def do_info(self, cmd, inbuf):
- crashid = cmd['id']
+ @CLIReadCommand('crash info')
+ @with_crashes
+ def do_info(self, id: str) -> Tuple[int, str, str]:
+ """
+ show crash dump metadata
+ """
+ crashid = id
+ assert self.crashes is not None
crash = self.crashes.get(crashid)
if not crash:
return errno.EINVAL, '', 'crash info: %s not found' % crashid
val = json.dumps(crash, indent=4, sort_keys=True)
return 0, val, ''
- def do_post(self, cmd, inbuf):
+ @CLICommand('crash post')
+ def do_post(self, inbuf: str) -> Tuple[int, str, str]:
+ """
+ Add a crash dump (use -i <jsonfile>)
+ """
try:
metadata = self.validate_crash_metadata(inbuf)
except Exception as e:
return errno.EINVAL, '', 'malformed crash metadata: %s' % e
if 'backtrace' in metadata:
- metadata['stack_sig'] = self.calc_sig(
- metadata.get('backtrace'), metadata.get('assert_msg'))
- crashid = metadata['crash_id']
-
+ backtrace = cast(List[str], metadata.get('backtrace'))
+ assert_msg = cast(Optional[str], metadata.get('assert_msg'))
+ metadata['stack_sig'] = self.calc_sig(backtrace, assert_msg)
+ crashid = cast(str, metadata['crash_id'])
+ assert self.crashes is not None
if crashid not in self.crashes:
self.crashes[crashid] = metadata
key = 'crash/%s' % crashid
self._refresh_health_checks()
return 0, '', ''
def ls(self) -> Tuple[int, str, str]:
    """Return the listing of all recorded crashes, new and archived.

    Thin wrapper over ``do_ls_all`` called with an empty format string,
    which selects the table output rather than JSON.

    :returns: the ``(retval, stdout, stderr)`` tuple from ``do_ls_all``
    """
    # do_ls_all is wrapped by with_crashes, which loads the crash cache
    # while holding crashes_lock. Loading it here as well would touch the
    # cache without the lock.
    return self.do_ls_all('')
- def do_ls(self, cmd, inbuf):
- if cmd['prefix'] == 'crash ls':
- t = self.crashes.values()
- else:
- t = [crash for crashid, crash in self.crashes.items()
- if 'archived' not in crash]
- r = sorted(t, key=lambda i: i.get('crash_id'))
- if cmd.get('format') == 'json' or cmd.get('format') == 'json-pretty':
+ def _do_ls(self, t: Iterable[CrashT], format: Optional[str]) -> Tuple[int, str, str]:
+ r = sorted(t, key=lambda i: i['crash_id'])
+ if format in ('json', 'json-pretty'):
return 0, json.dumps(r, indent=4, sort_keys=True), ''
else:
table = PrettyTable(['ID', 'ENTITY', 'NEW'],
table.align['ENTITY'] = 'l'
for c in r:
table.add_row([c.get('crash_id'),
- c.get('entity_name','unknown'),
+ c.get('entity_name', 'unknown'),
'' if 'archived' in c else '*'])
return 0, table.get_string(), ''
- def do_rm(self, cmd, inbuf):
- crashid = cmd['id']
@CLIReadCommand('crash ls')
@with_crashes
def do_ls_all(self, format: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Show new and archived crash dumps
    """
    # with_crashes has loaded the cache before the handler body runs.
    assert self.crashes is not None
    every_crash = self.crashes.values()
    return self._do_ls(every_crash, format)
+
@CLIReadCommand('crash ls-new')
@with_crashes
def do_ls_new(self, format: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Show new crash dumps
    """
    assert self.crashes is not None
    # "New" means not yet acknowledged: archiving adds an 'archived' key.
    # Only the crash records are needed, so iterate values, not items.
    t = [crash for crash in self.crashes.values()
         if 'archived' not in crash]
    return self._do_ls(t, format)
+
+ @CLICommand('crash rm')
+ @with_crashes
+ def do_rm(self, id: str) -> Tuple[int, str, str]:
+ """
+ Remove a saved crash <id>
+ """
+ crashid = id
+ assert self.crashes is not None
if crashid in self.crashes:
del self.crashes[crashid]
key = 'crash/%s' % crashid
self._refresh_health_checks()
return 0, '', ''
- def do_prune(self, cmd, inbuf):
- keep = cmd['keep']
- try:
- keep = int(keep)
- except ValueError:
- return errno.EINVAL, '', 'keep argument must be integer'
-
- self._prune(keep * 60*60*24)
+ @CLICommand('crash prune')
+ @with_crashes
+ def do_prune(self, keep: int) -> Tuple[int, str, str]:
+ """
+ Remove crashes older than <keep> days
+ """
+ self._prune(keep * datetime.timedelta(days=1).total_seconds())
return 0, '', ''
- def _prune(self, seconds):
+ def _prune(self, seconds: float) -> None:
now = datetime.datetime.utcnow()
cutoff = now - datetime.timedelta(seconds=seconds)
removed_any = False
# make a copy of the list, since we'll modify self.crashes below
to_prune = list(self.timestamp_filter(lambda ts: ts <= cutoff))
+ assert self.crashes is not None
for crashid, crash in to_prune:
del self.crashes[crashid]
key = 'crash/%s' % crashid
if removed_any:
self._refresh_health_checks()
- def do_archive(self, cmd, inbuf):
- crashid = cmd['id']
+ @CLIWriteCommand('crash archive')
+ @with_crashes
+ def do_archive(self, id: str) -> Tuple[int, str, str]:
+ """
+ Acknowledge a crash and silence health warning(s)
+ """
+ crashid = id
+ assert self.crashes is not None
crash = self.crashes.get(crashid)
if not crash:
return errno.EINVAL, '', 'crash info: %s not found' % crashid
self._refresh_health_checks()
return 0, '', ''
- def do_archive_all(self, cmd, inbuf):
+ @CLIWriteCommand('crash archive-all')
+ @with_crashes
+ def do_archive_all(self) -> Tuple[int, str, str]:
+ """
+ Acknowledge all new crashes and silence health warning(s)
+ """
+ assert self.crashes is not None
for crashid, crash in self.crashes.items():
if not crash.get('archived'):
crash['archived'] = str(datetime.datetime.utcnow())
self._refresh_health_checks()
return 0, '', ''
- def do_stat(self, cmd, inbuf):
+ @CLIReadCommand('crash stat')
+ @with_crashes
+ def do_stat(self) -> Tuple[int, str, str]:
+ """
+ Summarize recorded crashes
+ """
# age in days for reporting, ordered smallest first
- bins = [1, 3, 7]
+ AGE_IN_DAYS = [1, 3, 7]
retlines = list()
- def binstr(bindict):
+ BinnedStatsT = Dict[str, Union[int, datetime.datetime, List[str]]]
+
+ def binstr(bindict: BinnedStatsT) -> str:
binlines = list()
- count = len(bindict['idlist'])
+ id_list = cast(List[str], bindict['idlist'])
+ count = len(id_list)
if count:
binlines.append(
'%d older than %s days old:' % (count, bindict['age'])
)
- for crashid in bindict['idlist']:
+ for crashid in id_list:
binlines.append(crashid)
return '\n'.join(binlines)
total = 0
now = datetime.datetime.utcnow()
- for i, age in enumerate(bins):
+ bins: List[BinnedStatsT] = []
+ for age in AGE_IN_DAYS:
agelimit = now - datetime.timedelta(days=age)
- bins[i] = {
+ bins.append({
'age': age,
'agelimit': agelimit,
'idlist': list()
- }
+ })
+ assert self.crashes is not None
for crashid, crash in self.crashes.items():
total += 1
- stamp = self.time_from_string(crash['timestamp'])
- for i, bindict in enumerate(bins):
- if stamp <= bindict['agelimit']:
- bindict['idlist'].append(crashid)
+ stamp = self.time_from_string(cast(str, crash['timestamp']))
+ for bindict in bins:
+ if stamp <= cast(datetime.datetime, bindict['agelimit']):
+ cast(List[str], bindict['idlist']).append(crashid)
# don't count this one again
continue
retlines.append(binstr(bindict))
return 0, '\n'.join(retlines), ''
- def do_json_report(self, cmd, inbuf):
+ @CLIReadCommand('crash json_report')
+ @with_crashes
+ def do_json_report(self, hours: int) -> Tuple[int, str, str]:
"""
- Return a machine readable summary of recent crashes.
+ Crashes in the last <hours> hours
"""
- try:
- hours = int(cmd['hours'])
- except ValueError:
- return errno.EINVAL, '', '<hours> argument must be integer'
-
- report = defaultdict(lambda: 0)
+ # Return a machine readable summary of recent crashes.
+ report: DefaultDict[str, int] = defaultdict(lambda: 0)
+ assert self.crashes is not None
for crashid, crash in self.crashes.items():
- pname = crash.get("process_name", "unknown")
+ pname = cast(str, crash.get("process_name", "unknown"))
if not pname:
pname = "unknown"
report[pname] += 1
return 0, '', json.dumps(report, sort_keys=True)
- def self_test(self):
+ def self_test(self) -> None:
# test time conversion
timestr = '2018-06-22T20:35:38.058818Z'
old_timestr = '2018-06-22 20:35:38.058818Z'
dt = self.time_from_string(old_timestr)
if dt != datetime.datetime(2018, 6, 22, 20, 35, 38, 58818):
raise RuntimeError('time_from_string() (old) failed')
-
- COMMANDS = [
- {
- 'cmd': 'crash info name=id,type=CephString',
- 'desc': 'show crash dump metadata',
- 'perm': 'r',
- 'handler': do_info,
- },
- {
- 'cmd': 'crash ls',
- 'desc': 'Show new and archived crash dumps',
- 'perm': 'r',
- 'handler': do_ls,
- },
- {
- 'cmd': 'crash ls-new',
- 'desc': 'Show new crash dumps',
- 'perm': 'r',
- 'handler': do_ls,
- },
- {
- 'cmd': 'crash post',
- 'desc': 'Add a crash dump (use -i <jsonfile>)',
- 'perm': 'rw',
- 'handler': do_post,
- },
- {
- 'cmd': 'crash prune name=keep,type=CephString',
- 'desc': 'Remove crashes older than <keep> days',
- 'perm': 'rw',
- 'handler': do_prune,
- },
- {
- 'cmd': 'crash rm name=id,type=CephString',
- 'desc': 'Remove a saved crash <id>',
- 'perm': 'rw',
- 'handler': do_rm,
- },
- {
- 'cmd': 'crash stat',
- 'desc': 'Summarize recorded crashes',
- 'perm': 'r',
- 'handler': do_stat,
- },
- {
- 'cmd': 'crash json_report name=hours,type=CephString',
- 'desc': 'Crashes in the last <hours> hours',
- 'perm': 'r',
- 'handler': do_json_report,
- },
- {
- 'cmd': 'crash archive name=id,type=CephString',
- 'desc': 'Acknowledge a crash and silence health warning(s)',
- 'perm': 'w',
- 'handler': do_archive,
- },
- {
- 'cmd': 'crash archive-all',
- 'desc': 'Acknowledge all new crashes and silence health warning(s)',
- 'perm': 'w',
- 'handler': do_archive_all,
- },
- ]