1 """
2 Device health monitoring
3 """
4
5 import errno
6 import json
7 from mgr_module import MgrModule, CommandResult
8 import operator
9 import rados
10 from threading import Event
11 from datetime import datetime, timedelta, date, time
12 from six import iteritems
13
14 TIME_FORMAT = '%Y%m%d-%H%M%S'
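# Zero-padded, so timestamps sort lexicographically in chronological order,
# e.g. datetime(2019, 1, 20, 21, 12, 12).strftime(TIME_FORMAT) == '20190120-211212'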

DEVICE_HEALTH = 'DEVICE_HEALTH'
DEVICE_HEALTH_IN_USE = 'DEVICE_HEALTH_IN_USE'
DEVICE_HEALTH_TOOMANY = 'DEVICE_HEALTH_TOOMANY'
HEALTH_MESSAGES = {
    DEVICE_HEALTH: '%d device(s) expected to fail soon',
    DEVICE_HEALTH_IN_USE: '%d daemon(s) expected to fail soon and still contain data',
    DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
}


class Module(MgrModule):
    MODULE_OPTIONS = [
        {
            'name': 'enable_monitoring',
            'default': False,
            'type': 'bool',
            'desc': 'monitor device health metrics',
            'runtime': True,
        },
        {
            'name': 'scrape_frequency',
            'default': 86400,
            'type': 'secs',
            'desc': 'how frequently to scrape device health metrics',
            'runtime': True,
        },
        {
            'name': 'pool_name',
            'default': 'device_health_metrics',
            'type': 'str',
            'desc': 'name of pool in which to store device health metrics',
            'runtime': True,
        },
        {
            'name': 'retention_period',
            'default': (86400 * 180),
            'type': 'secs',
            'desc': 'how long to retain device health metrics',
            'runtime': True,
        },
        {
            'name': 'mark_out_threshold',
            'default': (86400 * 14 * 2),
            'type': 'secs',
            'desc': 'automatically mark OSD out if it may fail before this long',
            'runtime': True,
        },
        {
            'name': 'warn_threshold',
            'default': (86400 * 14 * 6),
            'type': 'secs',
            'desc': 'raise health warning if OSD may fail before this long',
            'runtime': True,
        },
        {
            'name': 'self_heal',
            'default': True,
            'type': 'bool',
            'desc': 'preemptively heal cluster around devices that may fail',
            'runtime': True,
        },
        {
            'name': 'sleep_interval',
            'default': 600,
            'type': 'secs',
            'desc': 'how frequently to wake up and check device health',
            'runtime': True,
        },
    ]

    COMMANDS = [
        {
            "cmd": "device query-daemon-health-metrics "
                   "name=who,type=CephString",
            "desc": "Get device health metrics for a given daemon",
            "perm": "r"
        },
        {
            "cmd": "device scrape-daemon-health-metrics "
                   "name=who,type=CephString",
            "desc": "Scrape and store device health metrics "
                    "for a given daemon",
            "perm": "r"
        },
        {
            "cmd": "device scrape-health-metrics "
                   "name=devid,type=CephString,req=False",
            "desc": "Scrape and store health metrics",
            "perm": "r"
        },
        {
            "cmd": "device get-health-metrics "
                   "name=devid,type=CephString "
                   "name=sample,type=CephString,req=False",
            "desc": "Show stored device metrics for the device",
            "perm": "r"
        },
        {
            "cmd": "device check-health",
            "desc": "Check life expectancy of devices",
            "perm": "rw",
        },
        {
            "cmd": "device monitoring on",
            "desc": "Enable device health monitoring",
            "perm": "rw",
        },
        {
            "cmd": "device monitoring off",
            "desc": "Disable device health monitoring",
            "perm": "rw",
        },
        {
            'cmd': 'device predict-life-expectancy '
                   'name=devid,type=CephString,req=true',
            'desc': 'Predict life expectancy with local predictor',
            'perm': 'r'
        },
    ]
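    # Each entry above is surfaced as a `ceph device ...` CLI command,
    # for example (illustrative invocations):
    #   ceph device scrape-health-metrics
    #   ceph device get-health-metrics <devid>
    #   ceph device monitoring on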

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # populate options (just until serve() runs)
        for opt in self.MODULE_OPTIONS:
            setattr(self, opt['name'], opt['default'])

        # other
        self.run = True
        self.event = Event()

    def is_valid_daemon_name(self, who):
        parts = who.split('.')
        if len(parts) != 2:
            return False
        if parts[0] not in ('osd', 'mon'):
            return False
        return True

    def handle_command(self, _, cmd):
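        # 'cmd' is the parsed command as a dict, e.g. (illustrative):
        #   {'prefix': 'device get-health-metrics', 'devid': 'ATA_FOO_123'}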
        self.log.debug("handle_command")

        if cmd['prefix'] == 'device query-daemon-health-metrics':
            who = cmd.get('who', '')
            if not self.is_valid_daemon_name(who):
                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
            (daemon_type, daemon_id) = who.split('.')
            result = CommandResult('')
            self.send_command(result, daemon_type, daemon_id, json.dumps({
                'prefix': 'smart',
                'format': 'json',
            }), '')
            r, outb, outs = result.wait()
            return r, outb, outs
        elif cmd['prefix'] == 'device scrape-daemon-health-metrics':
            who = cmd.get('who', '')
            if not self.is_valid_daemon_name(who):
                return -errno.EINVAL, '', 'not a valid mon or osd daemon name'
            (daemon_type, daemon_id) = who.split('.')
            return self.scrape_daemon(daemon_type, daemon_id)
        elif cmd['prefix'] == 'device scrape-health-metrics':
            if 'devid' in cmd:
                return self.scrape_device(cmd['devid'])
            return self.scrape_all()
        elif cmd['prefix'] == 'device get-health-metrics':
            return self.show_device_metrics(cmd['devid'], cmd.get('sample'))
        elif cmd['prefix'] == 'device check-health':
            return self.check_health()
        elif cmd['prefix'] == 'device monitoring on':
            self.set_module_option('enable_monitoring', True)
            self.event.set()
            return 0, '', ''
        elif cmd['prefix'] == 'device monitoring off':
            self.set_module_option('enable_monitoring', False)
            self.set_health_checks({})  # avoid stuck health alerts
            return 0, '', ''
        elif cmd['prefix'] == 'device predict-life-expectancy':
            return self.predict_life_expectancy(cmd['devid'])
        else:
            # mgr should respect our self.COMMANDS and not call us for
            # any prefix we don't advertise
            raise NotImplementedError(cmd['prefix'])

    def self_test(self):
        self.config_notify()
        osdmap = self.get('osd_map')
        osd_id = osdmap['osds'][0]['osd']
        osdmeta = self.get('osd_metadata')
        devs = osdmeta.get(str(osd_id), {}).get('device_ids')
        if devs:
            devid = devs.split()[0].split('=')[1]
            (r, before, err) = self.show_device_metrics(devid, '')
            assert r == 0
            (r, out, err) = self.scrape_device(devid)
            assert r == 0
            (r, after, err) = self.show_device_metrics(devid, '')
            assert r == 0
            assert before != after

    def config_notify(self):
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def serve(self):
        self.log.info("Starting")
        self.config_notify()

        last_scrape = None
        ls = self.get_store('last_scrape')
        if ls:
            try:
                last_scrape = datetime.strptime(ls, TIME_FORMAT)
            except ValueError:
                pass
        self.log.debug('Last scrape %s', last_scrape)

        while self.run:
            if self.enable_monitoring:
                self.log.debug('Running')
                self.check_health()

                now = datetime.utcnow()
                if not last_scrape:
                    next_scrape = now
                else:
                    # align to scrape interval
                    scrape_frequency = int(self.scrape_frequency) or 86400
                    seconds = (last_scrape - datetime.utcfromtimestamp(0)).total_seconds()
                    seconds -= seconds % scrape_frequency
                    seconds += scrape_frequency
                    next_scrape = datetime.utcfromtimestamp(seconds)
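                    # e.g. a last_scrape of 2019-01-02 03:04:05 with the
                    # default daily frequency is 11045s past the day boundary;
                    # subtracting that and adding 86400s yields a next_scrape
                    # of 2019-01-03 00:00:00.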
                if last_scrape:
                    self.log.debug('Last scrape %s, next scrape due %s',
                                   last_scrape.strftime(TIME_FORMAT),
                                   next_scrape.strftime(TIME_FORMAT))
                else:
                    self.log.debug('Last scrape never, next scrape due %s',
                                   next_scrape.strftime(TIME_FORMAT))
                if now >= next_scrape:
                    self.scrape_all()
                    self.predict_all_devices()
                    last_scrape = now
                    self.set_store('last_scrape', last_scrape.strftime(TIME_FORMAT))

            # sleep
            sleep_interval = int(self.sleep_interval) or 60
            self.log.debug('Sleeping for %d seconds', sleep_interval)
            self.event.wait(sleep_interval)
            self.event.clear()

    def shutdown(self):
        self.log.info('Stopping')
        self.run = False
        self.event.set()

    def open_connection(self, create_if_missing=True):
        pools = self.rados.list_pools()
        if self.pool_name not in pools:
            if not create_if_missing:
                return None
            self.log.debug('create %s pool' % self.pool_name)
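            # Roughly equivalent to running (illustrative):
            #   ceph osd pool create <pool_name> 1
            #   ceph osd pool application enable <pool_name> mgr_devicehealth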
            # create pool
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pool create',
                'format': 'json',
                'pool': self.pool_name,
                'pg_num': 1,
                'pg_num_min': 1,
            }), '')
            r, outb, outs = result.wait()
            assert r == 0

            # set pool application
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd pool application enable',
                'format': 'json',
                'pool': self.pool_name,
                'app': 'mgr_devicehealth',
            }), '')
            r, outb, outs = result.wait()
            assert r == 0

        ioctx = self.rados.open_ioctx(self.pool_name)
        return ioctx

    def scrape_daemon(self, daemon_type, daemon_id):
        ioctx = self.open_connection()
        raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
        if raw_smart_data:
            for device, raw_data in raw_smart_data.items():
                data = self.extract_smart_features(raw_data)
                self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def scrape_all(self):
        osdmap = self.get("osd_map")
        assert osdmap is not None
        ioctx = self.open_connection()
        did_device = {}
        ids = []
        for osd in osdmap['osds']:
            ids.append(('osd', str(osd['osd'])))
        monmap = self.get("mon_map")
        for mon in monmap['mons']:
            ids.append(('mon', mon['name']))
        for daemon_type, daemon_id in ids:
            raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id)
            if not raw_smart_data:
                continue
            for device, raw_data in raw_smart_data.items():
                if device in did_device:
                    self.log.debug('skipping duplicate %s' % device)
                    continue
                did_device[device] = 1
                data = self.extract_smart_features(raw_data)
                self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def scrape_device(self, devid):
        r = self.get("device " + devid)
        if not r or 'device' not in r:
            return -errno.ENOENT, '', 'device ' + devid + ' not found'
        daemons = r['device'].get('daemons', [])
        if not daemons:
            return (-errno.EAGAIN, '',
                    'device ' + devid + ' not claimed by any active daemons')
        (daemon_type, daemon_id) = daemons[0].split('.')
        ioctx = self.open_connection()
        raw_smart_data = self.do_scrape_daemon(daemon_type, daemon_id,
                                               devid=devid)
        if raw_smart_data:
            for device, raw_data in raw_smart_data.items():
                data = self.extract_smart_features(raw_data)
                self.put_device_metrics(ioctx, device, data)
        ioctx.close()
        return 0, "", ""

    def do_scrape_daemon(self, daemon_type, daemon_id, devid=''):
        """
        :return: a dict mapping device id to raw smartctl JSON output,
            or None if the scrape failed.
        """
        self.log.debug('do_scrape_daemon %s.%s' % (daemon_type, daemon_id))
        result = CommandResult('')
        self.send_command(result, daemon_type, daemon_id, json.dumps({
            'prefix': 'smart',
            'format': 'json',
            'devid': devid,
        }), '')
        r, outb, outs = result.wait()

        try:
            return json.loads(outb)
        except (IndexError, ValueError):
            self.log.error(
                "Failed to parse JSON result from daemon {0}.{1} ({2})".format(
                    daemon_type, daemon_id, outb))

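    # Metrics are stored as one RADOS object per device id; each omap key is
    # a TIME_FORMAT timestamp and each value the JSON-encoded metrics, e.g.
    # (illustrative): '20190120-211212' -> '{"temperature": ...}'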
    def put_device_metrics(self, ioctx, devid, data):
        old_key = datetime.utcnow() - timedelta(
            seconds=int(self.retention_period))
        prune = old_key.strftime(TIME_FORMAT)
        self.log.debug('put_device_metrics device %s prune %s' %
                       (devid, prune))
        erase = []
        try:
            with rados.ReadOpCtx() as op:
                omap_iter, ret = ioctx.get_omap_keys(op, "", 500)  # fixme
                assert ret == 0
                ioctx.operate_read_op(op, devid)
                for key, _ in list(omap_iter):
                    if key >= prune:
                        break
                    erase.append(key)
        except rados.ObjectNotFound:
            # The object doesn't already exist, no problem.
            pass
        except rados.Error as e:
            # Do not proceed with writes if something unexpected
            # went wrong with the reads.
            self.log.exception("Error reading OMAP: {0}".format(e))
            return

        key = datetime.utcnow().strftime(TIME_FORMAT)
        self.log.debug('put_device_metrics device %s key %s = %s, erase %s' %
                       (devid, key, data, erase))
        with rados.WriteOpCtx() as op:
            ioctx.set_omap(op, (key,), (json.dumps(data),))
            if erase:
                ioctx.remove_omap_keys(op, tuple(erase))
            ioctx.operate_write_op(op, devid)

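    # show_device_metrics returns the stored samples keyed by timestamp,
    # e.g. (illustrative):
    #   { "20190120-211212": { ...smartctl fields... } }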
    def show_device_metrics(self, devid, sample):
        # verify device exists
        r = self.get("device " + devid)
        if not r or 'device' not in r:
            return -errno.ENOENT, '', 'device ' + devid + ' not found'
        # fetch metrics
        res = {}
        ioctx = self.open_connection(create_if_missing=False)
        if not ioctx:
            return 0, json.dumps(res, indent=4), ''
        with ioctx:
            with rados.ReadOpCtx() as op:
                omap_iter, ret = ioctx.get_omap_vals(op, "", sample or '', 500)  # fixme
                assert ret == 0
                try:
                    ioctx.operate_read_op(op, devid)
                    for key, value in list(omap_iter):
                        if sample and key != sample:
                            break
                        try:
                            v = json.loads(value)
                        except (ValueError, IndexError):
                            self.log.debug('unable to parse value for %s: "%s"' %
                                           (key, value))
                            continue
                        res[key] = v
                except rados.ObjectNotFound:
                    pass
                except rados.Error as e:
                    self.log.exception("RADOS error reading omap: {0}".format(e))
                    raise

        return 0, json.dumps(res, indent=4), ''

    def check_health(self):
        self.log.info('Check health')
        config = self.get('config')
        min_in_ratio = float(config.get('mon_osd_min_in_ratio'))
        mark_out_threshold_td = timedelta(seconds=int(self.mark_out_threshold))
        warn_threshold_td = timedelta(seconds=int(self.warn_threshold))
        checks = {}
        health_warnings = {
            DEVICE_HEALTH: [],
            DEVICE_HEALTH_IN_USE: [],
        }
        devs = self.get("devices")
        osds_in = {}
        osds_out = {}
        now = datetime.utcnow()
        osdmap = self.get("osd_map")
        assert osdmap is not None
        for dev in devs['devices']:
            devid = dev['devid']
            if 'life_expectancy_max' not in dev:
                continue
            # ignore devices that are not consumed by any daemons
            if not dev['daemons']:
                continue
            if not dev['life_expectancy_max'] or \
               dev['life_expectancy_max'] == '0.000000':
                continue
            # life_expectancy_(min/max) is in the format of:
            # '%Y-%m-%d %H:%M:%S.%f', e.g.:
            # '2019-01-20 21:12:12.000000'
            life_expectancy_max = datetime.strptime(
                dev['life_expectancy_max'],
                '%Y-%m-%d %H:%M:%S.%f')
            self.log.debug('device %s expectancy max %s', devid,
                           life_expectancy_max)

            if life_expectancy_max - now <= mark_out_threshold_td:
                if self.self_heal:
                    # dev['daemons'] == ["osd.0","osd.1","osd.2"]
                    if dev['daemons']:
                        osds = [x for x in dev['daemons']
                                if x.startswith('osd.')]
                        osd_ids = map(lambda x: x[4:], osds)
                        for _id in osd_ids:
                            if self.is_osd_in(osdmap, _id):
                                osds_in[_id] = life_expectancy_max
                            else:
                                osds_out[_id] = 1

            if life_expectancy_max - now <= warn_threshold_td:
                # device can appear in more than one location in case
                # of SCSI multipath
                device_locations = map(lambda x: x['host'] + ':' + x['dev'],
                                       dev['location'])
                health_warnings[DEVICE_HEALTH].append(
                    '%s (%s); daemons %s; life expectancy between %s and %s'
                    % (dev['devid'],
                       ','.join(device_locations),
                       ','.join(dev.get('daemons', ['none'])),
                       dev.get('life_expectancy_min', 'unknown'),
                       dev.get('life_expectancy_max', 'unknown')))

        # OSD might be marked 'out' (which means it has no
        # data), however PGs are still attached to it.
        for _id in osds_out:
            num_pgs = self.get_osd_num_pgs(_id)
            if num_pgs > 0:
                health_warnings[DEVICE_HEALTH_IN_USE].append(
                    'osd.%s is marked out '
                    'but still has %s PG(s)' %
                    (_id, num_pgs))
        if osds_in:
            self.log.debug('osds_in %s' % osds_in)
            # calculate target in ratio
            num_osds = len(osdmap['osds'])
            num_in = len([x for x in osdmap['osds'] if x['in']])
            num_bad = len(osds_in)
            # sort with next-to-fail first
            bad_osds = sorted(osds_in.items(), key=operator.itemgetter(1))
            did = 0
            to_mark_out = []
            for osd_id, when in bad_osds:
                ratio = float(num_in - did - 1) / float(num_osds)
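                # e.g. num_osds=10, num_in=8, did=0 gives ratio 0.7; with
                # mon_osd_min_in_ratio at its usual default of 0.75, the
                # check below stops before marking this OSD out.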
                if ratio < min_in_ratio:
                    final_ratio = float(num_in - num_bad) / float(num_osds)
                    checks[DEVICE_HEALTH_TOOMANY] = {
                        'severity': 'warning',
                        'summary': HEALTH_MESSAGES[DEVICE_HEALTH_TOOMANY],
                        'detail': [
                            '%d OSDs with failing device(s) would bring "in" ratio to %f < mon_osd_min_in_ratio %f' % (num_bad - did, final_ratio, min_in_ratio)
                        ]
                    }
                    break
                to_mark_out.append(osd_id)
                did += 1
            if to_mark_out:
                self.mark_out_etc(to_mark_out)
        for warning, ls in iteritems(health_warnings):
            n = len(ls)
            if n:
                checks[warning] = {
                    'severity': 'warning',
                    'summary': HEALTH_MESSAGES[warning] % n,
                    'detail': ls,
                }
        self.set_health_checks(checks)
        return 0, "", ""

    def is_osd_in(self, osdmap, osd_id):
        for osd in osdmap['osds']:
            if str(osd_id) == str(osd['osd']):
                return bool(osd['in'])
        return False

    def get_osd_num_pgs(self, osd_id):
        stats = self.get('osd_stats')
        assert stats is not None
        for stat in stats['osd_stats']:
            if str(osd_id) == str(stat['osd']):
                return stat['num_pgs']
        return -1

    def mark_out_etc(self, osd_ids):
        self.log.info('Marking out OSDs: %s' % osd_ids)
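        # Equivalent to running (illustrative):
        #   ceph osd out <id> [<id>...]
        # followed by `ceph osd primary-affinity <id> 0` for each OSD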
        result = CommandResult('')
        self.send_command(result, 'mon', '', json.dumps({
            'prefix': 'osd out',
            'format': 'json',
            'ids': osd_ids,
        }), '')
        r, outb, outs = result.wait()
        if r != 0:
            self.log.warn('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]' % (osd_ids, r, outb, outs))
        for osd_id in osd_ids:
            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd primary-affinity',
                'format': 'json',
                'id': int(osd_id),
                'weight': 0.0,
            }), '')
            r, outb, outs = result.wait()
            if r != 0:
                self.log.warn('Could not set osd.%s primary-affinity, r: [%s], outb: [%s], outs: [%s]' % (osd_id, r, outb, outs))

    def extract_smart_features(self, raw):
        # FIXME: extract and normalize raw smartctl --json output and
        # generate a dict of the fields we care about.
        return raw
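
    # A possible normalization for the FIXME above (an illustrative sketch
    # only; the module currently stores the raw smartctl output verbatim):
    #
    #   def extract_smart_features(self, raw):
    #       fields = {}
    #       # smartctl --json typically reports temperature.current and
    #       # power_on_time.hours
    #       if 'temperature' in raw:
    #           fields['temperature'] = raw['temperature'].get('current')
    #       if 'power_on_time' in raw:
    #           fields['power_on_hours'] = raw['power_on_time'].get('hours')
    #       return fields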

    def predict_life_expectancy(self, devid):
        plugin_name = ''
        model = self.get_ceph_option('device_failure_prediction_mode')
        if model and model.lower() == 'cloud':
            plugin_name = 'diskprediction_cloud'
        elif model and model.lower() == 'local':
            plugin_name = 'diskprediction_local'
        else:
            return -1, '', 'unable to enable any disk prediction model [local/cloud]'
        try:
            can_run, _ = self.remote(plugin_name, 'can_run')
            if can_run:
                return self.remote(plugin_name, 'predict_life_expectancy',
                                   devid=devid)
            return -1, '', '%s plugin is not ready to run' % plugin_name
        except Exception:
            return -1, '', 'unable to invoke diskprediction local or cloud plugin'

    def predict_all_devices(self):
        plugin_name = ''
        model = self.get_ceph_option('device_failure_prediction_mode')
        if model and model.lower() == 'cloud':
            plugin_name = 'diskprediction_cloud'
        elif model and model.lower() == 'local':
            plugin_name = 'diskprediction_local'
        else:
            return -1, '', 'unable to enable any disk prediction model [local/cloud]'
        try:
            can_run, _ = self.remote(plugin_name, 'can_run')
            if can_run:
                return self.remote(plugin_name, 'predict_all_devices')
            return -1, '', '%s plugin is not ready to run' % plugin_name
        except Exception:
            return -1, '', 'unable to invoke diskprediction local or cloud plugin'