# git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/insights/module.py
# import 15.2.4
# [ceph.git] / ceph / src / pybind / mgr / insights / module.py
# (scraped from a blame view; columns were: Commit, Line, Data)
import datetime
import errno
import json
import re
import threading

import six

from mgr_module import MgrModule, CommandResult
from . import health as health_util

# hours of crash history to report
CRASH_HISTORY_HOURS = 24
# hours of health history to report
HEALTH_HISTORY_HOURS = 24
# how many hours of health history to keep
HEALTH_RETENTION_HOURS = 30
# health check name for insights health
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# version tag for persistent data format
ON_DISK_VERSION = 1

class Module(MgrModule):
    """Manager module that builds the "insights" report.

    The module keeps a rolling history of cluster health in the module
    key/value store (one entry per health history slot) and, on request,
    combines that history with crash data, the monitor config dump and
    several cluster maps into a single JSON report.
    """

    COMMANDS = [
        {
            "cmd": "insights",
            "desc": "Retrieve insights report",
            "perm": "r",
            "poll": "false",
        },
        {
            'cmd': 'insights prune-health name=hours,type=CephString',
            'desc': 'Remove health history older than <hours> hours',
            'perm': 'rw',
            "poll": "false",
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # set by shutdown() to make serve() leave its loop
        self._shutdown = False
        # wakes serve() early when there is new health data or a shutdown
        self._evt = threading.Event()

        # health history tracking
        self._pending_health = []
        self._health_slot = None

    def notify(self, ttype, ident):
        """Queue updates for processing.

        Only "health" notifications are handled: the current health report
        is parsed and queued, and serve() is woken up to fold it in.
        """
        if ttype == "health":
            self.log.info("Received health check update {} pending".format(
                len(self._pending_health)))
            health = json.loads(self.get("health")["json"])
            self._pending_health.append(health)
            self._evt.set()

    def serve(self):
        """Main loop: fold queued health snapshots into the current slot.

        Wakes up when notify()/shutdown() set the event, or after
        health_util.PERSIST_PERIOD at the latest, and returns once
        shutdown() has been called.
        """
        self._health_reset()
        while True:
            self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
            self._evt.clear()
            if self._shutdown:
                break

            # when the current health slot expires, finalize it by flushing
            # it to the store, and initializing a new empty slot.
            if self._health_slot.expired():
                self.log.info("Health history slot expired {}".format(
                    self._health_slot))
                self._health_maybe_flush()
                self._health_reset()
                self._health_prune_history(HEALTH_RETENTION_HOURS)

            # Swap the queue out before iterating it. Clearing it *after*
            # the loop would drop any snapshot notify() appended while the
            # loop was running.
            pending, self._pending_health = self._pending_health, []

            # fold in pending health snapshots and flush
            self.log.info("Applying {} health updates to slot {}".format(
                len(pending), self._health_slot))
            for health in pending:
                self._health_slot.add(health)
            self._health_maybe_flush()

    def shutdown(self):
        """Ask serve() to exit and wake it up so it notices promptly."""
        self._shutdown = True
        self._evt.set()

    def _health_reset(self):
        """Initialize the current health slot

        The slot will be initialized with any state found to have already
        been persisted, otherwise the slot will start empty.
        """
        key = health_util.HealthHistorySlot.curr_key()
        data = self.get_store(key)
        if data:
            init_health = json.loads(data)
            self._health_slot = health_util.HealthHistorySlot(init_health)
        else:
            self._health_slot = health_util.HealthHistorySlot()
        self.log.info("Reset curr health slot {}".format(self._health_slot))

    def _health_maybe_flush(self):
        """Store the health for the current time slot if needed"""
        need_flush = self._health_slot.need_flush()
        self.log.info("Maybe flushing slot {} needed {}".format(
            self._health_slot, need_flush))

        if not need_flush:
            return

        key = self._health_slot.key()

        # build store data entry, tagged with the on-disk format version
        slot = self._health_slot.health()
        assert "version" not in slot
        slot["version"] = ON_DISK_VERSION
        data = json.dumps(slot, cls=health_util.HealthEncoder)

        self.log.debug("Storing health key {} data {}".format(
            key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))

        self.set_store(key, data)
        self._health_slot.mark_flushed()

    def _health_filter(self, f):
        """Return the stored health history keys whose timestamp satisfies f.

        `f` is called with the time decoded from each key by
        HealthHistorySlot.key_to_time(). The result is a fully built list,
        so callers may modify the store while walking over it.
        """
        stored = self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)
        return [
            key for key, _ in six.iteritems(stored)
            if f(health_util.HealthHistorySlot.key_to_time(key))
        ]

    def _health_prune_history(self, hours):
        """Prune health entries that are `hours` hours old or older.

        With a falsy `hours` (0) the in-memory slot is also replaced by an
        empty one -- presumably so a later flush does not write the pruned
        data back; confirm against the selftest that relies on it.
        """
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
        for key in self._health_filter(lambda ts: ts <= cutoff):
            self.log.info("Removing old health slot key {}".format(key))
            self.set_store(key, None)
        if not hours:
            self._health_slot = health_util.HealthHistorySlot()

    def _health_report(self, hours):
        """
        Report a consolidated health report for the past N hours.

        Returns a dict with the current health ("current") and the merged
        stored plus not-yet-flushed history ("history").
        """
        # roll up the past N hours of health info
        collector = health_util.HealthHistorySlot()
        for key in health_util.HealthHistorySlot.key_range(hours):
            data = self.get_store(key)
            self.log.info("Reporting health key {} found {}".format(
                key, bool(data)))
            health = json.loads(data) if data else {}
            collector.merge(health_util.HealthHistorySlot(health))

        # include history that hasn't yet been flushed
        collector.merge(self._health_slot)

        return dict(
            current=json.loads(self.get("health")["json"]),
            history=collector.health()
        )

    def _version_parse(self, version):
        """
        Return the components of a Ceph version string.

        The result maps "release", "major" and "minor" to ints. This returns
        an empty dict when the version string cannot be parsed into its
        constituent components, such as when Ceph has been built with
        ENABLE_GIT_VERSION=OFF.
        """
        r = r"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
        m = re.match(r, version)
        if not m:
            return {}
        # the pattern has exactly the three named groups listed above
        return {k: int(v) for k, v in six.iteritems(m.groupdict())}

    def _crash_history(self, hours):
        """
        Load crash history for the past N hours from the crash module.

        Returns (result, health_check_details). On failure the summary is
        left empty and one error message is added to the details list
        rather than raising, so the rest of the report can still be built.
        """
        params = dict(
            prefix="crash json_report",
            hours=hours
        )

        result = dict(
            summary={},
            hours=params["hours"],
        )

        health_check_details = []

        try:
            _, _, crashes = self.remote("crash", "handle_command", "", params)
            result["summary"] = json.loads(crashes)
        except Exception as e:
            # deliberate best effort: any failure is reported, not raised
            errmsg = "failed to invoke crash module"
            self.log.warning("{}: {}".format(errmsg, str(e)))
            health_check_details.append(errmsg)
        else:
            self.log.debug("Crash module invocation succeeded {}".format(
                json.dumps(result["summary"], indent=2)))

        return result, health_check_details

    def _apply_osd_stats(self, osd_map):
        """Annotate each entry of osd_map["osds"] in place with statistics.

        Adds an 'osd_stats' entry (from the 'osd_stats' report) where one
        exists for the OSD, and a 'stats' dict with the latest numpg,
        stat_bytes and stat_bytes_used counters.
        """
        # map from osd id to its index in the map structure
        osd_id_to_idx = {
            osd["osd"]: idx for idx, osd in enumerate(osd_map["osds"])
        }

        # include stats, including space utilization performance counters.
        # adapted from dashboard api controller
        for s in self.get('osd_stats')['osd_stats']:
            try:
                idx = osd_id_to_idx[s["osd"]]
                osd_map["osds"][idx].update({'osd_stats': s})
            except KeyError as e:
                self.log.warning("inconsistent api state: {}".format(str(e)))

        for osd in osd_map["osds"]:
            osd['stats'] = {}
            for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                # key by the counter name without its "osd." prefix
                osd['stats'][s.split('.')[1]] = self.get_latest(
                    'osd', str(osd["osd"]), s)

    def _config_dump(self):
        """Report cluster configuration

        This report is the standard `config dump` report. It does not include
        configuration defaults; these can be inferred from the version number.

        Returns (config, health_check_details); on failure the config is an
        empty list and the details list carries one error message.
        """
        result = CommandResult("")
        args = dict(prefix="config dump", format="json")
        self.send_command(result, "mon", "", json.dumps(args), "")
        ret, outb, outs = result.wait()
        if ret == 0:
            return json.loads(outb), []

        # Built without a backslash continuation inside the literal, which
        # would embed the source indentation in the logged message.
        self.log.warning(
            "send_command 'config dump' failed. ret={}, outs=\"{}\"".format(
                ret, outs))
        return [], ["Failed to read monitor config dump"]

    def do_report(self, inbuf, command):
        """Build the full insights report.

        Returns the usual (retval, stdout, stderr) command tuple with the
        JSON report on stdout. Sections that could not be collected are
        listed under "errors" and also raise the module health warning.
        """
        health_check_details = []
        report = {}

        report["version"] = dict(full=self.version,
                                 **self._version_parse(self.version))

        # crash history
        crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
        report["crashes"] = crashes
        health_check_details.extend(health_details)

        # health history
        report["health"] = self._health_report(HEALTH_HISTORY_HOURS)

        # cluster configuration
        config, health_details = self._config_dump()
        report["config"] = config
        health_check_details.extend(health_details)

        osd_map = self.get("osd_map")
        # pg_temp is left out of the report; tolerate it being absent
        osd_map.pop('pg_temp', None)
        self._apply_osd_stats(osd_map)
        report["osd_dump"] = osd_map

        report["df"] = self.get("df")
        report["osd_tree"] = self.get("osd_map_tree")
        report["fs_map"] = self.get("fs_map")
        report["crush_map"] = self.get("osd_map_crush")
        report["mon_map"] = self.get("mon_map")
        report["service_map"] = self.get("service_map")
        report["manager_map"] = self.get("mgr_map")
        report["mon_status"] = json.loads(self.get("mon_status")["json"])
        report["pg_summary"] = self.get("pg_summary")
        report["osd_metadata"] = self.get("osd_metadata")

        report["errors"] = health_check_details

        if health_check_details:
            self.set_health_checks({
                INSIGHTS_HEALTH_CHECK: {
                    "severity": "warning",
                    "summary": "Generated incomplete Insights report",
                    "detail": health_check_details
                }
            })

        return 0, json.dumps(report, indent=2, cls=health_util.HealthEncoder), ""

    def do_prune_health(self, inbuf, command):
        """Handle `insights prune-health <hours>`.

        Returns an EINVAL command tuple when `hours` is not an integer,
        otherwise prunes the stored health history and returns success.
        """
        try:
            hours = int(command['hours'])
        except (TypeError, ValueError):
            # NOTE(review): the positive errno is kept from the original
            # code; confirm whether the mgr expects a negative value here.
            return errno.EINVAL, '', 'hours argument must be integer'

        self._health_prune_history(hours)

        return 0, "", ""

    def testing_set_now_time_offset(self, hours):
        """
        Control what "now" time it is by applying an offset. This is called from
        the selftest module to manage testing scenarios related to tracking
        health history.
        """
        try:
            hours = long(hours)
        except NameError:
            # Python 3 has no `long`
            hours = int(hours)
        health_util.NOW_OFFSET = datetime.timedelta(hours=hours)
        self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))

    def handle_command(self, inbuf, command):
        """Dispatch a command to its handler by prefix.

        Raises NotImplementedError, naming the prefix, for a command this
        module does not implement.
        """
        prefix = command["prefix"]
        if prefix == "insights":
            return self.do_report(inbuf, command)
        elif prefix == "insights prune-health":
            return self.do_prune_health(inbuf, command)
        else:
            raise NotImplementedError(prefix)