]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/devicehealth/module.py
04986bb17b1bbf446ffe47b339a7ad90c5e52f3b
2 Device health monitoring
7 from mgr_module
import MgrModule
, CommandResult
, CLIRequiresDB
, CLICommand
, CLIReadCommand
, Option
11 from threading
import Event
12 from datetime
import datetime
, timedelta
, timezone
13 from typing
import cast
, Any
, Dict
, List
, Optional
, Sequence
, Tuple
, TYPE_CHECKING
, Union
# Timestamp format used for metric sample keys and the 'last_scrape' KV entry.
TIME_FORMAT = '%Y%m%d-%H%M%S'

# Health-check identifiers raised by this module.
DEVICE_HEALTH = 'DEVICE_HEALTH'
DEVICE_HEALTH_IN_USE = 'DEVICE_HEALTH_IN_USE'
DEVICE_HEALTH_TOOMANY = 'DEVICE_HEALTH_TOOMANY'

# Human-readable summaries for each health check; the first two are
# %-format templates taking the affected device/daemon count.
HEALTH_MESSAGES = {
    DEVICE_HEALTH: '%d device(s) expected to fail soon',
    DEVICE_HEALTH_IN_USE: '%d daemon(s) expected to fail soon and still contain data',
    DEVICE_HEALTH_TOOMANY: 'Too many daemons are expected to fail soon',
}
def get_ata_wear_level(data: Dict[Any, Any]) -> Optional[float]:
    """
    Extract wear level (as float) from smartctl -x --json output for SATA SSD

    Scans the ATA device-statistics pages for page number 7
    (Solid State Device Statistics) and returns the entry at offset 8
    (Percentage Used Endurance Indicator) scaled to [0.0, 1.0].
    Returns None when the statistic is not present.
    """
    pages = data.get("ata_device_statistics", {}).get("pages", [])
    for page in pages:
        # Only page 7 carries SSD endurance statistics; pages may be null.
        if page is not None and page.get("number") == 7:
            for entry in page.get("table", []):
                if entry["offset"] == 8:
                    return entry["value"] / 100.0
    return None
def get_nvme_wear_level(data: Dict[Any, Any]) -> Optional[float]:
    """
    Extract wear level (as float) from smartctl -x --json output for NVME SSD

    Reads ``percentage_used`` from the NVMe SMART health-information log and
    returns it scaled to [0.0, 1.0], or None when the field is absent.
    """
    health_log = data.get("nvme_smart_health_information_log", {})
    pct_used = health_log.get("percentage_used")
    if pct_used is None:
        # Log section or field missing: wear level unknown.
        return None
    return pct_used / 100.0
50 class Module(MgrModule
):
52 # latest (if db does not exist)
55 devid TEXT PRIMARY KEY
57 CREATE TABLE DeviceHealthMetrics (
58 time DATETIME DEFAULT (strftime('%s', 'now')),
59 devid TEXT NOT NULL REFERENCES Device (devid),
60 raw_smart TEXT NOT NULL,
61 PRIMARY KEY (time, devid)
69 devid TEXT PRIMARY KEY
71 CREATE TABLE DeviceHealthMetrics (
72 time DATETIME DEFAULT (strftime('%s', 'now')),
73 devid TEXT NOT NULL REFERENCES Device (devid),
74 raw_smart TEXT NOT NULL,
75 PRIMARY KEY (time, devid)
82 name
='enable_monitoring',
85 desc
='monitor device health metrics',
89 name
='scrape_frequency',
92 desc
='how frequently to scrape device health metrics',
97 default
='device_health_metrics',
99 desc
='name of pool in which to store device health metrics',
103 name
='retention_period',
104 default
=(86400 * 180),
106 desc
='how long to retain device health metrics',
110 name
='mark_out_threshold',
111 default
=(86400 * 14 * 2),
113 desc
='automatically mark OSD if it may fail before this long',
117 name
='warn_threshold',
118 default
=(86400 * 14 * 6),
120 desc
='raise health warning if OSD may fail before this long',
127 desc
='preemptively heal cluster around devices that may fail',
131 name
='sleep_interval',
134 desc
='how frequently to wake up and check device health',
139 def __init__(self
, *args
: Any
, **kwargs
: Any
) -> None:
140 super(Module
, self
).__init
__(*args
, **kwargs
)
142 # populate options (just until serve() runs)
143 for opt
in self
.MODULE_OPTIONS
:
144 setattr(self
, opt
['name'], opt
['default'])
150 # for mypy which does not run the code
152 self
.enable_monitoring
= True
153 self
.scrape_frequency
= 0.0
155 self
.device_health_metrics
= ''
156 self
.retention_period
= 0.0
157 self
.mark_out_threshold
= 0.0
158 self
.warn_threshold
= 0.0
159 self
.self_heal
= True
160 self
.sleep_interval
= 0.0
162 def is_valid_daemon_name(self
, who
: str) -> bool:
163 parts
= who
.split('.', 1)
166 return parts
[0] in ('osd', 'mon')
168 @CLIReadCommand('device query-daemon-health-metrics')
169 def do_query_daemon_health_metrics(self
, who
: str) -> Tuple
[int, str, str]:
171 Get device health metrics for a given daemon
173 if not self
.is_valid_daemon_name(who
):
174 return -errno
.EINVAL
, '', 'not a valid mon or osd daemon name'
175 (daemon_type
, daemon_id
) = who
.split('.')
176 result
= CommandResult('')
177 self
.send_command(result
, daemon_type
, daemon_id
, json
.dumps({
184 @CLIReadCommand('device scrape-daemon-health-metrics')
185 def do_scrape_daemon_health_metrics(self
, who
: str) -> Tuple
[int, str, str]:
187 Scrape and store device health metrics for a given daemon
189 if not self
.is_valid_daemon_name(who
):
190 return -errno
.EINVAL
, '', 'not a valid mon or osd daemon name'
191 (daemon_type
, daemon_id
) = who
.split('.')
192 return self
.scrape_daemon(daemon_type
, daemon_id
)
195 @CLIReadCommand('device scrape-health-metrics')
196 def do_scrape_health_metrics(self
, devid
: Optional
[str] = None) -> Tuple
[int, str, str]:
198 Scrape and store device health metrics
201 return self
.scrape_all()
203 return self
.scrape_device(devid
)
206 @CLIReadCommand('device get-health-metrics')
207 def do_get_health_metrics(self
, devid
: str, sample
: Optional
[str] = None) -> Tuple
[int, str, str]:
209 Show stored device metrics for the device
211 return self
.show_device_metrics(devid
, sample
)
214 @CLICommand('device check-health')
215 def do_check_health(self
) -> Tuple
[int, str, str]:
217 Check life expectancy of devices
219 return self
.check_health()
221 @CLICommand('device monitoring on')
222 def do_monitoring_on(self
) -> Tuple
[int, str, str]:
224 Enable device health monitoring
226 self
.set_module_option('enable_monitoring', True)
230 @CLICommand('device monitoring off')
231 def do_monitoring_off(self
) -> Tuple
[int, str, str]:
233 Disable device health monitoring
235 self
.set_module_option('enable_monitoring', False)
236 self
.set_health_checks({}) # avoid stuck health alerts
240 @CLIReadCommand('device predict-life-expectancy')
241 def do_predict_life_expectancy(self
, devid
: str) -> Tuple
[int, str, str]:
243 Predict life expectancy with local predictor
245 return self
.predict_lift_expectancy(devid
)
247 def self_test(self
) -> None:
248 assert self
.db_ready()
250 osdmap
= self
.get('osd_map')
251 osd_id
= osdmap
['osds'][0]['osd']
252 osdmeta
= self
.get('osd_metadata')
253 devs
= osdmeta
.get(str(osd_id
), {}).get('device_ids')
255 devid
= devs
.split()[0].split('=')[1]
256 self
.log
.debug(f
"getting devid {devid}")
257 (r
, before
, err
) = self
.show_device_metrics(devid
, None)
259 self
.log
.debug(f
"before: {before}")
260 (r
, out
, err
) = self
.scrape_device(devid
)
262 (r
, after
, err
) = self
.show_device_metrics(devid
, None)
264 self
.log
.debug(f
"after: {after}")
265 assert before
!= after
267 def config_notify(self
) -> None:
268 for opt
in self
.MODULE_OPTIONS
:
271 self
.get_module_option(opt
['name']))
272 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
274 def _legacy_put_device_metrics(self
, t
: str, devid
: str, data
: str) -> None:
276 INSERT OR IGNORE INTO DeviceHealthMetrics (time, devid, raw_smart)
280 self
._create
_device
(devid
)
281 epoch
= self
._t
2epoch
(t
)
282 json
.loads(data
) # valid?
283 self
.db
.execute(SQL
, (epoch
, devid
, data
))
285 devre
= r
"[a-zA-Z0-9-]+[_-][a-zA-Z0-9-]+[_-][a-zA-Z0-9-]+"
287 def _load_legacy_object(self
, ioctx
: rados
.Ioctx
, oid
: str) -> bool:
289 self
.log
.debug(f
"loading object {oid}")
290 if re
.search(self
.devre
, oid
) is None:
292 with rados
.ReadOpCtx() as op
:
293 it
, rc
= ioctx
.get_omap_vals(op
, None, None, MAX_OMAP
)
295 ioctx
.operate_read_op(op
, oid
)
297 for t
, raw_smart
in it
:
298 self
.log
.debug(f
"putting {oid} {t}")
299 self
._legacy
_put
_device
_metrics
(t
, oid
, raw_smart
)
301 assert count
< MAX_OMAP
302 self
.log
.debug(f
"removing object {oid}")
303 ioctx
.remove_object(oid
)
306 def check_legacy_pool(self
) -> bool:
308 # 'device_health_metrics' is automatically renamed '.mgr' in
310 ioctx
= self
.rados
.open_ioctx(self
.MGR_POOL_NAME
)
311 except rados
.ObjectNotFound
:
317 with ioctx
, self
._db
_lock
, self
.db
:
319 for obj
in ioctx
.list_objects():
321 if self
._load
_legacy
_object
(ioctx
, obj
.key
):
323 except json
.decoder
.JSONDecodeError
:
328 self
.log
.debug(f
"finished reading legacy pool, complete = {done}")
331 def serve(self
) -> None:
332 self
.log
.info("Starting")
336 finished_loading_legacy
= False
338 if self
.db_ready() and self
.enable_monitoring
:
339 self
.log
.debug('Running')
341 if not finished_loading_legacy
:
342 finished_loading_legacy
= self
.check_legacy_pool()
344 if last_scrape
is None:
345 ls
= self
.get_kv('last_scrape')
348 last_scrape
= datetime
.strptime(ls
, TIME_FORMAT
)
351 self
.log
.debug('Last scrape %s', last_scrape
)
355 now
= datetime
.utcnow()
359 # align to scrape interval
360 scrape_frequency
= self
.scrape_frequency
or 86400
361 seconds
= (last_scrape
- datetime
.utcfromtimestamp(0)).total_seconds()
362 seconds
-= seconds
% scrape_frequency
363 seconds
+= scrape_frequency
364 next_scrape
= datetime
.utcfromtimestamp(seconds
)
366 self
.log
.debug('Last scrape %s, next scrape due %s',
367 last_scrape
.strftime(TIME_FORMAT
),
368 next_scrape
.strftime(TIME_FORMAT
))
370 self
.log
.debug('Last scrape never, next scrape due %s',
371 next_scrape
.strftime(TIME_FORMAT
))
372 if now
>= next_scrape
:
374 self
.predict_all_devices()
376 self
.set_kv('last_scrape', last_scrape
.strftime(TIME_FORMAT
))
379 sleep_interval
= self
.sleep_interval
or 60
380 if not finished_loading_legacy
:
382 self
.log
.debug('Sleeping for %d seconds', sleep_interval
)
383 self
.event
.wait(sleep_interval
)
386 def shutdown(self
) -> None:
387 self
.log
.info('Stopping')
391 def scrape_daemon(self
, daemon_type
: str, daemon_id
: str) -> Tuple
[int, str, str]:
392 if not self
.db_ready():
393 return -errno
.EAGAIN
, "", "mgr db not yet available"
394 raw_smart_data
= self
.do_scrape_daemon(daemon_type
, daemon_id
)
396 for device
, raw_data
in raw_smart_data
.items():
397 data
= self
.extract_smart_features(raw_data
)
399 self
.put_device_metrics(device
, data
)
402 def scrape_all(self
) -> Tuple
[int, str, str]:
403 if not self
.db_ready():
404 return -errno
.EAGAIN
, "", "mgr db not yet available"
405 osdmap
= self
.get("osd_map")
406 assert osdmap
is not None
409 for osd
in osdmap
['osds']:
410 ids
.append(('osd', str(osd
['osd'])))
411 monmap
= self
.get("mon_map")
412 for mon
in monmap
['mons']:
413 ids
.append(('mon', mon
['name']))
414 for daemon_type
, daemon_id
in ids
:
415 raw_smart_data
= self
.do_scrape_daemon(daemon_type
, daemon_id
)
416 if not raw_smart_data
:
418 for device
, raw_data
in raw_smart_data
.items():
419 if device
in did_device
:
420 self
.log
.debug('skipping duplicate %s' % device
)
422 did_device
[device
] = 1
423 data
= self
.extract_smart_features(raw_data
)
425 self
.put_device_metrics(device
, data
)
428 def scrape_device(self
, devid
: str) -> Tuple
[int, str, str]:
429 if not self
.db_ready():
430 return -errno
.EAGAIN
, "", "mgr db not yet available"
431 r
= self
.get("device " + devid
)
432 if not r
or 'device' not in r
.keys():
433 return -errno
.ENOENT
, '', 'device ' + devid
+ ' not found'
434 daemons
= r
['device'].get('daemons', [])
436 return (-errno
.EAGAIN
, '',
437 'device ' + devid
+ ' not claimed by any active daemons')
438 (daemon_type
, daemon_id
) = daemons
[0].split('.')
439 raw_smart_data
= self
.do_scrape_daemon(daemon_type
, daemon_id
,
442 for device
, raw_data
in raw_smart_data
.items():
443 data
= self
.extract_smart_features(raw_data
)
445 self
.put_device_metrics(device
, data
)
448 def do_scrape_daemon(self
,
451 devid
: str = '') -> Optional
[Dict
[str, Any
]]:
453 :return: a dict, or None if the scrape failed.
455 self
.log
.debug('do_scrape_daemon %s.%s' % (daemon_type
, daemon_id
))
456 result
= CommandResult('')
457 self
.send_command(result
, daemon_type
, daemon_id
, json
.dumps({
462 r
, outb
, outs
= result
.wait()
465 return json
.loads(outb
)
466 except (IndexError, ValueError):
468 "Fail to parse JSON result from daemon {0}.{1} ({2})".format(
469 daemon_type
, daemon_id
, outb
))
472 def _prune_device_metrics(self
) -> None:
474 DELETE FROM DeviceHealthMetrics
475 WHERE time < (strftime('%s', 'now') - ?);
478 cursor
= self
.db
.execute(SQL
, (self
.retention_period
,))
479 if cursor
.rowcount
>= 1:
480 self
.log
.info(f
"pruned {cursor.rowcount} metrics")
482 def _create_device(self
, devid
: str) -> None:
484 INSERT OR IGNORE INTO Device VALUES (?);
487 cursor
= self
.db
.execute(SQL
, (devid
,))
488 if cursor
.rowcount
>= 1:
489 self
.log
.info(f
"created device {devid}")
491 self
.log
.debug(f
"device {devid} already exists")
493 def put_device_metrics(self
, devid
: str, data
: Any
) -> None:
495 INSERT INTO DeviceHealthMetrics (devid, raw_smart)
499 with self
._db
_lock
, self
.db
:
500 self
._create
_device
(devid
)
501 self
.db
.execute(SQL
, (devid
, json
.dumps(data
)))
502 self
._prune
_device
_metrics
()
504 # extract wear level?
505 wear_level
= get_ata_wear_level(data
)
506 if wear_level
is None:
507 wear_level
= get_nvme_wear_level(data
)
508 dev_data
= self
.get(f
"device {devid}") or {}
509 if wear_level
is not None:
510 if dev_data
.get(wear_level
) != str(wear_level
):
511 dev_data
["wear_level"] = str(wear_level
)
512 self
.log
.debug(f
"updating {devid} wear level to {wear_level}")
513 self
.set_device_wear_level(devid
, wear_level
)
515 if "wear_level" in dev_data
:
516 del dev_data
["wear_level"]
517 self
.log
.debug(f
"removing {devid} wear level")
518 self
.set_device_wear_level(devid
, -1.0)
520 def _t2epoch(self
, t
: Optional
[str]) -> int:
524 return int(datetime
.strptime(t
, TIME_FORMAT
).strftime("%s"))
526 def _get_device_metrics(self
, devid
: str,
527 sample
: Optional
[str] = None,
528 min_sample
: Optional
[str] = None) -> Dict
[str, Dict
[str, Any
]]:
532 SELECT time, raw_smart
533 FROM DeviceHealthMetrics
534 WHERE devid = ? AND time = ?
538 SELECT time, raw_smart
539 FROM DeviceHealthMetrics
540 WHERE devid = ? AND ? <= time
547 isample
= self
._t
2epoch
(sample
)
549 imin_sample
= self
._t
2epoch
(min_sample
)
551 self
.log
.debug(f
"_get_device_metrics: {devid} {sample} {min_sample}")
553 with self
._db
_lock
, self
.db
:
555 cursor
= self
.db
.execute(SQL_EXACT
, (devid
, isample
))
557 cursor
= self
.db
.execute(SQL_MIN
, (devid
, imin_sample
))
560 dt
= datetime
.utcfromtimestamp(t
).strftime(TIME_FORMAT
)
562 res
[dt
] = json
.loads(row
['raw_smart'])
563 except (ValueError, IndexError):
564 self
.log
.debug(f
"unable to parse value for {devid}:{t}")
568 def show_device_metrics(self
, devid
: str, sample
: Optional
[str]) -> Tuple
[int, str, str]:
569 # verify device exists
570 r
= self
.get("device " + devid
)
571 if not r
or 'device' not in r
.keys():
572 return -errno
.ENOENT
, '', 'device ' + devid
+ ' not found'
574 res
= self
._get
_device
_metrics
(devid
, sample
=sample
)
575 return 0, json
.dumps(res
, indent
=4, sort_keys
=True), ''
577 def check_health(self
) -> Tuple
[int, str, str]:
578 self
.log
.info('Check health')
579 config
= self
.get('config')
580 min_in_ratio
= float(config
.get('mon_osd_min_in_ratio'))
581 mark_out_threshold_td
= timedelta(seconds
=self
.mark_out_threshold
)
582 warn_threshold_td
= timedelta(seconds
=self
.warn_threshold
)
583 checks
: Dict
[str, Dict
[str, Union
[int, str, Sequence
[str]]]] = {}
584 health_warnings
: Dict
[str, List
[str]] = {
586 DEVICE_HEALTH_IN_USE
: [],
588 devs
= self
.get("devices")
591 now
= datetime
.now(timezone
.utc
) # e.g. '2021-09-22 13:18:45.021712+00:00'
592 osdmap
= self
.get("osd_map")
593 assert osdmap
is not None
594 for dev
in devs
['devices']:
595 if 'life_expectancy_max' not in dev
:
597 # ignore devices that are not consumed by any daemons
598 if not dev
['daemons']:
600 if not dev
['life_expectancy_max'] or \
601 dev
['life_expectancy_max'] == '0.000000':
603 # life_expectancy_(min/max) is in the format of:
604 # '%Y-%m-%dT%H:%M:%S.%f%z', e.g.:
605 # '2019-01-20 21:12:12.000000+00:00'
606 life_expectancy_max
= datetime
.strptime(
607 dev
['life_expectancy_max'],
608 '%Y-%m-%dT%H:%M:%S.%f%z')
609 self
.log
.debug('device %s expectancy max %s', dev
,
612 if life_expectancy_max
- now
<= mark_out_threshold_td
:
614 # dev['daemons'] == ["osd.0","osd.1","osd.2"]
616 osds
= [x
for x
in dev
['daemons']
617 if x
.startswith('osd.')]
618 osd_ids
= map(lambda x
: x
[4:], osds
)
620 if self
.is_osd_in(osdmap
, _id
):
621 osds_in
[_id
] = life_expectancy_max
625 if life_expectancy_max
- now
<= warn_threshold_td
:
626 # device can appear in more than one location in case
628 device_locations
= map(lambda x
: x
['host'] + ':' + x
['dev'],
630 health_warnings
[DEVICE_HEALTH
].append(
631 '%s (%s); daemons %s; life expectancy between %s and %s'
633 ','.join(device_locations
),
634 ','.join(dev
.get('daemons', ['none'])),
635 dev
['life_expectancy_max'],
636 dev
.get('life_expectancy_max', 'unknown')))
638 # OSD might be marked 'out' (which means it has no
639 # data), however PGs are still attached to it.
641 num_pgs
= self
.get_osd_num_pgs(_id
)
643 health_warnings
[DEVICE_HEALTH_IN_USE
].append(
644 'osd.%s is marked out '
645 'but still has %s PG(s)' %
648 self
.log
.debug('osds_in %s' % osds_in
)
649 # calculate target in ratio
650 num_osds
= len(osdmap
['osds'])
651 num_in
= len([x
for x
in osdmap
['osds'] if x
['in']])
652 num_bad
= len(osds_in
)
653 # sort with next-to-fail first
654 bad_osds
= sorted(osds_in
.items(), key
=operator
.itemgetter(1))
657 for osd_id
, when
in bad_osds
:
658 ratio
= float(num_in
- did
- 1) / float(num_osds
)
659 if ratio
< min_in_ratio
:
660 final_ratio
= float(num_in
- num_bad
) / float(num_osds
)
661 checks
[DEVICE_HEALTH_TOOMANY
] = {
662 'severity': 'warning',
663 'summary': HEALTH_MESSAGES
[DEVICE_HEALTH_TOOMANY
],
665 '%d OSDs with failing device(s) would bring "in" ratio to %f < mon_osd_min_in_ratio %f' % (
666 num_bad
- did
, final_ratio
, min_in_ratio
)
670 to_mark_out
.append(osd_id
)
673 self
.mark_out_etc(to_mark_out
)
674 for warning
, ls
in health_warnings
.items():
678 'severity': 'warning',
679 'summary': HEALTH_MESSAGES
[warning
] % n
,
683 self
.set_health_checks(checks
)
686 def is_osd_in(self
, osdmap
: Dict
[str, Any
], osd_id
: str) -> bool:
687 for osd
in osdmap
['osds']:
688 if osd_id
== str(osd
['osd']):
689 return bool(osd
['in'])
692 def get_osd_num_pgs(self
, osd_id
: str) -> int:
693 stats
= self
.get('osd_stats')
694 assert stats
is not None
695 for stat
in stats
['osd_stats']:
696 if osd_id
== str(stat
['osd']):
697 return stat
['num_pgs']
700 def mark_out_etc(self
, osd_ids
: List
[str]) -> None:
701 self
.log
.info('Marking out OSDs: %s' % osd_ids
)
702 result
= CommandResult('')
703 self
.send_command(result
, 'mon', '', json
.dumps({
708 r
, outb
, outs
= result
.wait()
710 self
.log
.warning('Could not mark OSD %s out. r: [%s], outb: [%s], outs: [%s]',
711 osd_ids
, r
, outb
, outs
)
712 for osd_id
in osd_ids
:
713 result
= CommandResult('')
714 self
.send_command(result
, 'mon', '', json
.dumps({
715 'prefix': 'osd primary-affinity',
720 r
, outb
, outs
= result
.wait()
722 self
.log
.warning('Could not set osd.%s primary-affinity, '
723 'r: [%s], outb: [%s], outs: [%s]',
724 osd_id
, r
, outb
, outs
)
726 def extract_smart_features(self
, raw
: Any
) -> Any
:
727 # FIXME: extract and normalize raw smartctl --json output and
728 # generate a dict of the fields we care about.
731 def predict_lift_expectancy(self
, devid
: str) -> Tuple
[int, str, str]:
733 model
= self
.get_ceph_option('device_failure_prediction_mode')
734 if cast(str, model
).lower() == 'local':
735 plugin_name
= 'diskprediction_local'
737 return -1, '', 'unable to enable any disk prediction model[local/cloud]'
739 can_run
, _
= self
.remote(plugin_name
, 'can_run')
741 return self
.remote(plugin_name
, 'predict_life_expectancy', devid
=devid
)
743 return -1, '', f
'{plugin_name} is not available'
745 return -1, '', 'unable to invoke diskprediction local or remote plugin'
747 def predict_all_devices(self
) -> Tuple
[int, str, str]:
749 model
= self
.get_ceph_option('device_failure_prediction_mode')
750 if cast(str, model
).lower() == 'local':
751 plugin_name
= 'diskprediction_local'
753 return -1, '', 'unable to enable any disk prediction model[local/cloud]'
755 can_run
, _
= self
.remote(plugin_name
, 'can_run')
757 return self
.remote(plugin_name
, 'predict_all_devices')
759 return -1, '', f
'{plugin_name} is not available'
761 return -1, '', 'unable to invoke diskprediction local or remote plugin'
763 def get_recent_device_metrics(self
, devid
: str, min_sample
: str) -> Dict
[str, Dict
[str, Any
]]:
764 return self
._get
_device
_metrics
(devid
, min_sample
=min_sample
)
766 def get_time_format(self
) -> str: