import datetime
import errno
import json
import re
import threading
import six
from mgr_module import MgrModule, CommandResult
from . import health as health_util

# hours of crash history to report
CRASH_HISTORY_HOURS = 24
# hours of health history to report
HEALTH_HISTORY_HOURS = 24
# how many hours of health history to keep
HEALTH_RETENTION_HOURS = 30
# health check name for insights health
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# version tag for persistent data format
ON_DISK_VERSION = 1

class Module(MgrModule):
    COMMANDS = [
        {
            "cmd": "insights",
            "desc": "Retrieve insights report",
            "perm": "r",
            "poll": "false",
        },
        {
            "cmd": "insights prune-health name=hours,type=CephString",
            "desc": "Remove health history older than <hours> hours",
            "perm": "rw",
            "poll": "false",
        },
    ]

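    # Example invocations of the commands registered above (illustrative;
    # assumes the "insights" mgr module is enabled, and "24" is just a
    # sample hours argument):
    #
    #   ceph insights
    #   ceph insights prune-health 24
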
    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        self._shutdown = False
        self._evt = threading.Event()

        # health history tracking
        self._pending_health = []
        self._health_slot = None

    def notify(self, ttype, ident):
        """Queue updates for processing"""
        if ttype == "health":
            self.log.info("Received health check update; {} updates pending".format(
                len(self._pending_health)))
            health = json.loads(self.get("health")["json"])
            self._pending_health.append(health)
            self._evt.set()

    def serve(self):
        self._health_reset()
        while True:
            self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
            self._evt.clear()
            if self._shutdown:
                break

            # when the current health slot expires, finalize it by flushing it to
            # the store, and initializing a new empty slot.
            if self._health_slot.expired():
                self.log.info("Health history slot expired {}".format(
                    self._health_slot))
                self._health_maybe_flush()
                self._health_reset()
                self._health_prune_history(HEALTH_RETENTION_HOURS)

            # fold in pending health snapshots and flush
            self.log.info("Applying {} health updates to slot {}".format(
                len(self._pending_health), self._health_slot))
            for health in self._pending_health:
                self._health_slot.add(health)
            self._pending_health = []
            self._health_maybe_flush()

    def shutdown(self):
        self._shutdown = True
        self._evt.set()

    def _health_reset(self):
        """Initialize the current health slot

        The slot will be initialized with any state found to have already been
        persisted, otherwise the slot will start empty.
        """
        key = health_util.HealthHistorySlot.curr_key()
        data = self.get_store(key)
        if data:
            init_health = json.loads(data)
            self._health_slot = health_util.HealthHistorySlot(init_health)
        else:
            self._health_slot = health_util.HealthHistorySlot()
        self.log.info("Reset curr health slot {}".format(self._health_slot))

    def _health_maybe_flush(self):
        """Store the health for the current time slot if needed"""

        self.log.info("Maybe flushing slot {} needed {}".format(
            self._health_slot, self._health_slot.need_flush()))

        if self._health_slot.need_flush():
            key = self._health_slot.key()

            # build store data entry
            slot = self._health_slot.health()
            assert "version" not in slot
            slot.update(dict(version = ON_DISK_VERSION))
            data = json.dumps(slot, cls=health_util.HealthEncoder)

            self.log.debug("Storing health key {} data {}".format(
                key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))

            self.set_store(key, data)
            self._health_slot.mark_flushed()

    def _health_filter(self, f):
        """Filter hourly health report keys by timestamp"""
        matches = filter(
            lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])),
            six.iteritems(self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)))
        return map(lambda t: t[0], matches)

    def _health_prune_history(self, hours):
        """Prune old health entries"""
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours = hours)
        for key in self._health_filter(lambda ts: ts <= cutoff):
            self.log.info("Removing old health slot key {}".format(key))
            self.set_store(key, None)

    def _health_report(self, hours):
        """
        Build a consolidated health report for the past N hours.
        """
        # roll up the past N hours of health info
        collector = health_util.HealthHistorySlot()
        keys = health_util.HealthHistorySlot.key_range(hours)
        for key in keys:
            data = self.get_store(key)
            self.log.info("Reporting health key {} found {}".format(
                key, bool(data)))
            health = json.loads(data) if data else {}
            slot = health_util.HealthHistorySlot(health)
            collector.merge(slot)

        # include history that hasn't yet been flushed
        collector.merge(self._health_slot)

        return dict(
            current = json.loads(self.get("health")["json"]),
            history = collector.health()
        )

    def _version_parse(self, version):
        """
        Return the components of a Ceph version string.

        This returns an empty result when the version string cannot be parsed
        into its constituent components, such as when Ceph has been built with
        ENABLE_GIT_VERSION=OFF.
        """
165 r = "ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
166 m = re.match(r, version)
167 ver = {} if not m else {
168 "release": m.group("release"),
169 "major": m.group("major"),
170 "minor": m.group("minor")
171 }
172 return { k:int(v) for k,v in six.iteritems(ver) }
173
    def _crash_history(self, hours):
        """
        Load crash history for the past N hours from the crash module.
        """
        params = dict(
            prefix = "crash json_report",
            hours = hours
        )

        result = dict(
            summary = {},
            hours = params["hours"],
        )

        health_check_details = []

        try:
            _, _, crashes = self.remote("crash", "handle_command", "", params)
            result["summary"] = json.loads(crashes)
        except Exception as e:
            errmsg = "failed to invoke crash module"
            self.log.warning("{}: {}".format(errmsg, str(e)))
            health_check_details.append(errmsg)
        else:
            self.log.debug("Crash module invocation succeeded {}".format(
                json.dumps(result["summary"], indent=2)))

        return result, health_check_details

    def _apply_osd_stats(self, osd_map):
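        """Attach OSD statistics to each entry of osd_map["osds"].

        Each OSD entry gains an "osd_stats" dict taken from the mgr
        "osd_stats" report, plus a "stats" dict holding the latest
        numpg, stat_bytes and stat_bytes_used perf counters.
        """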
        # map from osd id to its index in the map structure
        osd_id_to_idx = {}
        for idx in range(len(osd_map["osds"])):
            osd_id_to_idx[osd_map["osds"][idx]["osd"]] = idx

        # include stats, including space utilization performance counters.
        # adapted from dashboard api controller
        for s in self.get('osd_stats')['osd_stats']:
            try:
                idx = osd_id_to_idx[s["osd"]]
                osd_map["osds"][idx].update({'osd_stats': s})
            except KeyError as e:
                self.log.warning("inconsistent api state: {}".format(str(e)))

        for osd in osd_map["osds"]:
            osd['stats'] = {}
            for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][s.split('.')[1]] = self.get_latest('osd', str(osd["osd"]), s)

    def _config_dump(self):
        """Report cluster configuration

        This report is the standard `config dump` report. It does not include
        configuration defaults; these can be inferred from the version number.
        """
        result = CommandResult("")
        args = dict(prefix = "config dump", format = "json")
        self.send_command(result, "mon", "", json.dumps(args), "")
        ret, outb, outs = result.wait()
        if ret == 0:
            return json.loads(outb), []
        else:
            self.log.warning("send_command 'config dump' failed. "
                "ret={}, outs=\"{}\"".format(ret, outs))
            return [], ["Failed to read monitor config dump"]

    def do_report(self, inbuf, command):
        health_check_details = []
        report = {}

        report.update({
            "version": dict(full = self.version,
                            **self._version_parse(self.version))
        })

        # crash history
        crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
        report["crashes"] = crashes
        health_check_details.extend(health_details)

        # health history
        report["health"] = self._health_report(HEALTH_HISTORY_HOURS)

        # cluster configuration
        config, health_details = self._config_dump()
        report["config"] = config
        health_check_details.extend(health_details)

        # osd map, with pg_temp dropped and per-osd stats attached
        osd_map = self.get("osd_map")
        del osd_map['pg_temp']
        self._apply_osd_stats(osd_map)
        report["osd_dump"] = osd_map

        report["df"] = self.get("df")
        report["osd_tree"] = self.get("osd_map_tree")
        report["fs_map"] = self.get("fs_map")
        report["crush_map"] = self.get("osd_map_crush")
        report["mon_map"] = self.get("mon_map")
        report["service_map"] = self.get("service_map")
        report["manager_map"] = self.get("mgr_map")
        report["mon_status"] = json.loads(self.get("mon_status")["json"])
        report["pg_summary"] = self.get("pg_summary")
        report["osd_metadata"] = self.get("osd_metadata")

        report.update({
            "errors": health_check_details
        })

        if health_check_details:
            self.set_health_checks({
                INSIGHTS_HEALTH_CHECK: {
                    "severity": "warning",
                    "summary": "Generated incomplete Insights report",
                    "detail": health_check_details
                }
            })

        return 0, json.dumps(report, indent=2, cls=health_util.HealthEncoder), ""

    def do_prune_health(self, inbuf, command):
        try:
            hours = int(command['hours'])
        except ValueError:
            return errno.EINVAL, '', 'hours argument must be integer'

        self._health_prune_history(hours)

        return 0, "", ""

    def testing_set_now_time_offset(self, hours):
        """
        Control what "now" time it is by applying an offset. This is called from
        the selftest module to manage testing scenarios related to tracking
        health history.
        """
        hours = int(hours)
        health_util.NOW_OFFSET = datetime.timedelta(hours = hours)
        self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))

    def handle_command(self, inbuf, command):
        if command["prefix"] == "insights":
            return self.do_report(inbuf, command)
        elif command["prefix"] == "insights prune-health":
            return self.do_prune_health(inbuf, command)
        else:
            raise NotImplementedError(command["prefix"])