6 from mgr_module
import CLICommand
, CLIReadCommand
, HandleCommandResult
7 from mgr_module
import MgrModule
, CommandResult
8 from . import health
as health_util
# Hours of crash history requested from the crash module for a report
CRASH_HISTORY_HOURS = 24
# Hours of health history rolled up into a report
HEALTH_HISTORY_HOURS = 24
# Hours of health history kept in the store before old slots are pruned
HEALTH_RETENTION_HOURS = 30
# Name of the health check raised when a generated report is incomplete
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
18 # version tag for persistent data format
22 class Module(MgrModule
):
def __init__(self, *args, **kwargs):
    """Set up worker-control and health-tracking state.

    All positional and keyword arguments are forwarded unchanged to the
    parent constructor.
    """
    super(Module, self).__init__(*args, **kwargs)

    # health history tracking: snapshots queued by notify(), and the slot
    # they are folded into (unset until _health_reset() assigns one)
    self._health_slot = None
    self._pending_health = []

    # worker control: shutdown flag and the event the worker waits on
    self._evt = threading.Event()
    self._shutdown = False
def notify(self, ttype, ident):
    """Queue a health snapshot for processing.

    Only ``"health"`` notifications are acted on. Any other notification
    type is ignored so that unrelated events do not enqueue redundant
    health snapshots.

    :param ttype: notification type; compared against ``"health"``
    :param ident: identifier delivered with the notification (unused)
    """
    if ttype != "health":
        return

    self.log.info("Received health check update {} pending".format(
        len(self._pending_health)))
    # capture the health state as of now; it is folded into the current
    # slot later, when the pending list is drained
    snapshot = json.loads(self.get("health")["json"])
    self._pending_health.append(snapshot)
45 self
._evt
.wait(health_util
.PERSIST_PERIOD
.total_seconds())
50 # when the current health slot expires, finalize it by flushing it to
51 # the store, and initializing a new empty slot.
52 if self
._health
_slot
.expired():
53 self
.log
.info("Health history slot expired {}".format(
55 self
._health
_maybe
_flush
()
57 self
._health
_prune
_history
(HEALTH_RETENTION_HOURS
)
59 # fold in pending health snapshots and flush
60 self
.log
.info("Applying {} health updates to slot {}".format(
61 len(self
._pending
_health
), self
._health
_slot
))
62 for health
in self
._pending
_health
:
63 self
._health
_slot
.add(health
)
64 self
._pending
_health
= []
65 self
._health
_maybe
_flush
()
def _health_reset(self):
    """Initialize the current health slot

    The slot will be initialized with any state found to have already been
    persisted, otherwise the slot will start empty.
    """
    key = health_util.HealthHistorySlot.curr_key()
    data = self.get_store(key)
    if data:
        # resume from the state already persisted under the current key
        init_health = json.loads(data)
        self._health_slot = health_util.HealthHistorySlot(init_health)
    else:
        # nothing stored for this key: parsing would fail, start empty
        self._health_slot = health_util.HealthHistorySlot()
    self.log.info("Reset curr health slot {}".format(self._health_slot))
def _health_maybe_flush(self):
    """Persist the current health slot when it reports a flush is needed."""
    current = self._health_slot
    self.log.info("Maybe flushing slot {} needed {}".format(
        current, current.need_flush()))

    if not current.need_flush():
        return

    store_key = current.key()

    # build store data entry, tagged with the on-disk format version
    entry = current.health()
    assert "version" not in entry
    entry.update({"version": ON_DISK_VERSION})
    encoder = health_util.HealthEncoder
    payload = json.dumps(entry, cls=encoder)

    self.log.debug("Storing health key {} data {}".format(
        store_key, json.dumps(entry, indent=2, cls=encoder)))

    self.set_store(store_key, payload)
    current.mark_flushed()
def _health_filter(self, f):
    """Filter hourly health reports by timestamp.

    :param f: predicate called with the timestamp decoded from each stored
        health-history key
    :return: lazy iterator over the keys whose timestamp satisfies ``f``
    """
    slot_cls = health_util.HealthHistorySlot
    stored = self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)
    # bind the filtered (key, value) pairs before projecting out the keys
    matches = filter(
        lambda t: f(slot_cls.key_to_time(t[0])),
        stored.items())
    return map(lambda t: t[0], matches)
114 def _health_prune_history(self
, hours
):
115 """Prune old health entries"""
116 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(hours
= hours
)
117 for key
in self
._health
_filter
(lambda ts
: ts
<= cutoff
):
118 self
.log
.info("Removing old health slot key {}".format(key
))
119 self
.set_store(key
, None)
121 self
._health
_slot
= health_util
.HealthHistorySlot()
def _health_report(self, hours):
    """
    Report a consolidated health report for the past N hours.

    :param hours: number of past hours of stored history to roll up
    :return: dict with ``current`` (the health state decoded from the
        manager right now) and ``history`` (the merged health history)
    """
    # roll up the past N hours of health info
    collector = health_util.HealthHistorySlot()
    keys = health_util.HealthHistorySlot.key_range(hours)
    for key in keys:
        data = self.get_store(key)
        # NOTE(review): the two log arguments were restored to match the
        # message's two placeholders -- confirm the intended values.
        self.log.info("Reporting health key {} found {}".format(
            key, bool(data)))
        # a key with nothing stored contributes an empty slot
        health = json.loads(data) if data else {}
        slot = health_util.HealthHistorySlot(health)
        collector.merge(slot)

    # include history that hasn't yet been flushed
    collector.merge(self._health_slot)

    return dict(
        current=json.loads(self.get("health")["json"]),
        history=collector.health()
    )
146 def _version_parse(self
, version
):
148 Return the components of a Ceph version string.
150 This returns nothing when the version string cannot be parsed into its
151 constituent components, such as when Ceph has been built with
152 ENABLE_GIT_VERSION=OFF.
154 r
= r
"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
155 m
= re
.match(r
, version
)
156 ver
= {} if not m
else {
157 "release": m
.group("release"),
158 "major": m
.group("major"),
159 "minor": m
.group("minor")
161 return {k
: int(v
) for k
, v
in ver
.items()}
163 def _crash_history(self
, hours
):
165 Load crash history for the past N hours from the crash module.
168 prefix
="crash json_report",
174 hours
=params
["hours"],
177 health_check_details
= []
180 _
, _
, crashes
= self
.remote("crash", "handle_command", "", params
)
181 result
["summary"] = json
.loads(crashes
)
182 except Exception as e
:
183 errmsg
= "failed to invoke crash module"
184 self
.log
.warning("{}: {}".format(errmsg
, str(e
)))
185 health_check_details
.append(errmsg
)
187 self
.log
.debug("Crash module invocation succeeded {}".format(
188 json
.dumps(result
["summary"], indent
=2)))
190 return result
, health_check_details
192 def _apply_osd_stats(self
, osd_map
):
193 # map from osd id to its index in the map structure
195 for idx
in range(len(osd_map
["osds"])):
196 osd_id_to_idx
[osd_map
["osds"][idx
]["osd"]] = idx
198 # include stats, including space utilization performance counters.
199 # adapted from dashboard api controller
200 for s
in self
.get('osd_stats')['osd_stats']:
202 idx
= osd_id_to_idx
[s
["osd"]]
203 osd_map
["osds"][idx
].update({'osd_stats': s
})
204 except KeyError as e
:
205 self
.log
.warning("inconsistent api state: {}".format(str(e
)))
207 for osd
in osd_map
["osds"]:
209 for s
in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
210 osd
['stats'][s
.split('.')[1]] = self
.get_latest('osd', str(osd
["osd"]), s
)
def _config_dump(self):
    """Report cluster configuration

    This report is the standard `config dump` report. It does not include
    configuration defaults; these can be inferred from the version number.

    :return: tuple ``(config, errors)``: the decoded command output with an
        empty error list on success, or an empty config with a one-element
        error list when the command fails
    """
    result = CommandResult("")
    args = dict(prefix="config dump", format="json")
    self.send_command(result, "mon", "", json.dumps(args), "")
    ret, outb, outs = result.wait()
    # NOTE(review): a zero return code is assumed to mean success -- confirm
    if ret == 0:
        return json.loads(outb), []

    # adjacent literals rather than a backslash continuation, so source
    # indentation does not leak into the logged message
    self.log.warning("send_command 'config dump' failed. "
                     "ret={}, outs=\"{}\"".format(ret, outs))
    return [], ["Failed to read monitor config dump"]
230 @CLIReadCommand('insights')
233 Retrieve insights report
235 health_check_details
= []
239 "version": dict(full
= self
.version
,
240 **self
._version
_parse
(self
.version
))
244 crashes
, health_details
= self
._crash
_history
(CRASH_HISTORY_HOURS
)
245 report
["crashes"] = crashes
246 health_check_details
.extend(health_details
)
249 report
["health"] = self
._health
_report
(HEALTH_HISTORY_HOURS
)
251 # cluster configuration
252 config
, health_details
= self
._config
_dump
()
253 report
["config"] = config
254 health_check_details
.extend(health_details
)
256 osd_map
= self
.get("osd_map")
257 del osd_map
['pg_temp']
258 self
._apply
_osd
_stats
(osd_map
)
259 report
["osd_dump"] = osd_map
261 report
["df"] = self
.get("df")
262 report
["osd_tree"] = self
.get("osd_map_tree")
263 report
["fs_map"] = self
.get("fs_map")
264 report
["crush_map"] = self
.get("osd_map_crush")
265 report
["mon_map"] = self
.get("mon_map")
266 report
["service_map"] = self
.get("service_map")
267 report
["manager_map"] = self
.get("mgr_map")
268 report
["mon_status"] = json
.loads(self
.get("mon_status")["json"])
269 report
["pg_summary"] = self
.get("pg_summary")
270 report
["osd_metadata"] = self
.get("osd_metadata")
273 "errors": health_check_details
276 if health_check_details
:
277 self
.set_health_checks({
278 INSIGHTS_HEALTH_CHECK
: {
279 "severity": "warning",
280 "summary": "Generated incomplete Insights report",
281 "detail": health_check_details
285 result
= json
.dumps(report
, indent
=2, cls
=health_util
.HealthEncoder
)
286 return HandleCommandResult(stdout
=result
)
@CLICommand('insights prune-health')
def do_prune_health(self, hours: int):
    """
    Remove health history older than <hours> hours
    """
    # NOTE(review): the docstring above is presumably surfaced by CLICommand
    # as the command's help text -- confirm before rewording it.
    # Delegate to the shared pruning routine, then return a
    # default-constructed command result.
    self._health_prune_history(hours)
    return HandleCommandResult()
def testing_set_now_time_offset(self, hours):
    """
    Control what "now" time it is by applying an offset of ``hours`` hours.

    This is called from the selftest module to manage time-dependent
    testing scenarios.
    """
    offset = datetime.timedelta(hours=hours)
    health_util.NOW_OFFSET = offset
    self.log.warning("Setting now time offset {}".format(offset))