git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/insights/module.py
update source to Ceph Pacific 16.2.2
import datetime
import json
import re
import threading

from mgr_module import CLICommand, CLIReadCommand, HandleCommandResult
from mgr_module import MgrModule, CommandResult
from . import health as health_util

# hours of crash history to report
CRASH_HISTORY_HOURS = 24
# hours of health history to report
HEALTH_HISTORY_HOURS = 24
# how many hours of health history to keep
HEALTH_RETENTION_HOURS = 30
# health check name for insights health
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# version tag for persistent data format
ON_DISK_VERSION = 1


class Module(MgrModule):
    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        self._shutdown = False
        self._evt = threading.Event()

        # health history tracking
        self._pending_health = []
        self._health_slot = None

    def notify(self, ttype, ident):
        """Queue health updates for processing by the serve() thread"""
        if ttype == "health":
            self.log.info("Received health check update {} pending".format(
                len(self._pending_health)))
            health = json.loads(self.get("health")["json"])
            self._pending_health.append(health)
            self._evt.set()

    def serve(self):
        self._health_reset()
        while True:
            self._evt.wait(health_util.PERSIST_PERIOD.total_seconds())
            self._evt.clear()
            if self._shutdown:
                break

            # when the current health slot expires, finalize it by flushing
            # it to the store and initializing a new empty slot.
            if self._health_slot.expired():
                self.log.info("Health history slot expired {}".format(
                    self._health_slot))
                self._health_maybe_flush()
                self._health_reset()
                self._health_prune_history(HEALTH_RETENTION_HOURS)

            # fold in pending health snapshots and flush
            self.log.info("Applying {} health updates to slot {}".format(
                len(self._pending_health), self._health_slot))
            for health in self._pending_health:
                self._health_slot.add(health)
            self._pending_health = []
            self._health_maybe_flush()

    def shutdown(self):
        self._shutdown = True
        self._evt.set()

    def _health_reset(self):
        """Initialize the current health slot

        The slot will be initialized with any state found to have already been
        persisted, otherwise the slot will start empty.
        """
        key = health_util.HealthHistorySlot.curr_key()
        data = self.get_store(key)
        if data:
            init_health = json.loads(data)
            self._health_slot = health_util.HealthHistorySlot(init_health)
        else:
            self._health_slot = health_util.HealthHistorySlot()
        self.log.info("Reset curr health slot {}".format(self._health_slot))

    def _health_maybe_flush(self):
        """Store the health for the current time slot if needed"""

        self.log.info("Maybe flushing slot {} needed {}".format(
            self._health_slot, self._health_slot.need_flush()))

        if self._health_slot.need_flush():
            key = self._health_slot.key()

            # build store data entry
            slot = self._health_slot.health()
            assert "version" not in slot
            slot.update(dict(version=ON_DISK_VERSION))
            data = json.dumps(slot, cls=health_util.HealthEncoder)

            self.log.debug("Storing health key {} data {}".format(
                key, json.dumps(slot, indent=2, cls=health_util.HealthEncoder)))

            self.set_store(key, data)
            self._health_slot.mark_flushed()

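    # Note: the value persisted above is the slot's health dict plus a
    # "version" field (ON_DISK_VERSION), the version tag for the persistent
    # data format.
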
    def _health_filter(self, f):
        """Filter hourly health report keys by timestamp"""
        matches = filter(
            lambda t: f(health_util.HealthHistorySlot.key_to_time(t[0])),
            self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX).items())
        return map(lambda t: t[0], matches)

    def _health_prune_history(self, hours):
        """Prune health entries older than the given number of hours"""
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
        for key in self._health_filter(lambda ts: ts <= cutoff):
            self.log.info("Removing old health slot key {}".format(key))
            self.set_store(key, None)
        if not hours:
            # pruning everything also discards the in-memory slot
            self._health_slot = health_util.HealthHistorySlot()

    def _health_report(self, hours):
        """
        Return a consolidated health report for the past N hours.
        """
        # roll up the past N hours of health info
        collector = health_util.HealthHistorySlot()
        keys = health_util.HealthHistorySlot.key_range(hours)
        for key in keys:
            data = self.get_store(key)
            self.log.info("Reporting health key {} found {}".format(
                key, bool(data)))
            health = json.loads(data) if data else {}
            slot = health_util.HealthHistorySlot(health)
            collector.merge(slot)

        # include history that hasn't yet been flushed
        collector.merge(self._health_slot)

        return dict(
            current=json.loads(self.get("health")["json"]),
            history=collector.health()
        )

    def _version_parse(self, version):
        """
        Return the components of a Ceph version string.

        This returns an empty dict when the version string cannot be parsed
        into its constituent components, such as when Ceph has been built
        with ENABLE_GIT_VERSION=OFF.
        """
        r = r"ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
        m = re.match(r, version)
        ver = {} if not m else {
            "release": m.group("release"),
            "major": m.group("major"),
            "minor": m.group("minor")
        }
        return {k: int(v) for k, v in ver.items()}

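    # Illustrative example (the build hash below is a placeholder; anything
    # after the third version component is ignored by the regex):
    #   _version_parse("ceph version 16.2.2 (abcdef) pacific (stable)")
    #   returns {"release": 16, "major": 2, "minor": 2}
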
    def _crash_history(self, hours):
        """
        Load crash history for the past N hours from the crash module.
        """
        params = dict(
            prefix="crash json_report",
            hours=hours
        )

        result = dict(
            summary={},
            hours=params["hours"],
        )

        health_check_details = []

        try:
            _, _, crashes = self.remote("crash", "handle_command", "", params)
            result["summary"] = json.loads(crashes)
        except Exception as e:
            errmsg = "failed to invoke crash module"
            self.log.warning("{}: {}".format(errmsg, str(e)))
            health_check_details.append(errmsg)
        else:
            self.log.debug("Crash module invocation succeeded {}".format(
                json.dumps(result["summary"], indent=2)))

        return result, health_check_details

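    # The remote() call above dispatches the "crash json_report" command
    # prefix to the crash mgr module's command handler and parses the JSON
    # summary it returns for the requested window of hours.
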
    def _apply_osd_stats(self, osd_map):
        # map from osd id to its index in the map structure
        osd_id_to_idx = {}
        for idx in range(len(osd_map["osds"])):
            osd_id_to_idx[osd_map["osds"][idx]["osd"]] = idx

        # include stats, including space utilization performance counters.
        # adapted from dashboard api controller
        for s in self.get('osd_stats')['osd_stats']:
            try:
                idx = osd_id_to_idx[s["osd"]]
                osd_map["osds"][idx].update({'osd_stats': s})
            except KeyError as e:
                self.log.warning("inconsistent api state: {}".format(str(e)))

        for osd in osd_map["osds"]:
            osd['stats'] = {}
            for s in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][s.split('.')[1]] = self.get_latest('osd', str(osd["osd"]), s)

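    # After _apply_osd_stats() runs, each entry in osd_map["osds"] carries an
    # "osd_stats" dict from the osd_stats dump plus a "stats" dict with the
    # numpg, stat_bytes and stat_bytes_used perf counters.
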
    def _config_dump(self):
        """Report cluster configuration

        This report is the standard `config dump` report. It does not include
        configuration defaults; these can be inferred from the version number.
        """
        result = CommandResult("")
        args = dict(prefix="config dump", format="json")
        self.send_command(result, "mon", "", json.dumps(args), "")
        ret, outb, outs = result.wait()
        if ret == 0:
            return json.loads(outb), []
        else:
            self.log.warning("send_command 'config dump' failed. "
                             "ret={}, outs=\"{}\"".format(ret, outs))
            return [], ["Failed to read monitor config dump"]

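    # The monitor command assembled above is effectively what the CLI runs
    # for `ceph config dump --format json`.
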
    @CLIReadCommand('insights')
    def do_report(self):
        '''
        Retrieve insights report
        '''
        health_check_details = []
        report = {}

        report.update({
            "version": dict(full=self.version,
                            **self._version_parse(self.version))
        })

        # crash history
        crashes, health_details = self._crash_history(CRASH_HISTORY_HOURS)
        report["crashes"] = crashes
        health_check_details.extend(health_details)

        # health history
        report["health"] = self._health_report(HEALTH_HISTORY_HOURS)

        # cluster configuration
        config, health_details = self._config_dump()
        report["config"] = config
        health_check_details.extend(health_details)

        osd_map = self.get("osd_map")
        del osd_map['pg_temp']
        self._apply_osd_stats(osd_map)
        report["osd_dump"] = osd_map

        report["df"] = self.get("df")
        report["osd_tree"] = self.get("osd_map_tree")
        report["fs_map"] = self.get("fs_map")
        report["crush_map"] = self.get("osd_map_crush")
        report["mon_map"] = self.get("mon_map")
        report["service_map"] = self.get("service_map")
        report["manager_map"] = self.get("mgr_map")
        report["mon_status"] = json.loads(self.get("mon_status")["json"])
        report["pg_summary"] = self.get("pg_summary")
        report["osd_metadata"] = self.get("osd_metadata")

        report.update({
            "errors": health_check_details
        })

        if health_check_details:
            self.set_health_checks({
                INSIGHTS_HEALTH_CHECK: {
                    "severity": "warning",
                    "summary": "Generated incomplete Insights report",
                    "detail": health_check_details
                }
            })

        result = json.dumps(report, indent=2, cls=health_util.HealthEncoder)
        return HandleCommandResult(stdout=result)

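    # Example CLI usage (the insights module must be enabled on the mgr):
    #   ceph mgr module enable insights
    #   ceph insights
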
    @CLICommand('insights prune-health')
    def do_prune_health(self, hours: int):
        '''
        Remove health history older than <hours> hours
        '''
        self._health_prune_history(hours)
        return HandleCommandResult()

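    # Example CLI usage; passing 0 removes all stored health history and
    # resets the in-memory slot (see _health_prune_history):
    #   ceph insights prune-health 0
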
    def testing_set_now_time_offset(self, hours):
        """
        Control what "now" time it is by applying an offset. This is called from
        the selftest module to manage testing scenarios related to tracking
        health history.
        """
        hours = int(hours)
        health_util.NOW_OFFSET = datetime.timedelta(hours=hours)
        self.log.warning("Setting now time offset {}".format(health_util.NOW_OFFSET))
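
# Sketch of how another mgr module (such as selftest) might drive the hook
# above through the inter-module remote() interface; the exact caller is not
# shown in this file:
#   self.remote("insights", "testing_set_now_time_offset", 24)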