1"""
2Device health monitoring
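
Periodically scrape the SMART data of the devices consumed by OSD
daemons, store the samples as OMAP entries in a dedicated RADOS pool,
raise health alerts for devices expected to fail soon, and (when
self_heal is enabled) preemptively mark the affected OSDs out.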
3"""
4
5import errno
6import json
7from mgr_module import MgrModule, CommandResult
8import operator
9import rados
10from threading import Event
11from datetime import datetime, timedelta, date, time
eafe8130 12import _strptime
11fdf7f2
TL
13from six import iteritems
14
15TIME_FORMAT = '%Y%m%d-%H%M%S'
16
17DEVICE_HEALTH = 'DEVICE_HEALTH'
18DEVICE_HEALTH_IN_USE = 'DEVICE_HEALTH_IN_USE'
19DEVICE_HEALTH_TOOMANY = 'DEVICE_HEALTH_TOOMANY'
20HEALTH_MESSAGES = {
21 DEVICE_HEALTH: '%d device(s) expected to fail soon',
22 DEVICE_HEALTH_IN_USE: '%d daemons(s) expected to fail soon and still contain data',
23 DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
24}
25
eafe8130
TL
26MAX_SAMPLES=500
27

class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'enable_monitoring',
            'default': False,
            'type': 'bool',
            'desc': 'monitor device health metrics',
            'runtime': True,
        },
        {
            'name': 'scrape_frequency',
            'default': 86400,
            'type': 'secs',
            'desc': 'how frequently to scrape device health metrics',
            'runtime': True,
        },
        {
            'name': 'pool_name',
            'default': 'device_health_metrics',
            'type': 'str',
            'desc': 'name of pool in which to store device health metrics',
            'runtime': True,
        },
        {
            'name': 'retention_period',
            'default': (86400 * 180),
            'type': 'secs',
            'desc': 'how long to retain device health metrics',
            'runtime': True,
        },
        {
            'name': 'mark_out_threshold',
            'default': (86400 * 14 * 2),
            'type': 'secs',
            'desc': 'automatically mark an OSD out if its device may fail before this long',
            'runtime': True,
        },
        {
            'name': 'warn_threshold',
            'default': (86400 * 14 * 6),
            'type': 'secs',
            'desc': 'raise health warning if an OSD may fail before this long',
            'runtime': True,
        },
        {
            'name': 'self_heal',
            'default': True,
            'type': 'bool',
            'desc': 'preemptively heal cluster around devices that may fail',
            'runtime': True,
        },
        {
            'name': 'sleep_interval',
            'default': 600,
            'type': 'secs',
            'desc': 'how frequently to wake up and check device health',
            'runtime': True,
        },
    ]
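
    # All options above are runtime-adjustable via the standard mgr
    # module option mechanism; for example (values illustrative only):
    #
    #   ceph config set mgr mgr/devicehealth/enable_monitoring true
    #   ceph config set mgr mgr/devicehealth/scrape_frequency 43200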

    COMMANDS = [
        {
            "cmd": "device query-daemon-health-metrics "
                   "name=who,type=CephString",
            "desc": "Get device health metrics for a given daemon",
            "perm": "r"
        },
        {
            "cmd": "device scrape-daemon-health-metrics "
                   "name=who,type=CephString",
            "desc": "Scrape and store device health metrics "
                    "for a given daemon",
            "perm": "r"
        },
        {
            "cmd": "device scrape-health-metrics "
                   "name=devid,type=CephString,req=False",
            "desc": "Scrape and store health metrics",
            "perm": "r"
        },
        {
            "cmd": "device get-health-metrics "
                   "name=devid,type=CephString "
                   "name=sample,type=CephString,req=False",
            "desc": "Show stored device metrics for the device",
            "perm": "r"
        },
        {
            "cmd": "device check-health",
            "desc": "Check life expectancy of devices",
            "perm": "rw",
        },
        {
            "cmd": "device monitoring on",
            "desc": "Enable device health monitoring",
            "perm": "rw",
        },
        {
            "cmd": "device monitoring off",
            "desc": "Disable device health monitoring",
            "perm": "rw",
        },
        {
            "cmd": "device predict-life-expectancy "
                   "name=devid,type=CephString,req=true",
            "desc": "Predict life expectancy with local predictor",
            "perm": "r"
        },
    ]
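
    # these surface as `ceph device ...` subcommands, e.g.:
    #
    #   ceph device scrape-health-metrics
    #   ceph device get-health-metrics <devid>
    #   ceph device check-health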

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # populate options (just until serve() runs)
        for opt in self.MODULE_OPTIONS:
            setattr(self, opt['name'], opt['default'])

        # other
        self.run = True
        self.event = Event()

    def is_valid_daemon_name(self, who):
        # accept only 'osd.<id>' or 'mon.<id>' style names,
        # e.g. 'osd.12' or 'mon.a'
        parts = who.split('.')
        if len(parts) != 2:
            return False
        if parts[0] not in ('osd', 'mon'):
            return False
        return True

    def handle_command(self, _, cmd):
        self.log.debug("handle_command")

        if cmd['prefix'] == 'device query-daemon-health-metrics':
            who = cmd.get('who', '')
            if not self.is_valid_daemon_name(who):
                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
            (daemon_type, daemon_id) = who.split('.')
            result = CommandResult('')
            self.send_command(result, daemon_type, daemon_id, json.dumps({
                'prefix': 'smart',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            return r, outb, outs
        elif cmd['prefix'] == 'device scrape-daemon-health-metrics':
            who = cmd.get('who', '')
            if not self.is_valid_daemon_name(who):
                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
            (daemon_type, daemon_id) = who.split('.')
            return self.scrape_daemon(daemon_type, daemon_id)
        elif cmd['prefix'] == 'device scrape-health-metrics':
            if 'devid' in cmd:
                return self.scrape_device(cmd['devid'])
            return self.scrape_all()
        elif cmd['prefix'] == 'device get-health-metrics':
            return self.show_device_metrics(cmd['devid'], cmd.get('sample'))
        elif cmd['prefix'] == 'device check-health':
            return self.check_health()
        elif cmd['prefix'] == 'device monitoring on':
            self.set_module_option('enable_monitoring', True)
            self.event.set()
            return 0, '', ''
        elif cmd['prefix'] == 'device monitoring off':
            self.set_module_option('enable_monitoring', False)
            self.set_health_checks({})  # avoid stuck health alerts
            return 0, '', ''
        elif cmd['prefix'] == 'device predict-life-expectancy':
            return self.predict_life_expectancy(cmd['devid'])
        else:
            # mgr should respect our self.COMMANDS and not call us for
            # any prefix we don't advertise
            raise NotImplementedError(cmd['prefix'])

    def self_test(self):
        self.config_notify()
        osdmap = self.get('osd_map')
        osd_id = osdmap['osds'][0]['osd']
        osdmeta = self.get('osd_metadata')
        devs = osdmeta.get(str(osd_id), {}).get('device_ids')
        if devs:
            devid = devs.split()[0].split('=')[1]
            (r, before, err) = self.show_device_metrics(devid, '')
            assert r == 0
            (r, out, err) = self.scrape_device(devid)
            assert r == 0
            (r, after, err) = self.show_device_metrics(devid, '')
            assert r == 0
            assert before != after

    def config_notify(self):
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def serve(self):
        self.log.info("Starting")
        self.config_notify()

        last_scrape = None
        ls = self.get_store('last_scrape')
        if ls:
            try:
                last_scrape = datetime.strptime(ls, TIME_FORMAT)
            except ValueError:
                pass
        self.log.debug('Last scrape %s', last_scrape)

        while self.run:
            if self.enable_monitoring:
                self.log.debug('Running')
                self.check_health()

                now = datetime.utcnow()
                if not last_scrape:
                    next_scrape = now
                else:
                    # align the next scrape to a whole multiple of
                    # scrape_frequency since the epoch
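                    # (e.g. with the default scrape_frequency of 86400
                    # seconds and a last scrape at 2019-01-20 13:42 UTC,
                    # the next scrape is due at 2019-01-21 00:00 UTC)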
                    scrape_frequency = int(self.scrape_frequency) or 86400
                    seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
                    seconds -= seconds % scrape_frequency
                    seconds += scrape_frequency
                    next_scrape = datetime.utcfromtimestamp(seconds)
                if last_scrape:
                    self.log.debug('Last scrape %s, next scrape due %s',
                                   last_scrape.strftime(TIME_FORMAT),
                                   next_scrape.strftime(TIME_FORMAT))
                else:
                    self.log.debug('Last scrape never, next scrape due %s',
                                   next_scrape.strftime(TIME_FORMAT))
                if now >= next_scrape:
                    self.scrape_all()
                    self.predict_all_devices()
                    last_scrape = now
                    self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))

            # sleep
            sleep_interval = int(self.sleep_interval) or 60
            self.log.debug('Sleeping for %d seconds', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def open_connection(self, create_if_missing=True):
        pools = self.rados.list_pools()
        if self.pool_name not in pools:
            if not create_if_missing:
                return None
            self.log.debug('create %s pool' % self.pool_name)
            # create pool
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pool create',
                'format': 'json',
                'pool': self.pool_name,
                'pg_num': 1,
                'pg_num_min': 1,
            }), '')
            r, outb, outs = result.wait()
            assert r == 0

            # set pool application
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pool application enable',
                'format': 'json',
                'pool': self.pool_name,
                'app': 'mgr_devicehealth',
            }), '')
            r, outb, outs = result.wait()
            assert r == 0

        ioctx = self.rados.open_ioctx(self.pool_name)
        return ioctx

    def scrape_daemon(self, daemon_type, daemon_id):
        if daemon_type != 'osd':
            return -errno.EINVAL, '', 'scraping non-OSDs not currently supported'
        ioctx = self.open_connection()
        raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
        if raw_smart_data:
            for device, raw_data in raw_smart_data.items():
                data = self.extract_smart_features(raw_data)
                self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def scrape_all(self):
        osdmap = self.get("osd_map")
        assert osdmap is not None
        ioctx = self.open_connection()
        did_device = {}
        ids = []
        for osd in osdmap['osds']:
            ids.append(('osd', str(osd['osd'])))
        for daemon_type, daemon_id in ids:
            raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
            if not raw_smart_data:
                continue
            for device, raw_data in raw_smart_data.items():
                if device in did_device:
                    self.log.debug('skipping duplicate %s' % device)
                    continue
                did_device[device] = 1
                data = self.extract_smart_features(raw_data)
                self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def scrape_device(self, devid):
        r = self.get("device " + devid)
        if not r or 'device' not in r.keys():
            return -errno.ENOENT, '', 'device ' + devid + ' not found'
        # only OSD daemons can service a per-device 'smart' scrape
        daemons = [d for d in r['device'].get('daemons', [])
                   if d.startswith('osd.')]
        if not daemons:
            return (-errno.EAGAIN, '',
                    'device ' + devid + ' not claimed by any active OSD daemons')
        (daemon_type, daemon_id) = daemons[0].split('.')
        ioctx = self.open_connection()
        raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
                                               devid=devid)
        if raw_smart_data:
            for device, raw_data in raw_smart_data.items():
                data = self.extract_smart_features(raw_data)
                self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
        """
        :return: a dict, or None if the scrape failed.
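
        The dict maps each scraped device id to the raw output of that
        daemon's 'smart' command (callers iterate it with .items());
        the exact payload shape is determined by the daemon.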
370 """
371 self.log.debug('do_scrape_daemon %s.%s' % (daemon_type, daemon_id))
372 result = CommandResult('')
373 self.send_command(result, daemon_type, daemon_id, json.dumps({
374 'prefix': 'smart',
375 'format': 'json',
376 'devid': devid,
377 }), '')
378 r, outb, outs = result.wait()
379
380 try:
381 return json.loads(outb)
382 except (IndexError, ValueError):
383 self.log.error(
384 "Fail to parse JSON result from daemon {0}.{1} ({2})".format(
385 daemon_type, daemon_id, outb))
386
    def put_device_metrics(self, ioctx, devid, data):
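        """
        Persist one sample for devid: the RADOS object is named after
        the device id, and each sample is an OMAP entry whose key is a
        '%Y%m%d-%H%M%S' timestamp and whose value is the JSON-encoded
        metrics; entries older than retention_period are pruned on the
        same write.
        """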
        old_key = datetime.utcnow() - timedelta(
            seconds=int(self.retention_period))
        prune = old_key.strftime(TIME_FORMAT)
        self.log.debug('put_device_metrics device %s prune %s' %
                       (devid, prune))
        erase = []
        try:
            with rados.ReadOpCtx() as op:
                omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)  # fixme
                assert ret == 0
                ioctx.operate_read_op(op, devid)
                for key, _ in list(omap_iter):
                    if key >= prune:
                        break
                    erase.append(key)
        except rados.ObjectNotFound:
            # The object doesn't already exist, no problem.
            pass
        except rados.Error as e:
            # Do not proceed with writes if something unexpected
            # went wrong with the reads.
            self.log.exception("Error reading OMAP: {0}".format(e))
            return

        key = datetime.utcnow().strftime(TIME_FORMAT)
        self.log.debug('put_device_metrics device %s key %s = %s, erase %s' %
                       (devid, key, data, erase))
        with rados.WriteOpCtx() as op:
            ioctx.set_omap(op, (key,), (str(json.dumps(data)),))
            if erase:
                ioctx.remove_omap_keys(op, tuple(erase))
            ioctx.operate_write_op(op, devid)

    def show_device_metrics(self, devid, sample):
        # verify device exists
        r = self.get("device " + devid)
        if not r or 'device' not in r.keys():
            return -errno.ENOENT, '', 'device ' + devid + ' not found'
        # fetch metrics
        res = {}
        ioctx = self.open_connection(create_if_missing=False)
        if not ioctx:
            return 0, json.dumps(res, indent=4), ''
        with ioctx:
            with rados.ReadOpCtx() as op:
                omap_iter, ret = ioctx.get_omap_vals(op, "", sample or '',
                                                     MAX_SAMPLES)  # fixme
                assert ret == 0
                try:
                    ioctx.operate_read_op(op, devid)
                    for key, value in list(omap_iter):
                        if sample and key != sample:
                            break
                        try:
                            v = json.loads(value)
                        except (ValueError, IndexError):
                            self.log.debug('unable to parse value for %s: "%s"' %
                                           (key, value))
                            # skip unparseable samples rather than
                            # reporting a stale or undefined value
                            continue
                        res[key] = v
                except rados.ObjectNotFound:
                    pass
                except rados.Error as e:
                    self.log.exception("RADOS error reading omap: {0}".format(e))
                    raise

        return 0, json.dumps(res, indent=4), ''

    def check_health(self):
        self.log.info('Check health')
        config = self.get('config')
        min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
        mark_out_threshold_td = timedelta(seconds=int(self.mark_out_threshold))
        warn_threshold_td = timedelta(seconds=int(self.warn_threshold))
        checks = {}
        health_warnings = {
            DEVICE_HEALTH: [],
            DEVICE_HEALTH_IN_USE: [],
        }
        devs = self.get("devices")
        osds_in = {}
        osds_out = {}
        now = datetime.utcnow()
        osdmap = self.get("osd_map")
        assert osdmap is not None
        for dev in devs['devices']:
            devid = dev['devid']
            if 'life_expectancy_max' not in dev:
                continue
            # ignore devices that are not consumed by any daemons
            if not dev['daemons']:
                continue
            if not dev['life_expectancy_max'] or \
               dev['life_expectancy_max'] == '0.000000':
                continue
            # life_expectancy_(min/max) is in the format of:
            # '%Y-%m-%d %H:%M:%S.%f', e.g.:
            # '2019-01-20 21:12:12.000000'
            life_expectancy_max = datetime.strptime(
                dev['life_expectancy_max'],
                '%Y-%m-%d %H:%M:%S.%f')
            self.log.debug('device %s expectancy max %s', devid,
                           life_expectancy_max)

            if life_expectancy_max - now <= mark_out_threshold_td:
                if self.self_heal:
                    # dev['daemons'] == ["osd.0","osd.1","osd.2"]
                    if dev['daemons']:
                        osds = [x for x in dev['daemons']
                                if x.startswith('osd.')]
                        osd_ids = map(lambda x: x[4:], osds)
                        for _id in osd_ids:
                            if self.is_osd_in(osdmap, _id):
                                osds_in[_id] = life_expectancy_max
                            else:
                                osds_out[_id] = 1

            if life_expectancy_max - now <= warn_threshold_td:
                # device can appear in more than one location in case
                # of SCSI multipath
                device_locations = map(lambda x: x['host'] + ':' + x['dev'],
                                       dev['location'])
                health_warnings[DEVICE_HEALTH].append(
                    '%s (%s); daemons %s; life expectancy between %s and %s'
                    % (devid,
                       ','.join(device_locations),
                       ','.join(dev.get('daemons', ['none'])),
                       dev.get('life_expectancy_min', 'unknown'),
                       dev['life_expectancy_max']))

        # OSD might be marked 'out' (which means it has no
        # data), however PGs are still attached to it.
        for _id in osds_out:
            num_pgs = self.get_osd_num_pgs(_id)
            if num_pgs > 0:
                health_warnings[DEVICE_HEALTH_IN_USE].append(
                    'osd.%s is marked out '
                    'but still has %s PG(s)' %
                    (_id, num_pgs))
        if osds_in:
            self.log.debug('osds_in %s' % osds_in)
            # calculate target in ratio
            num_osds = len(osdmap['osds'])
            num_in = len([x for x in osdmap['osds'] if x['in']])
            num_bad = len(osds_in)
            # sort with next-to-fail first
            bad_osds = sorted(osds_in.items(), key=operator.itemgetter(1))
            did = 0
            to_mark_out = []
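            # mark out next-to-fail OSDs, but never push the "in" ratio
            # below mon_osd_min_in_ratio; e.g. with 10 OSDs all "in" and
            # min_in_ratio 0.75, at most two OSDs can be marked out here
            # (a third would leave 7/10 = 0.7 "in")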
            for osd_id, when in bad_osds:
                ratio = float(num_in - did - 1) / float(num_osds)
                if ratio < min_in_ratio:
                    final_ratio = float(num_in - num_bad) / float(num_osds)
                    checks[DEVICE_HEALTH_TOOMANY] = {
                        'severity': 'warning',
                        'summary': HEALTH_MESSAGES[DEVICE_HEALTH_TOOMANY],
                        'detail': [
                            '%d OSDs with failing device(s) would bring "in" ratio to %f < mon_osd_min_in_ratio %f'
                            % (num_bad - did, final_ratio, min_in_ratio)
                        ]
                    }
                    break
                to_mark_out.append(osd_id)
                did += 1
            if to_mark_out:
                self.mark_out_etc(to_mark_out)
        for warning, ls in iteritems(health_warnings):
            n = len(ls)
            if n:
                checks[warning] = {
                    'severity': 'warning',
                    'summary': HEALTH_MESSAGES[warning] % n,
                    'detail': ls,
                }
        self.set_health_checks(checks)
        return 0, "", ""

    def is_osd_in(self, osdmap, osd_id):
        for osd in osdmap['osds']:
            if str(osd_id) == str(osd['osd']):
                return bool(osd['in'])
        return False

    def get_osd_num_pgs(self, osd_id):
        stats = self.get('osd_stats')
        assert stats is not None
        for stat in stats['osd_stats']:
            if str(osd_id) == str(stat['osd']):
                return stat['num_pgs']
        return -1

    def mark_out_etc(self, osd_ids):
        self.log.info('Marking out OSDs: %s' % osd_ids)
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd out',
            'format': 'json',
            'ids': osd_ids,
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]'
                             % (osd_ids, r, outb, outs))
        for osd_id in osd_ids:
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd primary-affinity',
                'format': 'json',
                'id': int(osd_id),
                'weight': 0.0,
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.warning('Could not set osd.%s primary-affinity, r: [%s], outb: [%s], outs: [%s]'
                                 % (osd_id, r, outb, outs))

    def extract_smart_features(self, raw):
        # FIXME: extract and normalize raw smartctl --json output and
        # generate a dict of the fields we care about.
        return raw

    def predict_life_expectancy(self, devid):
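        """
        Ask whichever disk prediction module is configured (via the
        'device_failure_prediction_mode' option: 'local' or 'cloud')
        to predict the life expectancy of a single device.
        """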
        plugin_name = ''
        model = self.get_ceph_option('device_failure_prediction_mode')
        if model and model.lower() == 'cloud':
            plugin_name = 'diskprediction_cloud'
        elif model and model.lower() == 'local':
            plugin_name = 'diskprediction_local'
        else:
            return -1, '', 'unable to enable any disk prediction model [local/cloud]'
        try:
            can_run, _ = self.remote(plugin_name, 'can_run')
            if can_run:
                return self.remote(plugin_name, 'predict_life_expectancy',
                                   devid=devid)
        except Exception:
            pass
        # fall through: plugin missing, not runnable, or call failed
        return -1, '', 'unable to invoke diskprediction local or remote plugin'

    def predict_all_devices(self):
        plugin_name = ''
        model = self.get_ceph_option('device_failure_prediction_mode')
        if model and model.lower() == 'cloud':
            plugin_name = 'diskprediction_cloud'
        elif model and model.lower() == 'local':
            plugin_name = 'diskprediction_local'
        else:
            return -1, '', 'unable to enable any disk prediction model [local/cloud]'
        try:
            can_run, _ = self.remote(plugin_name, 'can_run')
            if can_run:
                return self.remote(plugin_name, 'predict_all_devices')
        except Exception:
            pass
        # fall through: plugin missing, not runnable, or call failed
        return -1, '', 'unable to invoke diskprediction local or remote plugin'

    def get_recent_device_metrics(self, devid, min_sample):
        return self._get_device_metrics(devid, min_sample=min_sample)

    def get_time_format(self):
        return TIME_FORMAT