1 """
2 Device health monitoring
3 """

import errno
import json
from mgr_module import MgrModule, CommandResult
import operator
import rados
from threading import Event
from datetime import datetime, timedelta, timezone
from six import iteritems

TIME_FORMAT = '%Y%m%d-%H%M%S'

DEVICE_HEALTH = 'DEVICE_HEALTH'
DEVICE_HEALTH_IN_USE = 'DEVICE_HEALTH_IN_USE'
DEVICE_HEALTH_TOOMANY = 'DEVICE_HEALTH_TOOMANY'
HEALTH_MESSAGES = {
    DEVICE_HEALTH: '%d device(s) expected to fail soon',
    DEVICE_HEALTH_IN_USE: '%d daemon(s) expected to fail soon and still contain data',
    DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
}

MAX_SAMPLES = 500


class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'enable_monitoring',
            'default': True,
            'type': 'bool',
            'desc': 'monitor device health metrics',
            'runtime': True,
        },
        {
            'name': 'scrape_frequency',
            'default': 86400,
            'type': 'secs',
            'desc': 'how frequently to scrape device health metrics',
            'runtime': True,
        },
        {
            'name': 'pool_name',
            'default': 'device_health_metrics',
            'type': 'str',
            'desc': 'name of pool in which to store device health metrics',
            'runtime': True,
        },
        {
            'name': 'retention_period',
            'default': (86400 * 180),
            'type': 'secs',
            'desc': 'how long to retain device health metrics',
            'runtime': True,
        },
        {
            'name': 'mark_out_threshold',
            'default': (86400 * 14 * 2),
            'type': 'secs',
            'desc': 'automatically mark OSD out if it may fail before this long',
            'runtime': True,
        },
        {
            'name': 'warn_threshold',
            'default': (86400 * 14 * 6),
            'type': 'secs',
            'desc': 'raise health warning if OSD may fail before this long',
            'runtime': True,
        },
        {
            'name': 'self_heal',
            'default': True,
            'type': 'bool',
            'desc': 'preemptively heal cluster around devices that may fail',
            'runtime': True,
        },
        {
            'name': 'sleep_interval',
            'default': 600,
            'type': 'secs',
            'desc': 'how frequently to wake up and check device health',
            'runtime': True,
        },
    ]
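
    # The options above are ordinary mgr module options, so they can be tuned
    # at runtime, e.g. (illustrative):
    #   ceph config set mgr mgr/devicehealth/scrape_frequency 86400
    #   ceph config set mgr mgr/devicehealth/mark_out_threshold 2419200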

    COMMANDS = [
        {
            "cmd": "device query-daemon-health-metrics "
                   "name=who,type=CephString",
            "desc": "Get device health metrics for a given daemon",
            "perm": "r"
        },
        {
            "cmd": "device scrape-daemon-health-metrics "
                   "name=who,type=CephString",
            "desc": "Scrape and store device health metrics "
                    "for a given daemon",
            "perm": "r"
        },
        {
            "cmd": "device scrape-health-metrics "
                   "name=devid,type=CephString,req=False",
            "desc": "Scrape and store health metrics",
            "perm": "r"
        },
        {
            "cmd": "device get-health-metrics "
                   "name=devid,type=CephString "
                   "name=sample,type=CephString,req=False",
            "desc": "Show stored device metrics for the device",
            "perm": "r"
        },
        {
            "cmd": "device check-health",
            "desc": "Check life expectancy of devices",
            "perm": "rw",
        },
        {
            "cmd": "device monitoring on",
            "desc": "Enable device health monitoring",
            "perm": "rw",
        },
        {
            "cmd": "device monitoring off",
            "desc": "Disable device health monitoring",
            "perm": "rw",
        },
        {
            'cmd': 'device predict-life-expectancy '
                   'name=devid,type=CephString,req=true',
            'desc': 'Predict life expectancy with local predictor',
            'perm': 'r'
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # populate options (just until serve() runs)
        for opt in self.MODULE_OPTIONS:
            setattr(self, opt['name'], opt['default'])

        # other
        self.run = True
        self.event = Event()
        self.has_device_pool = False

    def is_valid_daemon_name(self, who):
        parts = who.split('.')
        if len(parts) != 2:
            return False
        if parts[0] not in ('osd', 'mon'):
            return False
        return True

    def handle_command(self, _, cmd):
        self.log.debug("handle_command")

        if cmd['prefix'] == 'device query-daemon-health-metrics':
            who = cmd.get('who', '')
            if not self.is_valid_daemon_name(who):
                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
            (daemon_type, daemon_id) = who.split('.')
            result = CommandResult('')
            self.send_command(result, daemon_type, daemon_id, json.dumps({
                'prefix': 'smart',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            return r, outb, outs
        elif cmd['prefix'] == 'device scrape-daemon-health-metrics':
            who = cmd.get('who', '')
            if not self.is_valid_daemon_name(who):
                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
            (daemon_type, daemon_id) = who.split('.')
            return self.scrape_daemon(daemon_type, daemon_id)
        elif cmd['prefix'] == 'device scrape-health-metrics':
            if 'devid' in cmd:
                return self.scrape_device(cmd['devid'])
            return self.scrape_all()
        elif cmd['prefix'] == 'device get-health-metrics':
            return self.show_device_metrics(cmd['devid'], cmd.get('sample'))
        elif cmd['prefix'] == 'device check-health':
            return self.check_health()
        elif cmd['prefix'] == 'device monitoring on':
            self.set_module_option('enable_monitoring', True)
            self.event.set()
            return 0, '', ''
        elif cmd['prefix'] == 'device monitoring off':
            self.set_module_option('enable_monitoring', False)
            self.set_health_checks({})  # avoid stuck health alerts
            return 0, '', ''
        elif cmd['prefix'] == 'device predict-life-expectancy':
            return self.predict_life_expectancy(cmd['devid'])
        else:
            # mgr should respect our self.COMMANDS and not call us for
            # any prefix we don't advertise
            raise NotImplementedError(cmd['prefix'])

    def self_test(self):
        self.config_notify()
        osdmap = self.get('osd_map')
        osd_id = osdmap['osds'][0]['osd']
        osdmeta = self.get('osd_metadata')
        devs = osdmeta.get(str(osd_id), {}).get('device_ids')
        if devs:
            devid = devs.split()[0].split('=')[1]
            (r, before, err) = self.show_device_metrics(devid, '')
            assert r == 0
            (r, out, err) = self.scrape_device(devid)
            assert r == 0
            (r, after, err) = self.show_device_metrics(devid, '')
            assert r == 0
            assert before != after

    def config_notify(self):
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def notify(self, notify_type, notify_id):
        # create device_health_metrics pool if it doesn't exist
        if notify_type == "osd_map" and self.enable_monitoring:
            if not self.has_device_pool:
                self.create_device_pool()
                self.has_device_pool = True

    def create_device_pool(self):
        self.log.debug('create %s pool' % self.pool_name)
        # create pool
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd pool create',
            'format': 'json',
            'pool': self.pool_name,
            'pg_num': 1,
            'pg_num_min': 1,
        }), '')
        r, outb, outs = result.wait()
        assert r == 0
        # set pool application
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd pool application enable',
            'format': 'json',
            'pool': self.pool_name,
            'app': 'mgr_devicehealth',
        }), '')
        r, outb, outs = result.wait()
        assert r == 0

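    # serve() is the module's main loop: it wakes every sleep_interval
    # seconds, runs check_health(), and once per scrape_frequency interval
    # scrapes SMART data from all daemons and refreshes the predictions.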
    def serve(self):
        self.log.info("Starting")
        self.config_notify()

        last_scrape = None
        ls = self.get_store('last_scrape')
        if ls:
            try:
                last_scrape = datetime.strptime(ls, TIME_FORMAT)
            except ValueError:
                pass
        self.log.debug('Last scrape %s', last_scrape)

        while self.run:
            if self.enable_monitoring:
                self.log.debug('Running')
                self.check_health()

                now = datetime.utcnow()
                if not last_scrape:
                    next_scrape = now
                else:
                    # align to scrape interval
                    scrape_frequency = int(self.scrape_frequency) or 86400
                    seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
                    seconds -= seconds % scrape_frequency
                    seconds += scrape_frequency
                    next_scrape = datetime.utcfromtimestamp(seconds)
                if last_scrape:
                    self.log.debug('Last scrape %s, next scrape due %s',
                                   last_scrape.strftime(TIME_FORMAT),
                                   next_scrape.strftime(TIME_FORMAT))
                else:
                    self.log.debug('Last scrape never, next scrape due %s',
                                   next_scrape.strftime(TIME_FORMAT))
                if now >= next_scrape:
                    self.scrape_all()
                    self.predict_all_devices()
                    last_scrape = now
                    self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))

            # sleep
            sleep_interval = int(self.sleep_interval) or 60
            self.log.debug('Sleeping for %d seconds', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

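    # open_connection() hands back a rados ioctx for pool_name, creating the
    # pool on demand when monitoring is enabled; it returns None if the
    # cluster has no OSDs yet, or if the pool is missing and
    # create_if_missing is False.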
    def open_connection(self, create_if_missing=True):
        osdmap = self.get("osd_map")
        assert osdmap is not None
        if len(osdmap['osds']) == 0:
            return None
        if not self.has_device_pool:
            if not create_if_missing:
                return None
            if self.enable_monitoring:
                self.create_device_pool()
                self.has_device_pool = True
        ioctx = self.rados.open_ioctx(self.pool_name)
        return ioctx

    def scrape_daemon(self, daemon_type, daemon_id):
        ioctx = self.open_connection()
        if not ioctx:
            return 0, "", ""
        raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
        if raw_smart_data:
            for device, raw_data in raw_smart_data.items():
                data = self.extract_smart_features(raw_data)
                if device and data:
                    self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

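    # scrape_all() asks every OSD and mon for SMART data; a device reported by
    # more than one daemon is only stored once (tracked via did_device).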
    def scrape_all(self):
        osdmap = self.get("osd_map")
        assert osdmap is not None
        ioctx = self.open_connection()
        if not ioctx:
            return 0, "", ""
        did_device = {}
        ids = []
        for osd in osdmap['osds']:
            ids.append(('osd', str(osd['osd'])))
        monmap = self.get("mon_map")
        for mon in monmap['mons']:
            ids.append(('mon', mon['name']))
        for daemon_type, daemon_id in ids:
            raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
            if not raw_smart_data:
                continue
            for device, raw_data in raw_smart_data.items():
                if device in did_device:
                    self.log.debug('skipping duplicate %s' % device)
                    continue
                did_device[device] = 1
                data = self.extract_smart_features(raw_data)
                if device and data:
                    self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def scrape_device(self, devid):
        r = self.get("device " + devid)
        if not r or 'device' not in r.keys():
            return -errno.ENOENT, '', 'device ' + devid + ' not found'
        daemons = r['device'].get('daemons', [])
        if not daemons:
            return (-errno.EAGAIN, '',
                    'device ' + devid + ' not claimed by any active daemons')
        (daemon_type, daemon_id) = daemons[0].split('.')
        ioctx = self.open_connection()
        if not ioctx:
            return 0, "", ""
        raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
                                               devid=devid)
        if raw_smart_data:
            for device, raw_data in raw_smart_data.items():
                data = self.extract_smart_features(raw_data)
                if device and data:
                    self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
        """
        :return: a dict, or None if the scrape failed.
        """
        self.log.debug('do_scrape_daemon %s.%s' % (daemon_type, daemon_id))
        result = CommandResult('')
        self.send_command(result, daemon_type, daemon_id, json.dumps({
            'prefix': 'smart',
            'format': 'json',
            'devid': devid,
        }), '')
        r, outb, outs = result.wait()

        try:
            return json.loads(outb)
        except (IndexError, ValueError):
            self.log.error(
                "Failed to parse JSON result from daemon {0}.{1} ({2})".format(
                    daemon_type, daemon_id, outb))

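    # Storage layout (as implemented below): one RADOS object per device id,
    # with one omap entry per scrape whose key is a UTC timestamp in
    # TIME_FORMAT (e.g. '20190120-211212') and whose value is the
    # JSON-encoded metrics dict; entries older than retention_period are
    # pruned on each write.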
    def put_device_metrics(self, ioctx, devid, data):
        assert devid
        old_key = datetime.utcnow() - timedelta(
            seconds=int(self.retention_period))
        prune = old_key.strftime(TIME_FORMAT)
        self.log.debug('put_device_metrics device %s prune %s' %
                       (devid, prune))
        erase = []
        try:
            with rados.ReadOpCtx() as op:
                omap_iter, ret = ioctx.get_omap_keys(op, "", MAX_SAMPLES)  # fixme
                assert ret == 0
                ioctx.operate_read_op(op, devid)
                for key, _ in list(omap_iter):
                    if key >= prune:
                        break
                    erase.append(key)
        except rados.ObjectNotFound:
            # The object doesn't already exist, no problem.
            pass
        except rados.Error as e:
            # Do not proceed with writes if something unexpected
            # went wrong with the reads.
            self.log.exception("Error reading OMAP: {0}".format(e))
            return

        key = datetime.utcnow().strftime(TIME_FORMAT)
        self.log.debug('put_device_metrics device %s key %s = %s, erase %s' %
                       (devid, key, data, erase))
        with rados.WriteOpCtx() as op:
            ioctx.set_omap(op, (key,), (str(json.dumps(data)),))
            if len(erase):
                ioctx.remove_omap_keys(op, tuple(erase))
            ioctx.operate_write_op(op, devid)

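    # _get_device_metrics() reads those omap entries back into a dict mapping
    # sample timestamp -> parsed metrics; 'sample' selects a single key and
    # 'min_sample' bounds the range from below.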
    def _get_device_metrics(self, devid, sample=None, min_sample=None):
        res = {}
        ioctx = self.open_connection(create_if_missing=False)
        if not ioctx:
            return {}
        with ioctx:
            with rados.ReadOpCtx() as op:
                omap_iter, ret = ioctx.get_omap_vals(op, min_sample or '', sample or '',
                                                     MAX_SAMPLES)  # fixme
                assert ret == 0
                try:
                    ioctx.operate_read_op(op, devid)
                    for key, value in list(omap_iter):
                        if sample and key != sample:
                            break
                        if min_sample and key < min_sample:
                            break
                        try:
                            v = json.loads(value)
                        except (ValueError, IndexError):
                            self.log.debug('unable to parse value for %s: "%s"' %
                                           (key, value))
                            continue
                        res[key] = v
                except rados.ObjectNotFound:
                    pass
                except rados.Error as e:
                    self.log.exception("RADOS error reading omap: {0}".format(e))
                    raise
        return res

    def show_device_metrics(self, devid, sample):
        # verify device exists
        r = self.get("device " + devid)
        if not r or 'device' not in r.keys():
            return -errno.ENOENT, '', 'device ' + devid + ' not found'
        # fetch metrics
        res = self._get_device_metrics(devid, sample=sample)
        return 0, json.dumps(res, indent=4, sort_keys=True), ''

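    # check_health() compares each device's life_expectancy_max against the
    # warn/mark-out thresholds, raises the DEVICE_HEALTH* health checks, and
    # (when self_heal is enabled) marks failing OSDs out without dropping the
    # "in" ratio below mon_osd_min_in_ratio.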
    def check_health(self):
        self.log.info('Check health')
        config = self.get('config')
        min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
        mark_out_threshold_td = timedelta(seconds=int(self.mark_out_threshold))
        warn_threshold_td = timedelta(seconds=int(self.warn_threshold))
        checks = {}
        health_warnings = {
            DEVICE_HEALTH: [],
            DEVICE_HEALTH_IN_USE: [],
        }
        devs = self.get("devices")
        osds_in = {}
        osds_out = {}
        now = datetime.now(timezone.utc)  # e.g. '2021-09-22 13:18:45.021712+00:00'
        osdmap = self.get("osd_map")
        assert osdmap is not None
        for dev in devs['devices']:
            devid = dev['devid']
            if 'life_expectancy_max' not in dev:
                continue
            # ignore devices that are not consumed by any daemons
            if not dev['daemons']:
                continue
            if not dev['life_expectancy_max'] or \
               dev['life_expectancy_max'] == '0.000000':
                continue
            # life_expectancy_(min/max) is in the format of:
            # '%Y-%m-%dT%H:%M:%S.%f%z', e.g.:
            # '2019-01-20T21:12:12.000000+00:00'
            life_expectancy_max = datetime.strptime(
                dev['life_expectancy_max'],
                '%Y-%m-%dT%H:%M:%S.%f%z')
            self.log.debug('device %s expectancy max %s', dev,
                           life_expectancy_max)

            if life_expectancy_max - now <= mark_out_threshold_td:
                if self.self_heal:
                    # dev['daemons'] == ["osd.0","osd.1","osd.2"]
                    if dev['daemons']:
                        osds = [x for x in dev['daemons']
                                if x.startswith('osd.')]
                        osd_ids = map(lambda x: x[4:], osds)
                        for _id in osd_ids:
                            if self.is_osd_in(osdmap, _id):
                                osds_in[_id] = life_expectancy_max
                            else:
                                osds_out[_id] = 1

            if life_expectancy_max - now <= warn_threshold_td:
                # device can appear in more than one location in case
                # of SCSI multipath
                device_locations = map(lambda x: x['host'] + ':' + x['dev'],
                                       dev['location'])
                health_warnings[DEVICE_HEALTH].append(
                    '%s (%s); daemons %s; life expectancy between %s and %s'
                    % (dev['devid'],
                       ','.join(device_locations),
                       ','.join(dev.get('daemons', ['none'])),
                       dev.get('life_expectancy_min', 'unknown'),
                       dev['life_expectancy_max']))

        # OSD might be marked 'out' (which means it has no
        # data), however PGs are still attached to it.
        for _id in osds_out:
            num_pgs = self.get_osd_num_pgs(_id)
            if num_pgs > 0:
                health_warnings[DEVICE_HEALTH_IN_USE].append(
                    'osd.%s is marked out '
                    'but still has %s PG(s)' %
                    (_id, num_pgs))
        if osds_in:
            self.log.debug('osds_in %s' % osds_in)
            # calculate target in ratio
            num_osds = len(osdmap['osds'])
            num_in = len([x for x in osdmap['osds'] if x['in']])
            num_bad = len(osds_in)
            # sort with next-to-fail first
            bad_osds = sorted(osds_in.items(), key=operator.itemgetter(1))
            did = 0
            to_mark_out = []
            for osd_id, when in bad_osds:
                ratio = float(num_in - did - 1) / float(num_osds)
                if ratio < min_in_ratio:
                    final_ratio = float(num_in - num_bad) / float(num_osds)
                    checks[DEVICE_HEALTH_TOOMANY] = {
                        'severity': 'warning',
                        'summary': HEALTH_MESSAGES[DEVICE_HEALTH_TOOMANY],
                        'detail': [
                            '%d OSDs with failing device(s) would bring "in" ratio to '
                            '%f < mon_osd_min_in_ratio %f'
                            % (num_bad - did, final_ratio, min_in_ratio)
                        ]
                    }
                    break
                to_mark_out.append(osd_id)
                did += 1
            if to_mark_out:
                self.mark_out_etc(to_mark_out)
        for warning, ls in iteritems(health_warnings):
            n = len(ls)
            if n:
                checks[warning] = {
                    'severity': 'warning',
                    'summary': HEALTH_MESSAGES[warning] % n,
                    'count': len(ls),
                    'detail': ls,
                }
        self.set_health_checks(checks)
        return 0, "", ""

    def is_osd_in(self, osdmap, osd_id):
        for osd in osdmap['osds']:
            if str(osd_id) == str(osd['osd']):
                return bool(osd['in'])
        return False

    def get_osd_num_pgs(self, osd_id):
        stats = self.get('osd_stats')
        assert stats is not None
        for stat in stats['osd_stats']:
            if str(osd_id) == str(stat['osd']):
                return stat['num_pgs']
        return -1

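    # mark_out_etc() asks the mons to mark the given OSDs out and then sets
    # their primary-affinity to 0 so CRUSH stops selecting them as primaries.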
    def mark_out_etc(self, osd_ids):
        self.log.info('Marking out OSDs: %s' % osd_ids)
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd out',
            'format': 'json',
            'ids': osd_ids,
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]'
                             % (osd_ids, r, outb, outs))
        for osd_id in osd_ids:
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd primary-affinity',
                'format': 'json',
                'id': int(osd_id),
                'weight': 0.0,
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.warning('Could not set osd.%s primary-affinity, '
                                 'r: [%s], outb: [%s], outs: [%s]'
                                 % (osd_id, r, outb, outs))

    def extract_smart_features(self, raw):
        # FIXME: extract and normalize raw smartctl --json output and
        # generate a dict of the fields we care about.
        return raw

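    # Predictions are delegated via remote() to whichever diskprediction
    # plugin ('local' or 'cloud') is selected by the
    # device_failure_prediction_mode option.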
    def predict_life_expectancy(self, devid):
        plugin_name = ''
        model = self.get_ceph_option('device_failure_prediction_mode')
        if model and model.lower() == 'cloud':
            plugin_name = 'diskprediction_cloud'
        elif model and model.lower() == 'local':
            plugin_name = 'diskprediction_local'
        else:
            return -1, '', 'unable to enable any disk prediction model [local/cloud]'
        try:
            can_run, _ = self.remote(plugin_name, 'can_run')
            if can_run:
                return self.remote(plugin_name, 'predict_life_expectancy', devid=devid)
        except Exception:
            return -1, '', 'unable to invoke diskprediction local or remote plugin'

    def predict_all_devices(self):
        plugin_name = ''
        model = self.get_ceph_option('device_failure_prediction_mode')
        if model and model.lower() == 'cloud':
            plugin_name = 'diskprediction_cloud'
        elif model and model.lower() == 'local':
            plugin_name = 'diskprediction_local'
        else:
            return -1, '', 'unable to enable any disk prediction model [local/cloud]'
        try:
            can_run, _ = self.remote(plugin_name, 'can_run')
            if can_run:
                return self.remote(plugin_name, 'predict_all_devices')
        except Exception:
            return -1, '', 'unable to invoke diskprediction local or remote plugin'

    def get_recent_device_metrics(self, devid, min_sample):
        return self._get_device_metrics(devid, min_sample=min_sample)

    def get_time_format(self):
        return TIME_FORMAT