]>
Commit | Line | Data |
---|---|---|
import datetime
import errno
import json
import re
import threading

import six

from mgr_module import MgrModule, CommandResult

from . import health as health_util
8 | ||
9 | # hours of crash history to report | |
10 | CRASH_HISTORY_HOURS = 24 | |
11 | # hours of health history to report | |
12 | HEALTH_HISTORY_HOURS = 24 | |
13 | # how many hours of health history to keep | |
14 | HEALTH_RETENTION_HOURS = 30 | |
15 | # health check name for insights health | |
16 | INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING" | |
17 | # version tag for persistent data format | |
18 | ON_DISK_VERSION = 1 | |
19 | ||
20 | class Module(MgrModule): | |
21 | COMMANDS = [ | |
22 | { | |
23 | "cmd": "insights", | |
24 | "desc": "Retrieve insights report", | |
25 | "perm": "r", | |
26 | "poll": "false", | |
27 | }, | |
28 | { | |
29 | 'cmd': 'insights prune-health name=hours,type=CephString', | |
30 | 'desc': 'Remove health history older than <hours> hours', | |
31 | 'perm': 'rw', | |
32 | "poll": "false", | |
33 | }, | |
34 | ] | |
35 | ||
36 | def __init__(self, *args, **kwargs): | |
37 | super(Module, self).__init__(*args, **kwargs) | |
38 | ||
39 | self._shutdown = False | |
40 | self._evt = threading.Event() | |
41 | ||
42 | # health history tracking | |
43 | self._pending_health = [] | |
44 | self._health_slot = None | |
45 | ||
46 | def notify(self, ttype, ident): | |
47 | """Queue updates for processing""" | |
48 | if ttype == "health": | |
49 | self.log.info("Received health check update {} pending".format( | |
50 | len(self._pending_health))) | |
51 | health = json.loads(self.get("health")["json"]) | |
52 | self._pending_health.append(health) | |
53 | self._evt.set() | |
54 | ||
55 | def serve(self): | |
56 | self._health_reset() | |
57 | while True: | |
58 | self._evt.wait(health_util.PERSIST_PERIOD.total_seconds()) | |
59 | self._evt.clear() | |
60 | if self._shutdown: | |
61 | break | |
62 | ||
63 | # when the current health slot expires, finalize it by flushing it to | |
64 | # the store, and initializing a new empty slot. | |
65 | if self._health_slot.expired(): | |
66 | self.log.info("Health history slot expired {}".format( | |
67 | self._health_slot)) | |
68 | self._health_maybe_flush() | |
69 | self._health_reset() | |
70 | self._health_prune_history(HEALTH_RETENTION_HOURS) | |
71 | ||
72 | # fold in pending health snapshots and flush | |
73 | self.log.info("Applying {} health updates to slot {}".format( | |
74 | len(self._pending_health), self._health_slot)) | |
75 | for health in self._pending_health: | |
76 | self._health_slot.add(health) | |
77 | self._pending_health = [] | |
78 | self._health_maybe_flush() | |
79 | ||
80 | def shutdown(self): | |
81 | self._shutdown = True | |
82 | self._evt.set() | |
83 | ||
84 | def _health_reset(self): | |
85 | """Initialize the current health slot | |
86 | ||
87 | The slot will be initialized with any state found to have already been | |
88 | persisted, otherwise the slot will start empty. | |
89 | """ | |
90 | key = health_util.HealthHistorySlot.curr_key() | |
91 | data = self.get_store(key) | |
92 | if data: | |
93 | init_health = json.loads(data) | |
94 | self._health_slot = health_util.HealthHistorySlot(init_health) | |
95 | else: | |
96 | self._health_slot = health_util.HealthHistorySlot() | |
97 | self.log.info("Reset curr health slot {}".format(self._health_slot)) | |
98 | ||
99 | def _health_maybe_flush(self): | |
100 | """Store the health for the current time slot if needed""" | |
101 | ||
102 | self.log.info("Maybe flushing slot {} needed {}".format( | |
103 | self._health_slot, self._health_slot.need_flush())) | |
104 | ||
105 | if self._health_slot.need_flush(): | |
106 | key = self._health_slot.key() | |
107 | ||
108 | # build store data entry | |
109 | slot = self._health_slot.health() | |
110 | assert "version" not in slot | |
111 | slot.update(dict(version = ON_DISK_VERSION)) | |
112 | data = json.dumps(slot, cls=health_util.HealthEncoder) | |
113 | ||
114 | self.log.debug("Storing health key {} data {}".format( | |
115 | key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder))) | |
116 | ||
117 | self.set_store(key, data) | |
118 | self._health_slot.mark_flushed() | |
119 | ||
120 | def _health_filter(self, f): | |
121 | """Filter hourly health reports timestamp""" | |
122 | matches = filter( | |
123 | lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])), | |
124 | six.iteritems(self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX))) | |
125 | return map(lambda t: t[0], matches) | |
126 | ||
127 | def _health_prune_history(self, hours): | |
128 | """Prune old health entries""" | |
129 | cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours = hours) | |
130 | for key in self._health_filter(lambda ts: ts <= cutoff): | |
131 | self.log.info("Removing old health slot key {}".format(key)) | |
132 | self.set_store(key, None) | |
133 | if not hours: | |
134 | self._health_slot = health_util.HealthHistorySlot() | |
135 | ||
136 | def _health_report(self, hours): | |
137 | """ | |
138 | Report a consolidated health report for the past N hours. | |
139 | """ | |
140 | # roll up the past N hours of health info | |
141 | collector = health_util.HealthHistorySlot() | |
142 | keys = health_util.HealthHistorySlot.key_range(hours) | |
143 | for key in keys: | |
144 | data = self.get_store(key) | |
145 | self.log.info("Reporting health key {} found {}".format( | |
146 | key, bool(data))) | |
147 | health = json.loads(data) if data else {} | |
148 | slot = health_util.HealthHistorySlot(health) | |
149 | collector.merge(slot) | |
150 | ||
151 | # include history that hasn't yet been flushed | |
152 | collector.merge(self._health_slot) | |
153 | ||
154 | return dict( | |
155 | current = json.loads(self.get("health")["json"]), | |
156 | history = collector.health() | |
157 | ) | |
158 | ||
159 | def _version_parse(self, version): | |
160 | """ | |
161 | Return the components of a Ceph version string. | |
162 | ||
163 | This returns nothing when the version string cannot be parsed into its | |
164 | constituent components, such as when Ceph has been built with | |
165 | ENABLE_GIT_VERSION=OFF. | |
166 | """ | |
167 | r = r"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)" | |
168 | m = re.match(r, version) | |
169 | ver = {} if not m else { | |
170 | "release": m.group("release"), | |
171 | "major": m.group("major"), | |
172 | "minor": m.group("minor") | |
173 | } | |
174 | return { k:int(v) for k,v in six.iteritems(ver) } | |
175 | ||
176 | def _crash_history(self, hours): | |
177 | """ | |
178 | Load crash history for the past N hours from the crash module. | |
179 | """ | |
180 | params = dict( | |
181 | prefix = "crash json_report", | |
182 | hours = hours | |
183 | ) | |
184 | ||
185 | result = dict( | |
186 | summary = {}, | |
187 | hours = params["hours"], | |
188 | ) | |
189 | ||
190 | health_check_details = [] | |
191 | ||
192 | try: | |
193 | _, _, crashes = self.remote("crash", "handle_command", "", params) | |
194 | result["summary"] = json.loads(crashes) | |
195 | except Exception as e: | |
196 | errmsg = "failed to invoke crash module" | |
197 | self.log.warning("{}: {}".format(errmsg, str(e))) | |
198 | health_check_details.append(errmsg) | |
199 | else: | |
200 | self.log.debug("Crash module invocation succeeded {}".format( | |
201 | json.dumps(result["summary"], indent=2))) | |
202 | ||
203 | return result, health_check_details | |
204 | ||
205 | def _apply_osd_stats(self, osd_map): | |
206 | # map from osd id to its index in the map structure | |
207 | osd_id_to_idx = {} | |
208 | for idx in range(len(osd_map["osds"])): | |
209 | osd_id_to_idx[osd_map["osds"][idx]["osd"]] = idx | |
210 | ||
211 | # include stats, including space utilization performance counters. | |
212 | # adapted from dashboard api controller | |
213 | for s in self.get('osd_stats')['osd_stats']: | |
214 | try: | |
215 | idx = osd_id_to_idx[s["osd"]] | |
216 | osd_map["osds"][idx].update({'osd_stats': s}) | |
217 | except KeyError as e: | |
218 | self.log.warning("inconsistent api state: {}".format(str(e))) | |
219 | ||
220 | for osd in osd_map["osds"]: | |
221 | osd['stats'] = {} | |
222 | for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']: | |
223 | osd['stats'][s.split('.')[1]] = self.get_latest('osd', str(osd["osd"]), s) | |
224 | ||
225 | ||
226 | def _config_dump(self): | |
227 | """Report cluster configuration | |
228 | ||
229 | This report is the standard `config dump` report. It does not include | |
230 | configuration defaults; these can be inferred from the version number. | |
231 | """ | |
232 | result = CommandResult("") | |
233 | args = dict(prefix = "config dump", format = "json") | |
234 | self.send_command(result, "mon", "", json.dumps(args), "") | |
235 | ret, outb, outs = result.wait() | |
236 | if ret == 0: | |
237 | return json.loads(outb), [] | |
238 | else: | |
239 | self.log.warning("send_command 'config dump' failed. \ | |
240 | ret={}, outs=\"{}\"".format(ret, outs)) | |
241 | return [], ["Failed to read monitor config dump"] | |
242 | ||
243 | def do_report(self, inbuf, command): | |
244 | health_check_details = [] | |
245 | report = {} | |
246 | ||
247 | report.update({ | |
248 | "version": dict(full = self.version, | |
249 | **self._version_parse(self.version)) | |
250 | }) | |
251 | ||
252 | # crash history | |
253 | crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS) | |
254 | report["crashes"] = crashes | |
255 | health_check_details.extend(health_details) | |
256 | ||
257 | # health history | |
258 | report["health"] = self._health_report(HEALTH_HISTORY_HOURS) | |
259 | ||
260 | # cluster configuration | |
261 | config, health_details = self._config_dump() | |
262 | report["config"] = config | |
263 | health_check_details.extend(health_details) | |
264 | ||
265 | osd_map = self.get("osd_map") | |
266 | del osd_map['pg_temp'] | |
267 | self._apply_osd_stats(osd_map) | |
268 | report["osd_dump"] = osd_map | |
269 | ||
270 | report["df"] = self.get("df") | |
271 | report["osd_tree"] = self.get("osd_map_tree") | |
272 | report["fs_map"] = self.get("fs_map") | |
273 | report["crush_map"] = self.get("osd_map_crush") | |
274 | report["mon_map"] = self.get("mon_map") | |
275 | report["service_map"] = self.get("service_map") | |
276 | report["manager_map"] = self.get("mgr_map") | |
277 | report["mon_status"] = json.loads(self.get("mon_status")["json"]) | |
278 | report["pg_summary"] = self.get("pg_summary") | |
279 | report["osd_metadata"] = self.get("osd_metadata") | |
280 | ||
281 | report.update({ | |
282 | "errors": health_check_details | |
283 | }) | |
284 | ||
285 | if health_check_details: | |
286 | self.set_health_checks({ | |
287 | INSIGHTS_HEALTH_CHECK: { | |
288 | "severity": "warning", | |
289 | "summary": "Generated incomplete Insights report", | |
290 | "detail": health_check_details | |
291 | } | |
292 | }) | |
293 | ||
294 | return 0, json.dumps(report, indent=2, cls=health_util.HealthEncoder), "" | |
295 | ||
296 | def do_prune_health(self, inbuf, command): | |
297 | try: | |
298 | hours = int(command['hours']) | |
299 | except ValueError: | |
300 | return errno.EINVAL, '', 'hours argument must be integer' | |
301 | ||
302 | self._health_prune_history(hours) | |
303 | ||
304 | return 0, "", "" | |
305 | ||
306 | def testing_set_now_time_offset(self, hours): | |
307 | """ | |
308 | Control what "now" time it is by applying an offset. This is called from | |
309 | the selftest module to manage testing scenarios related to tracking | |
310 | health history. | |
311 | """ | |
312 | try: | |
313 | hours = long(hours) | |
314 | except NameError: | |
315 | hours = int(hours) | |
316 | health_util.NOW_OFFSET = datetime.timedelta(hours = hours) | |
317 | self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET)) | |
318 | ||
319 | def handle_command(self, inbuf, command): | |
320 | if command["prefix"] == "insights": | |
321 | return self.do_report(inbuf, command) | |
322 | elif command["prefix"] == "insights prune-health": | |
323 | return self.do_prune_health(inbuf, command) | |
324 | else: | |
325 | raise NotImplementedError(cmd["prefix"]) |