]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | import datetime |
2 | import json | |
3 | import re | |
4 | import threading | |
f67539c2 TL |
5 | |
6 | from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult | |
20effc67 | 7 | from mgr_module import MgrModule, CommandResult, NotifyType |
11fdf7f2 TL |
8 | from . import health as health_util |
9 | ||
# Width, in hours, of the crash-history window requested for a report.
CRASH_HISTORY_HOURS = 24
# Width, in hours, of the health-history window rolled up into a report.
HEALTH_HISTORY_HOURS = 24
# Health history older than this many hours is pruned by the serve loop.
HEALTH_RETENTION_HOURS = 30
# Name of the health check raised when a report could not be fully built.
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# Format version stamped into every stored health-history entry.
ON_DISK_VERSION = 1
20 | ||
11fdf7f2 | 21 | |
f67539c2 | 22 | class Module(MgrModule): |
20effc67 TL |
23 | |
24 | NOTIFY_TYPES = [NotifyType.health] | |
25 | ||
11fdf7f2 TL |
def __init__(self, *args, **kwargs):
    """Set up the module's in-memory state.

    All positional and keyword arguments are forwarded unchanged to the
    parent class constructor.
    """
    super(Module, self).__init__(*args, **kwargs)

    # serve() loop control: the event wakes the loop, the flag asks it to exit
    self._evt = threading.Event()
    self._shutdown = False

    # in-memory key/value store backing get_store()/set_store()
    self._store = {}

    # health history tracking: the slot currently being accumulated, and
    # the snapshots queued by notify() that are not yet folded into it
    self._health_slot = None
    self._pending_health = []
36 | ||
37 | # The following three functions, get_store, set_store, and get_store_prefix | |
38 | # mask the functions defined in the parent to avoid storing large keys | |
39 | # persistently to disk as that was proving problematic. Long term we may | |
40 | # implement a different mechanism to make these persistent. When that day | |
41 | # comes it should just be a matter of deleting these three functions. | |
def get_store(self, key, default=None):
    """Return the value held under ``key`` in the in-memory store.

    :param key: store key to look up
    :param default: value returned when ``key`` is not present; it defaults
        to ``None``, so existing one-argument callers see no change
    :return: the stored value, or ``default`` when there is none
    """
    # NOTE(review): ``default`` is accepted so this override can be called
    # the same way as the parent-class accessor it masks -- confirm the
    # parent signature in mgr_module.
    return self._store.get(key, default)
44 | ||
def set_store(self, key, value):
    """Store ``value`` under ``key``; a ``None`` value deletes the key.

    Deleting a key that is not present is a no-op.
    """
    if value is not None:
        self._store[key] = value
        return
    self._store.pop(key, None)
51 | ||
def get_store_prefix(self, prefix):
    """Return a new dict of every stored item whose key starts with ``prefix``."""
    selected = {}
    for name, stored in self._store.items():
        if name.startswith(prefix):
            selected[name] = stored
    return selected
54 | ||
11fdf7f2 | 55 | |
def notify(self, ttype: NotifyType, ident):
    """Queue a health snapshot for the serve loop.

    Only ``NotifyType.health`` notifications are acted on; every other
    type is ignored. ``ident`` is not used.
    """
    if ttype != NotifyType.health:
        return

    queued = len(self._pending_health)
    self.log.info("Received health check update {} pending".format(queued))

    # fetch the current health, queue it, and wake the serve loop
    snapshot = json.loads(self.get("health")["json"])
    self._pending_health.append(snapshot)
    self._evt.set()
64 | ||
def serve(self):
    """Run the health-history loop until shutdown() is requested.

    Each pass waits for the wake-up event (or for the persist period to
    elapse), rotates the current health slot if it has expired, folds the
    queued health snapshots into the slot and flushes it if needed.
    """
    self._health_reset()
    while True:
        self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
        self._evt.clear()
        if self._shutdown:
            break

        # when the current health slot expires, finalize it by flushing it to
        # the store, and initializing a new empty slot.
        if self._health_slot.expired():
            self.log.info("Health history slot expired {}".format(
                self._health_slot))
            self._health_maybe_flush()
            self._health_reset()
            self._health_prune_history(HEALTH_RETENTION_HOURS)

        # Detach the queue before folding it in. Rebinding the attribute to
        # an empty list only after the loop would silently drop any snapshot
        # that notify() appended in between.
        # NOTE(review): this assumes notify() can run concurrently with this
        # loop (the two already coordinate through a threading.Event).
        pending, self._pending_health = self._pending_health, []

        # fold in pending health snapshots and flush
        self.log.info("Applying {} health updates to slot {}".format(
            len(pending), self._health_slot))
        for health in pending:
            self._health_slot.add(health)
        self._health_maybe_flush()
89 | ||
def shutdown(self):
    """Ask the serve loop to exit and wake it so it notices right away."""
    # the flag must be raised before the event is set: serve() checks the
    # flag as soon as its wait returns
    self._shutdown = True
    self._evt.set()
93 | ||
def _health_reset(self):
    """Point ``self._health_slot`` at a slot for the current key.

    If the store already holds data under the current slot key, the new
    slot is seeded from it; otherwise the slot is created empty.
    """
    slot_cls = health_util.HealthHistorySlot
    persisted = self.get_store(slot_cls.curr_key())
    self._health_slot = (
        slot_cls(json.loads(persisted)) if persisted else slot_cls()
    )
    self.log.info("Reset curr health slot {}".format(self._health_slot))
108 | ||
def _health_maybe_flush(self):
    """Write the current health slot to the store when it reports a need.

    The stored entry is the slot's health data plus a ``version`` field,
    serialized as JSON. The slot is marked flushed after it is stored.
    """
    current = self._health_slot

    self.log.info("Maybe flushing slot {} needed {}".format(
        current, current.need_flush()))

    if not current.need_flush():
        return

    key = current.key()

    # build store data entry, tagged with the format version
    entry = current.health()
    assert "version" not in entry
    entry.update({"version": ON_DISK_VERSION})
    serialized = json.dumps(entry, cls=health_util.HealthEncoder)

    pretty = json.dumps(entry, indent=2, cls=health_util.HealthEncoder)
    self.log.debug("Storing health key {} data {}".format(key, pretty))

    self.set_store(key, serialized)
    current.mark_flushed()
129 | ||
def _health_filter(self, f):
    """Lazily yield stored health-history keys whose timestamp satisfies ``f``.

    ``f`` is called with the time decoded from each key (via
    ``HealthHistorySlot.key_to_time``); keys for which it returns a truthy
    value are produced. The set of candidate keys is captured when this
    method is called, the predicate is evaluated during iteration.
    """
    candidates = self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)
    return (
        key for key in candidates
        if f(health_util.HealthHistorySlot.key_to_time(key))
    )
136 | ||
137 | def _health_prune_history(self, hours): | |
138 | """Prune old health entries""" | |
20effc67 | 139 | cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours) |
11fdf7f2 TL |
140 | for key in self._health_filter(lambda ts: ts <= cutoff): |
141 | self.log.info("Removing old health slot key {}".format(key)) | |
142 | self.set_store(key, None) | |
9f95a23c TL |
143 | if not hours: |
144 | self._health_slot = health_util.HealthHistorySlot() | |
11fdf7f2 TL |
145 | |
def _health_report(self, hours):
    """
    Build a health report covering the past N hours.

    Returns a dict with the current health under ``current`` and the
    merged health history under ``history``.
    """
    slot_cls = health_util.HealthHistorySlot

    # merge every stored slot in the requested range into one rollup;
    # a key with nothing stored contributes an empty slot
    rollup = slot_cls()
    for key in slot_cls.key_range(hours):
        stored = self.get_store(key)
        self.log.info("Reporting health key {} found {}".format(
            key, bool(stored)))
        rollup.merge(slot_cls(json.loads(stored) if stored else {}))

    # include history that hasn't yet been flushed
    rollup.merge(self._health_slot)

    return {
        "current": json.loads(self.get("health")["json"]),
        "history": rollup.health(),
    }
168 | ||
169 | def _version_parse(self, version): | |
170 | """ | |
171 | Return the components of a Ceph version string. | |
172 | ||
173 | This returns nothing when the version string cannot be parsed into its | |
174 | constituent components, such as when Ceph has been built with | |
175 | ENABLE_GIT_VERSION=OFF. | |
176 | """ | |
e306af50 | 177 | r = r"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)" |
11fdf7f2 TL |
178 | m = re.match(r, version) |
179 | ver = {} if not m else { | |
180 | "release": m.group("release"), | |
181 | "major": m.group("major"), | |
182 | "minor": m.group("minor") | |
183 | } | |
f67539c2 | 184 | return {k: int(v) for k, v in ver.items()} |
11fdf7f2 TL |
185 | |
186 | def _crash_history(self, hours): | |
187 | """ | |
188 | Load crash history for the past N hours from the crash module. | |
189 | """ | |
11fdf7f2 | 190 | result = dict( |
f67539c2 | 191 | summary={}, |
20effc67 | 192 | hours=hours, |
11fdf7f2 TL |
193 | ) |
194 | ||
195 | health_check_details = [] | |
196 | ||
197 | try: | |
20effc67 | 198 | _, _, crashes = self.remote("crash", "do_json_report", hours) |
11fdf7f2 TL |
199 | result["summary"] = json.loads(crashes) |
200 | except Exception as e: | |
201 | errmsg = "failed to invoke crash module" | |
202 | self.log.warning("{}: {}".format(errmsg, str(e))) | |
203 | health_check_details.append(errmsg) | |
204 | else: | |
205 | self.log.debug("Crash module invocation succeeded {}".format( | |
206 | json.dumps(result["summary"], indent=2))) | |
207 | ||
208 | return result, health_check_details | |
209 | ||
210 | def _apply_osd_stats(self, osd_map): | |
211 | # map from osd id to its index in the map structure | |
212 | osd_id_to_idx = {} | |
213 | for idx in range(len(osd_map["osds"])): | |
214 | osd_id_to_idx[osd_map["osds"][idx]["osd"]] = idx | |
215 | ||
216 | # include stats, including space utilization performance counters. | |
217 | # adapted from dashboard api controller | |
218 | for s in self.get('osd_stats')['osd_stats']: | |
219 | try: | |
220 | idx = osd_id_to_idx[s["osd"]] | |
221 | osd_map["osds"][idx].update({'osd_stats': s}) | |
222 | except KeyError as e: | |
223 | self.log.warning("inconsistent api state: {}".format(str(e))) | |
224 | ||
225 | for osd in osd_map["osds"]: | |
226 | osd['stats'] = {} | |
227 | for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']: | |
228 | osd['stats'][s.split('.')[1]] = self.get_latest('osd', str(osd["osd"]), s) | |
229 | ||
11fdf7f2 TL |
def _config_dump(self):
    """Report cluster configuration

    This report is the standard `config dump` report. It does not include
    configuration defaults; these can be inferred from the version number.

    Returns a ``(config, details)`` pair: the decoded command output and
    an empty list on success, or an empty list and a one-element list
    describing the failure when the command returns a non-zero code.
    """
    result = CommandResult("")
    args = dict(prefix="config dump", format="json")
    self.send_command(result, "mon", "", json.dumps(args), "")
    ret, outb, outs = result.wait()
    if ret == 0:
        return json.loads(outb), []

    # Adjacent string literals are joined into one literal. A backslash
    # continuation inside the quotes would instead make the next line's
    # leading indentation part of the logged message.
    self.log.warning(
        "send_command 'config dump' failed. "
        "ret={}, outs=\"{}\"".format(ret, outs))
    return [], ["Failed to read monitor config dump"]
246 | ||
f67539c2 TL |
@CLIReadCommand('insights')
def do_report(self):
    '''
    Retrieve insights report
    '''
    # NOTE(review): the docstring above is kept verbatim because the CLI
    # decorator presumably uses it as the command's help text.
    #
    # Builds one JSON document from the parsed version, crash history,
    # health history, config dump and the cluster maps. Failures to
    # collect a section are listed under "errors" and reflected in the
    # INSIGHTS_HEALTH_CHECK health check.
    health_check_details = []
    report = {}

    report.update({
        "version": dict(full=self.version,
                        **self._version_parse(self.version))
    })

    # crash history
    crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
    report["crashes"] = crashes
    health_check_details.extend(health_details)

    # health history
    report["health"] = self._health_report(HEALTH_HISTORY_HOURS)

    # cluster configuration
    config, health_details = self._config_dump()
    report["config"] = config
    health_check_details.extend(health_details)

    # osd map without its pg_temp section, annotated with per-osd stats
    osd_map = self.get("osd_map")
    del osd_map['pg_temp']
    self._apply_osd_stats(osd_map)
    report["osd_dump"] = osd_map

    report["df"] = self.get("df")
    report["osd_tree"] = self.get("osd_map_tree")
    report["fs_map"] = self.get("fs_map")
    report["crush_map"] = self.get("osd_map_crush")
    report["mon_map"] = self.get("mon_map")
    report["service_map"] = self.get("service_map")
    report["manager_map"] = self.get("mgr_map")
    report["mon_status"] = json.loads(self.get("mon_status")["json"])
    report["pg_summary"] = self.get("pg_summary")
    report["osd_metadata"] = self.get("osd_metadata")

    report.update({
        "errors": health_check_details
    })

    if health_check_details:
        self.set_health_checks({
            INSIGHTS_HEALTH_CHECK: {
                "severity": "warning",
                "summary": "Generated incomplete Insights report",
                "detail": health_check_details
            }
        })
    else:
        # A complete report clears the warning left behind by an earlier
        # incomplete one; otherwise it would stay raised indefinitely.
        # NOTE(review): this assumes the module sets no other health
        # checks, since the call replaces the module's whole set.
        self.set_health_checks({})

    result = json.dumps(report, indent=2, cls=health_util.HealthEncoder)
    return HandleCommandResult(stdout=result)
11fdf7f2 | 304 | |
f67539c2 TL |
@CLICommand('insights prune-health')
def do_prune_health(self, hours: int):
    '''
    Remove health history older than <hours> hours
    '''
    # NOTE(review): the docstring above is kept verbatim because the CLI
    # decorator presumably uses it as the command's help text.
    # The pruning itself is delegated; the command always reports success
    # with an empty result.
    self._health_prune_history(hours)
    return HandleCommandResult()
11fdf7f2 TL |
312 | |
def testing_set_now_time_offset(self, hours):
    """
    Shift the "now" used for health history by a number of hours.

    ``hours`` is converted with ``int()`` and stored as a timedelta in
    ``health_util.NOW_OFFSET``. This is called from the selftest module to
    manage testing scenarios related to tracking health history.
    """
    offset = datetime.timedelta(hours=int(hours))
    health_util.NOW_OFFSET = offset
    self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))