import datetime
import json
import re
import threading

from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult
from mgr_module import MgrModule, CommandResult, NotifyType
from . import health as health_util

# hours of crash history to report
CRASH_HISTORY_HOURS = 24
# hours of health history to report
HEALTH_HISTORY_HOURS = 24
# how many hours of health history to keep
HEALTH_RETENTION_HOURS = 30
# health check name for insights health
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# version tag for persistent data format
ON_DISK_VERSION = 1


class Module(MgrModule):

    NOTIFY_TYPES = [NotifyType.health]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        self._shutdown = False
        self._evt = threading.Event()

        # health history tracking
        self._pending_health = []
        self._health_slot = None
        self._store = {}

    # The following three functions, get_store, set_store, and get_store_prefix
    # mask the functions defined in the parent to avoid storing large keys
    # persistently to disk as that was proving problematic. Long term we may
    # implement a different mechanism to make these persistent. When that day
    # comes it should just be a matter of deleting these three functions.
    def get_store(self, key):
        return self._store.get(key)

    def set_store(self, key, value):
        if value is None:
            if key in self._store:
                del self._store[key]
        else:
            self._store[key] = value

    def get_store_prefix(self, prefix):
        return {k: v for k, v in self._store.items() if k.startswith(prefix)}
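
    # Illustrative example (not invoked by the module itself): with a Module
    # instance `m`, values round-trip through the in-memory dict only, so
    # unlike the monitor-backed store of the parent MgrModule they do not
    # survive a ceph-mgr restart or failover.
    #
    #   m.set_store("some/key", "{}")   # kept in m._store
    #   m.get_store("some/key")         # -> "{}"
    #   m.set_store("some/key", None)   # removes the key
    #   m.get_store_prefix("some/")     # -> {} once the key is gone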

    def notify(self, ttype: NotifyType, ident):
        """Queue updates for processing"""
        if ttype == NotifyType.health:
            self.log.info("Received health check update {} pending".format(
                len(self._pending_health)))
            health = json.loads(self.get("health")["json"])
            self._pending_health.append(health)
            self._evt.set()

    def serve(self):
        self._health_reset()
        while True:
            self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
            self._evt.clear()
            if self._shutdown:
                break

            # when the current health slot expires, finalize it by flushing it to
            # the store, and initializing a new empty slot.
            if self._health_slot.expired():
                self.log.info("Health history slot expired {}".format(
                    self._health_slot))
                self._health_maybe_flush()
                self._health_reset()
                self._health_prune_history(HEALTH_RETENTION_HOURS)

            # fold in pending health snapshots and flush
            self.log.info("Applying {} health updates to slot {}".format(
                len(self._pending_health), self._health_slot))
            for health in self._pending_health:
                self._health_slot.add(health)
            self._pending_health = []
            self._health_maybe_flush()

    def shutdown(self):
        self._shutdown = True
        self._evt.set()

    def _health_reset(self):
        """Initialize the current health slot

        The slot will be initialized with any state found to have already been
        persisted, otherwise the slot will start empty.
        """
        key = health_util.HealthHistorySlot.curr_key()
        data = self.get_store(key)
        if data:
            init_health = json.loads(data)
            self._health_slot = health_util.HealthHistorySlot(init_health)
        else:
            self._health_slot = health_util.HealthHistorySlot()
        self.log.info("Reset curr health slot {}".format(self._health_slot))

    def _health_maybe_flush(self):
        """Store the health for the current time slot if needed"""

        self.log.info("Maybe flushing slot {} needed {}".format(
            self._health_slot, self._health_slot.need_flush()))

        if self._health_slot.need_flush():
            key = self._health_slot.key()

            # build store data entry
            slot = self._health_slot.health()
            assert "version" not in slot
            slot.update(dict(version=ON_DISK_VERSION))
            data = json.dumps(slot, cls=health_util.HealthEncoder)

            self.log.debug("Storing health key {} data {}".format(
                key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))

            self.set_store(key, data)
            self._health_slot.mark_flushed()

    def _health_filter(self, f):
        """Return the keys of stored hourly health reports whose timestamp satisfies f"""
        matches = filter(
            lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])),
            self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX).items())
        return map(lambda t: t[0], matches)

    def _health_prune_history(self, hours):
        """Prune old health entries"""
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
        for key in self._health_filter(lambda ts: ts <= cutoff):
            self.log.info("Removing old health slot key {}".format(key))
            self.set_store(key, None)
        if not hours:
            self._health_slot = health_util.HealthHistorySlot()

    def _health_report(self, hours):
        """
        Build a consolidated health report for the past N hours.
        """
        # roll up the past N hours of health info
        collector = health_util.HealthHistorySlot()
        keys = health_util.HealthHistorySlot.key_range(hours)
        for key in keys:
            data = self.get_store(key)
            self.log.info("Reporting health key {} found {}".format(
                key, bool(data)))
            health = json.loads(data) if data else {}
            slot = health_util.HealthHistorySlot(health)
            collector.merge(slot)

        # include history that hasn't yet been flushed
        collector.merge(self._health_slot)

        return dict(
            current=json.loads(self.get("health")["json"]),
            history=collector.health()
        )
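
    # For reference, _health_report() returns a dict of the form below (keys
    # taken from the code above; the nested values depend on health_util and
    # on the cluster state at report time):
    #
    #   {
    #       "current": { ...parsed output of self.get("health")["json"]... },
    #       "history": { ...merged HealthHistorySlot contents... }
    #   }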

    def _version_parse(self, version):
        """
        Return the components of a Ceph version string.

        This returns nothing when the version string cannot be parsed into its
        constituent components, such as when Ceph has been built with
        ENABLE_GIT_VERSION=OFF.
        """
        r = r"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
        m = re.match(r, version)
        ver = {} if not m else {
            "release": m.group("release"),
            "major": m.group("major"),
            "minor": m.group("minor")
        }
        return {k: int(v) for k, v in ver.items()}
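
    # Example (illustrative): a typical version banner such as
    # "ceph version 17.2.0 (abcdef...) quincy (stable)" parses to
    # {"release": 17, "major": 2, "minor": 0}, while an unparseable string
    # yields an empty dict.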

    def _crash_history(self, hours):
        """
        Load crash history for the past N hours from the crash module.
        """
        result = dict(
            summary={},
            hours=hours,
        )

        health_check_details = []

        try:
            _, _, crashes = self.remote("crash", "do_json_report", hours)
            result["summary"] = json.loads(crashes)
        except Exception as e:
            errmsg = "failed to invoke crash module"
            self.log.warning("{}: {}".format(errmsg, str(e)))
            health_check_details.append(errmsg)
        else:
            self.log.debug("Crash module invocation succeeded {}".format(
                json.dumps(result["summary"], indent=2)))

        return result, health_check_details
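
    # For reference, the tuple returned above looks roughly like
    # ({"summary": {...parsed crash report...}, "hours": 24}, []), with the
    # second element carrying error strings such as "failed to invoke crash
    # module" when the remote call into the crash module fails.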

    def _apply_osd_stats(self, osd_map):
        # map from osd id to its index in the map structure
        osd_id_to_idx = {}
        for idx in range(len(osd_map["osds"])):
            osd_id_to_idx[osd_map["osds"][idx]["osd"]] = idx

        # attach stats, including space utilization performance counters.
        # adapted from the dashboard api controller
        for s in self.get('osd_stats')['osd_stats']:
            try:
                idx = osd_id_to_idx[s["osd"]]
                osd_map["osds"][idx].update({'osd_stats': s})
            except KeyError as e:
                self.log.warning("inconsistent api state: {}".format(str(e)))

        for osd in osd_map["osds"]:
            osd['stats'] = {}
            for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][s.split('.')[1]] = self.get_latest('osd', str(osd["osd"]), s)

    def _config_dump(self):
        """Report cluster configuration

        This report is the standard `config dump` report. It does not include
        configuration defaults; these can be inferred from the version number.
        """
        result = CommandResult("")
        args = dict(prefix="config dump", format="json")
        self.send_command(result, "mon", "", json.dumps(args), "")
        ret, outb, outs = result.wait()
        if ret == 0:
            return json.loads(outb), []
        else:
            self.log.warning("send_command 'config dump' failed. "
                             "ret={}, outs=\"{}\"".format(ret, outs))
            return [], ["Failed to read monitor config dump"]
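
    # The mon command built above is essentially the equivalent of running
    # `ceph config dump --format json` from the CLI, so the "config" section
    # of the report mirrors that output.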

    @CLIReadCommand('insights')
    def do_report(self):
        '''
        Retrieve insights report
        '''
        health_check_details = []
        report = {}

        report.update({
            "version": dict(full=self.version,
                            **self._version_parse(self.version))
        })

        # crash history
        crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
        report["crashes"] = crashes
        health_check_details.extend(health_details)

        # health history
        report["health"] = self._health_report(HEALTH_HISTORY_HOURS)

        # cluster configuration
        config, health_details = self._config_dump()
        report["config"] = config
        health_check_details.extend(health_details)

        osd_map = self.get("osd_map")
        del osd_map['pg_temp']
        self._apply_osd_stats(osd_map)
        report["osd_dump"] = osd_map

        report["df"] = self.get("df")
        report["osd_tree"] = self.get("osd_map_tree")
        report["fs_map"] = self.get("fs_map")
        report["crush_map"] = self.get("osd_map_crush")
        report["mon_map"] = self.get("mon_map")
        report["service_map"] = self.get("service_map")
        report["manager_map"] = self.get("mgr_map")
        report["mon_status"] = json.loads(self.get("mon_status")["json"])
        report["pg_summary"] = self.get("pg_summary")
        report["osd_metadata"] = self.get("osd_metadata")

        report.update({
            "errors": health_check_details
        })

        if health_check_details:
            self.set_health_checks({
                INSIGHTS_HEALTH_CHECK: {
                    "severity": "warning",
                    "summary": "Generated incomplete Insights report",
                    "detail": health_check_details
                }
            })

        result = json.dumps(report, indent=2, cls=health_util.HealthEncoder)
        return HandleCommandResult(stdout=result)
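
    # Usage (illustrative): once the module is enabled with
    # `ceph mgr module enable insights`, the report can be fetched with
    # `ceph insights`, which prints the JSON document assembled above.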

    @CLICommand('insights prune-health')
    def do_prune_health(self, hours: int):
        '''
        Remove health history older than <hours> hours
        '''
        self._health_prune_history(hours)
        return HandleCommandResult()
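
    # Usage (illustrative): `ceph insights prune-health 0` removes all stored
    # health history and, per _health_prune_history(), also resets the
    # in-progress slot; a non-zero value such as 24 keeps the most recent day.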

    def testing_set_now_time_offset(self, hours):
        """
        Control what "now" time it is by applying an offset. This is called from
        the selftest module to manage testing scenarios related to tracking
        health history.
        """
        hours = int(hours)
        health_util.NOW_OFFSET = datetime.timedelta(hours=hours)
        self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))
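
    # Illustrative sketch (assumed call pattern, not defined in this file):
    # another mgr module such as selftest can shift this module's notion of
    # "now" through the usual cross-module interface, e.g.
    #
    #   self.remote("insights", "testing_set_now_time_offset", 24)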