6 from mgr_module
import MgrModule
, CommandResult
7 from . import health
as health_util
# Window, in hours, of crash history included in an insights report.
CRASH_HISTORY_HOURS = 24
# Window, in hours, of health history included in an insights report.
HEALTH_HISTORY_HOURS = 24
# Hours of persisted health history kept in the store before it is pruned.
HEALTH_RETENTION_HOURS = 30
# Name of the health check raised when an insights report is incomplete.
INSIGHTS_HEALTH_CHECK = "MGR_INSIGHTS_WARNING"
# version tag for persistent data format
# NOTE(review): ON_DISK_VERSION, which the health-slot flush code stamps into
# each stored entry, is not defined in this section -- confirm it exists.
# Insights manager module: builds the `insights` report and maintains the
# persisted health history (trimmed via `insights prune-health`).
20 class Module(MgrModule
):
# Command table fragments: the report command and the health-history prune
# command, whose `hours` argument is passed as a string and parsed as an int.
# NOTE(review): the enclosing COMMANDS structure and the report entry's
# "cmd"/"perm" fields are not present here -- confirm they are defined.
24 "desc": "Retrieve insights report",
29 'cmd': 'insights prune-health name=hours,type=CephString',
30 'desc': 'Remove health history older than <hours> hours',
def __init__(self, *args, **kwargs):
    """Initialise the module plus its loop-control and health-tracking state.

    All arguments are forwarded unchanged to the base-class constructor.
    """
    super(Module, self).__init__(*args, **kwargs)

    # loop control: shutdown request flag and the event the worker waits on
    self._shutdown = False
    self._evt = threading.Event()

    # health history tracking: snapshots queued by notify(), and the current
    # time slot they are folded into (assigned later by _health_reset()).
    self._pending_health = []
    self._health_slot = None
def notify(self, ttype, ident):
    """Queue updates for processing.

    Only ``health`` notifications are handled. For those, the current
    cluster health is fetched, parsed from JSON, and appended to
    ``self._pending_health`` for later folding into the current health slot.
    Every other notification type is ignored.

    :param ttype: notification type delivered by the manager.
    :param ident: notification identifier (unused).
    """
    # notify() receives every notification type; without this guard each
    # unrelated event would fetch and queue a duplicate health snapshot.
    if ttype != "health":
        return
    self.log.info("Received health check update {} pending".format(
        len(self._pending_health)))
    health = json.loads(self.get("health")["json"])
    self._pending_health.append(health)
# Background loop body. The enclosing method (presumably serve()) and its loop
# header are not shown here. Each pass blocks on self._evt for up to
# PERSIST_PERIOD, rolls the health slot over when it has expired, then folds
# the queued health snapshots into the current slot.
58 self
._evt
.wait(health_util
.PERSIST_PERIOD
.total_seconds())
63 # when the current health slot expires, finalize it by flushing it to
64 # the store, and initializing a new empty slot.
# NOTE(review): no re-initialization of self._health_slot (e.g. a
# _health_reset() call) is visible between the flush and the prune below --
# confirm the slot is actually replaced on expiry.
65 if self
._health
_slot
.expired():
66 self
.log
.info("Health history slot expired {}".format(
68 self
._health
_maybe
_flush
()
# drop persisted health slots older than the retention window
70 self
._health
_prune
_history
(HEALTH_RETENTION_HOURS
)
72 # fold in pending health snapshots and flush
73 self
.log
.info("Applying {} health updates to slot {}".format(
74 len(self
._pending
_health
), self
._health
_slot
))
75 for health
in self
._pending
_health
:
76 self
._health
_slot
.add(health
)
# NOTE(review): if notify() appends from another thread, a snapshot added
# between the end of the loop above and this rebinding would be lost --
# confirm the threading model.
77 self
._pending
_health
= []
78 self
._health
_maybe
_flush
()
def _health_reset(self):
    """Initialize the current health slot

    The slot will be initialized with any state found to have already been
    persisted, otherwise the slot will start empty.
    """
    key = health_util.HealthHistorySlot.curr_key()
    data = self.get_store(key)
    if data:
        # resume from the state already stored for the current slot
        init_health = json.loads(data)
        self._health_slot = health_util.HealthHistorySlot(init_health)
    else:
        # nothing stored (falsy result): start with an empty slot
        self._health_slot = health_util.HealthHistorySlot()
    self.log.info("Reset curr health slot {}".format(self._health_slot))
def _health_maybe_flush(self):
    """Persist the current health slot to the store when it reports unsaved data.

    The stored value is the slot's health dict tagged with ``version`` and
    JSON-encoded with ``HealthEncoder``. The slot is marked flushed afterwards.
    """
    current = self._health_slot
    self.log.info("Maybe flushing slot {} needed {}".format(
        current, current.need_flush()))

    if not current.need_flush():
        return

    store_key = current.key()

    # Build the store entry: slot health plus the on-disk format version
    # (the slot's own data must not already carry a "version" field).
    entry = current.health()
    assert "version" not in entry
    entry["version"] = ON_DISK_VERSION
    encoded = json.dumps(entry, cls=health_util.HealthEncoder)

    pretty = json.dumps(entry, indent=2, cls=health_util.HealthEncoder)
    self.log.debug("Storing health key {} data {}".format(store_key, pretty))

    self.set_store(store_key, encoded)
    current.mark_flushed()
def _health_filter(self, f):
    """Filter hourly health reports by timestamp.

    :param f: predicate applied to the time decoded (via
        ``HealthHistorySlot.key_to_time``) from each stored health-history key.
    :returns: list of the store keys for which ``f`` returned true.
    """
    stored = self.get_store_prefix(health_util.HEALTH_HISTORY_KEY_PREFIX)
    # Only the keys are needed, so iterate the mapping directly. A list
    # (rather than a lazy iterator) lets callers modify the store while
    # walking the result.
    return [key for key in stored
            if f(health_util.HealthHistorySlot.key_to_time(key))]
def _health_prune_history(self, hours):
    """Prune old health entries.

    Every stored health-history slot whose time is at or before
    ``utcnow() - hours`` is removed by setting its store key to ``None``.

    :param hours: retention window in hours. A value <= 0 puts the cutoff at
        or after "now", so everything is pruned and the in-memory slot is
        reset too.
    """
    cutoff = datetime.datetime.utcnow() - datetime.timedelta(hours=hours)
    for key in self._health_filter(lambda ts: ts <= cutoff):
        self.log.info("Removing old health slot key {}".format(key))
        self.set_store(key, None)
    # Only drop the in-memory slot when its own window was pruned as well.
    # Resetting it unconditionally would discard health data that has not
    # been flushed yet, or that was just reloaded from the store.
    if hours <= 0:
        self._health_slot = health_util.HealthHistorySlot()
def _health_report(self, hours):
    """
    Report a consolidated health report for the past N hours.

    :param hours: passed to ``HealthHistorySlot.key_range`` to select which
        stored health slots are rolled up.
    :returns: dict with ``current`` (live cluster health parsed from JSON)
        and ``history`` (merged health of the selected stored slots plus
        the not-yet-flushed current slot).
    """
    # roll up the past N hours of health info
    collector = health_util.HealthHistorySlot()
    for key in health_util.HealthHistorySlot.key_range(hours):
        data = self.get_store(key)
        self.log.info("Reporting health key {} found {}".format(
            key, bool(data)))
        # a slot with nothing stored contributes an empty history
        health = json.loads(data) if data else {}
        collector.merge(health_util.HealthHistorySlot(health))

    # include history that hasn't yet been flushed
    collector.merge(self._health_slot)

    return dict(
        current=json.loads(self.get("health")["json"]),
        history=collector.health(),
    )
159 def _version_parse(self
, version
):
161 Return the components of a Ceph version string.
163 This returns nothing when the version string cannot be parsed into its
164 constituent components, such as when Ceph has been built with
165 ENABLE_GIT_VERSION=OFF.
167 r
= "ceph version (?P<release>\d+)\.(?P<major>\d+)\.(?P<minor>\d+)"
168 m
= re
.match(r
, version
)
169 ver
= {} if not m
else {
170 "release": m
.group("release"),
171 "major": m
.group("major"),
172 "minor": m
.group("minor")
174 return { k
:int(v
) for k
,v
in six
.iteritems(ver
) }
# Load crash history for the past `hours` hours from the crash module.
# Returns (result, health_check_details). result["summary"] holds the parsed
# JSON text returned by the crash module's handle_command. On failure,
# health_check_details collects an error string instead.
# NOTE(review): the construction of `params` and `result` and the opening
# `try:` are not present in this section -- confirm them.
176 def _crash_history(self
, hours
):
178 Load crash history for the past N hours from the crash module.
181 prefix
= "crash json_report",
187 hours
= params
["hours"],
190 health_check_details
= []
# call the crash module's handle_command via self.remote(); the third
# element of its result is JSON text, parsed into result["summary"]
193 _
, _
, crashes
= self
.remote("crash", "handle_command", "", params
)
194 result
["summary"] = json
.loads(crashes
)
195 except Exception as e
:
# best effort: the failure is logged and recorded so the caller can surface
# it as a health warning, rather than being propagated
196 errmsg
= "failed to invoke crash module"
197 self
.log
.warning("{}: {}".format(errmsg
, str(e
)))
198 health_check_details
.append(errmsg
)
# NOTE(review): this reads result["summary"], which is only assigned on
# success -- confirm it is guarded so it does not run after a failure.
200 self
.log
.debug("Crash module invocation succeeded {}".format(
201 json
.dumps(result
["summary"], indent
=2)))
203 return result
, health_check_details
def _apply_osd_stats(self, osd_map):
    """Annotate each entry of ``osd_map["osds"]`` in place with runtime stats.

    Each OSD entry receives:
      * ``osd_stats``: its record from ``get('osd_stats')['osd_stats']``.
        Records that cannot be matched to an OSD entry are logged and
        skipped.
      * ``stats``: the latest ``numpg``, ``stat_bytes`` and
        ``stat_bytes_used`` values from ``get_latest``.

    :param osd_map: OSD map dict; mutated in place.
    """
    osds = osd_map["osds"]
    # map from osd id to its index in the map structure
    osd_id_to_idx = {osd["osd"]: idx for idx, osd in enumerate(osds)}

    # include stats, including space utilization performance counters.
    # adapted from dashboard api controller
    for s in self.get('osd_stats')['osd_stats']:
        try:
            idx = osd_id_to_idx[s["osd"]]
        except KeyError as e:
            self.log.warning("inconsistent api state: {}".format(str(e)))
            continue
        osds[idx].update({'osd_stats': s})

    for osd in osds:
        # ensure the "stats" dict exists before filling in the counters
        stats = osd.setdefault('stats', {})
        for counter in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
            stats[counter.split('.')[1]] = self.get_latest(
                'osd', str(osd["osd"]), counter)
def _config_dump(self):
    """Report cluster configuration

    This report is the standard `config dump` report. It does not include
    configuration defaults; these can be inferred from the version number.

    :returns: ``(config, errors)``. On success this is the parsed
        ``config dump`` JSON and ``[]``. On failure it is ``[]`` and a
        one-element list describing the failure.
    """
    result = CommandResult("")
    args = dict(prefix="config dump", format="json")
    self.send_command(result, "mon", "", json.dumps(args), "")
    ret, outb, outs = result.wait()
    if ret == 0:
        return json.loads(outb), []
    # Adjacent literals keep the message on one line. A backslash-newline
    # inside a literal would splice the next line's indentation into the text.
    self.log.warning("send_command 'config dump' failed. "
                     "ret={}, outs=\"{}\"".format(ret, outs))
    return [], ["Failed to read monitor config dump"]
# Build the full insights report and return it as (0, json_text, "").
# Error strings from the crash-history and config-dump helpers are collected
# in health_check_details. When any exist, the INSIGHTS_HEALTH_CHECK warning
# is raised via set_health_checks.
243 def do_report(self
, inbuf
, command
):
244 health_check_details
= []
# NOTE(review): the initialisation of `report` is not present in this
# section; only its "version" entry (full string plus parsed components) is.
248 "version": dict(full
= self
.version
,
249 **self
._version
_parse
(self
.version
))
# crash history: (summary data, error strings)
253 crashes
, health_details
= self
._crash
_history
(CRASH_HISTORY_HOURS
)
254 report
["crashes"] = crashes
255 health_check_details
.extend(health_details
)
# health: current status plus rolled-up history
258 report
["health"] = self
._health
_report
(HEALTH_HISTORY_HOURS
)
260 # cluster configuration
261 config
, health_details
= self
._config
_dump
()
262 report
["config"] = config
263 health_check_details
.extend(health_details
)
# OSD map with pg_temp removed, annotated in place with per-OSD stats
# NOTE(review): `del` raises KeyError if "pg_temp" is absent -- confirm the
# key is always present.
265 osd_map
= self
.get("osd_map")
266 del osd_map
['pg_temp']
267 self
._apply
_osd
_stats
(osd_map
)
268 report
["osd_dump"] = osd_map
# remaining cluster state fetched via self.get() (mon_status parsed from JSON)
270 report
["df"] = self
.get("df")
271 report
["osd_tree"] = self
.get("osd_map_tree")
272 report
["fs_map"] = self
.get("fs_map")
273 report
["crush_map"] = self
.get("osd_map_crush")
274 report
["mon_map"] = self
.get("mon_map")
275 report
["service_map"] = self
.get("service_map")
276 report
["manager_map"] = self
.get("mgr_map")
277 report
["mon_status"] = json
.loads(self
.get("mon_status")["json"])
278 report
["pg_summary"] = self
.get("pg_summary")
279 report
["osd_metadata"] = self
.get("osd_metadata")
282 "errors": health_check_details
# surface an incomplete report as a warning-level health check
285 if health_check_details
:
286 self
.set_health_checks({
287 INSIGHTS_HEALTH_CHECK
: {
288 "severity": "warning",
289 "summary": "Generated incomplete Insights report",
290 "detail": health_check_details
# NOTE(review): no path that clears the warning after a later complete report
# is visible in this section -- confirm one exists.
294 return 0, json
.dumps(report
, indent
=2, cls
=health_util
.HealthEncoder
), ""
def do_prune_health(self, inbuf, command):
    """Handle ``insights prune-health``: drop health history older than N hours.

    :param command: parsed command dict; ``command['hours']`` must convert
        to an int.
    :returns: ``(retval, outb, outs)``. This is ``(0, "", "")`` on success,
        or ``(errno.EINVAL, '', msg)`` when ``hours`` is not an integer.
    """
    try:
        hours = int(command['hours'])
    except (TypeError, ValueError):
        return errno.EINVAL, '', 'hours argument must be integer'

    self._health_prune_history(hours)

    # command handlers must return the (retval, outb, outs) triple
    return 0, "", ""
# Test hook: shift "now" by `hours`. This assigns the module-level global
# health_util.NOW_OFFSET = timedelta(hours=hours). That global is shared
# process-wide, so it affects every user of health_util, not just this
# instance. The new offset is logged at warning level.
# NOTE(review): presumably health_util's time helpers consult NOW_OFFSET;
# part of this method is not present in this section -- confirm no argument
# conversion happens before the assignment below.
306 def testing_set_now_time_offset(self
, hours
):
308 Control what "now" time it is by applying an offset. This is called from
309 the selftest module to manage testing scenarios related to tracking
316 health_util
.NOW_OFFSET
= datetime
.timedelta(hours
= hours
)
317 self
.log
.warning("Setting now time offset {}".format(health_util
.NOW_OFFSET
))
def handle_command(self, inbuf, command):
    """Dispatch an insights mgr command to its handler.

    :param inbuf: input buffer, forwarded to the handler.
    :param command: parsed command dict; ``command["prefix"]`` selects the
        handler.
    :returns: the handler's ``(retval, outb, outs)`` result.
    :raises NotImplementedError: for an unrecognised prefix.
    """
    prefix = command["prefix"]
    if prefix == "insights":
        return self.do_report(inbuf, command)
    if prefix == "insights prune-health":
        return self.do_prune_health(inbuf, command)
    # Report the offending prefix (taken from `command`, the dict actually
    # in scope) rather than failing with a NameError.
    raise NotImplementedError(prefix)