]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | import json |
11fdf7f2 TL |
2 | from collections import defaultdict |
3 | import datetime | |
4 | ||
# How often cached state is written to disk
PERSIST_PERIOD = datetime.timedelta(seconds=10)
# Key prefix used for health history entries in the on-disk store
HEALTH_HISTORY_KEY_PREFIX = "health_history/"
# Offset applied to "now" (used for testing); None means no offset
NOW_OFFSET = None
11 | ||
20effc67 | 12 | |
class HealthEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``set`` objects as lists."""

    def default(self, obj):
        """
        Convert objects the stock encoder cannot handle.

        Sets are emitted as lists; anything else is delegated to the base
        class, which raises TypeError for unsupported types.
        """
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)
18 | ||
20effc67 | 19 | |
class HealthCheckAccumulator(object):
    """
    Deduplicated storage of health checks.

    State is a nested mapping ``check -> severity -> {"summary", "detail"}``
    where ``summary`` and ``detail`` are sets of message strings.
    """

    def __init__(self, init_checks=None):
        """
        Args:
            init_checks: optional mapping with the same
                check -> severity -> {"summary", "detail"} structure used
                to seed the accumulator. The summary/detail values may be
                any iterable of messages.
        """
        # check : severity : { summary, detail }
        # summary and detail are deduplicated
        self._checks = defaultdict(
            lambda: defaultdict(lambda: {
                "summary": set(),
                "detail": set(),
            }))

        if init_checks:
            self._update(init_checks)

    def __str__(self):
        return "check count {}".format(len(self._checks))

    def add(self, checks):
        """
        Add health checks to the current state

        Args:
            checks: mapping of check name to an info dict providing
                ``info["severity"]``, ``info["summary"]["message"]`` and a
                sequence ``info["detail"]`` of dicts with a ``"message"`` key.

        Returns:
            bool: True if the state changed, False otherwise.
        """
        changed = False

        for check, info in checks.items():
            # only keep the icky stuff
            severity = info["severity"]
            if severity == "HEALTH_OK":
                continue

            summary = info["summary"]["message"]
            details = [d["message"] for d in info["detail"]]

            if self._add_check(check, severity, [summary], details):
                changed = True

        return changed

    def checks(self):
        """Return the underlying check -> severity -> messages mapping."""
        return self._checks

    def merge(self, other):
        """Fold the checks of another accumulator into this one."""
        assert isinstance(other, HealthCheckAccumulator)
        self._update(other._checks)

    def _update(self, checks):
        """Merge checks with same structure. Does not set dirty bit"""
        for check in checks:
            for severity in checks[check]:
                summaries = set(checks[check][severity]["summary"])
                details = set(checks[check][severity]["detail"])
                self._add_check(check, severity, summaries, details)

    def _add_check(self, check, severity, summaries, details):
        """
        Record summary and detail messages for one (check, severity) pair.

        Returns:
            bool: True if at least one message was not already stored.
        """
        changed = False

        for field, messages in (("summary", summaries), ("detail", details)):
            for message in messages:
                # Look up inside the loop so that no empty entry is created
                # when there is nothing to add.
                seen = self._checks[check][severity][field]
                if message not in seen:
                    seen.add(message)
                    changed = True

        return changed
93 | ||
20effc67 | 94 | |
class HealthHistorySlot(object):
    """
    Manage the life cycle of a health history time slot.

    A time slot is a fixed slice of wall clock time (e.g. every hour, from :00
    to :59), and all health updates that occur during this time are
    deduplicated together. A slot is initially in a clean state, and becomes
    dirty when a new health check is observed. The state of a slot should be
    persisted when need_flush returns true. Once the state has been flushed,
    reset the dirty bit by calling mark_flushed.
    """

    def __init__(self, init_health=None):
        """
        Args:
            init_health: optional dict whose ``"checks"`` entry seeds the
                slot's accumulator. Defaults to no prior state.
        """
        # Use None rather than a shared mutable ``dict()`` default argument.
        if init_health is None:
            init_health = {}
        self._checks = HealthCheckAccumulator(init_health.get("checks"))
        self._slot = self._curr_slot()
        # Target flush time; None while the slot is clean.
        self._next_flush = None

    def __str__(self):
        return "key {} next flush {} checks {}".format(
            self.key(), self._next_flush, self._checks)

    def health(self):
        """Return the slot state as a dict with a single ``checks`` entry."""
        return dict(checks=self._checks.checks())

    def key(self):
        """Identifier in the persist store"""
        return self._key(self._slot)

    def expired(self):
        """True if this slot is no longer the current slot, False otherwise"""
        return self._slot != self._curr_slot()

    def need_flush(self):
        """True if this slot needs to be flushed, False otherwise"""
        if self._next_flush is None:
            # Clean slot: nothing to persist.
            return False
        return self._next_flush <= HealthHistorySlot._now() or self.expired()

    def mark_flushed(self):
        """Reset the dirty bit. Caller persists state"""
        assert self._next_flush
        self._next_flush = None

    def add(self, health):
        """
        Add health to the underlying health accumulator. When the slot
        transitions from clean to dirty a target flush time is computed.

        Returns:
            bool: True if the accumulated state changed, False otherwise.
        """
        changed = self._checks.add(health["checks"])
        if changed and self._next_flush is None:
            self._next_flush = HealthHistorySlot._now() + PERSIST_PERIOD
        return changed

    def merge(self, other):
        """Fold the checks of another slot into this one."""
        assert isinstance(other, HealthHistorySlot)
        self._checks.merge(other._checks)

    @staticmethod
    def key_range(hours):
        """
        Return the time slot keys for the past N hours

        The result is a lazy iterator, starting with the current slot's key
        and stepping back one hour per item.
        """
        curr = HealthHistorySlot._curr_slot()
        return (
            HealthHistorySlot._key(curr - datetime.timedelta(hours=back))
            for back in range(hours)
        )

    @staticmethod
    def curr_key():
        """Key for the current UTC time slot"""
        return HealthHistorySlot._key(HealthHistorySlot._curr_slot())

    @staticmethod
    def key_to_time(key):
        """Return key converted into datetime"""
        timestr = key[len(HEALTH_HISTORY_KEY_PREFIX):]
        return datetime.datetime.strptime(timestr, "%Y-%m-%d_%H")

    @staticmethod
    def _key(dt):
        """Key format. Example: health_history/2018-11-05_00"""
        return HEALTH_HISTORY_KEY_PREFIX + dt.strftime("%Y-%m-%d_%H")

    @staticmethod
    def _now():
        """Control now time for easier testing"""
        # NOTE(review): utcnow() returns a naive datetime and is deprecated
        # since Python 3.12; kept as-is so keys and comparisons stay naive.
        now = datetime.datetime.utcnow()
        if NOW_OFFSET is not None:
            now = now + NOW_OFFSET
        return now

    @staticmethod
    def _curr_slot():
        """Slot for the current UTC time"""
        dt = HealthHistorySlot._now()
        # Truncate to the start of the hour.
        return datetime.datetime(
            year=dt.year,
            month=dt.month,
            day=dt.day,
            hour=dt.hour)