]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/progress/module.py
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / pybind / mgr / progress / module.py
CommitLineData
11fdf7f2
TL
1from mgr_module import MgrModule
2import threading
3import datetime
4import uuid
5
6import json
7
8
# Version stamped into the persisted "completed" events blob; checked on
# load so newer encodings are not misread (see Module._save / Module._load).
ENCODING_VERSION = 1

# keep a global reference to the module so we can use it from Event methods
_module = None
13
14
class Event(object):
    """
    Base class for a tracked operation: records a start time, exposes a
    completion fraction (``progress``, 0.0-1.0, supplied by subclasses),
    and carries a list of "refs" -- (type, id) tuples naming the objects
    (osds, pools) the operation relates to.
    """

    def __init__(self, message, refs):
        self._message = message
        self._refs = refs

        # Wall-clock start, used later to report the event's duration.
        self.started_at = datetime.datetime.utcnow()

        # Assigned by subclasses (uuid or caller-supplied id).
        self.id = None

    def _refresh(self):
        # Push the current progress out to the mgr daemon via the
        # module singleton (set in Module.__init__).
        mod = _module
        mod.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
                                                            self.progress))
        mod.update_progress_event(self.id, self._message, self.progress)

    @property
    def message(self):
        return self._message

    @property
    def refs(self):
        return self._refs

    @property
    def progress(self):
        # Subclasses must provide a 0.0-1.0 completion fraction.
        raise NotImplementedError()

    def summary(self):
        return "{0} {1}".format(self.progress, self.message)

    def _progress_str(self, width):
        """Render an ASCII progress bar exactly ``width`` chars wide."""
        bar_width = width - 2
        filled = int(self.progress * bar_width)
        return "[" + filled * '=' + (bar_width - filled) * '.' + "]"

    def twoline_progress(self):
        """
        e.g.

        - Eating my delicious strudel
            [===============..............]

        """
        return "{0}\n {1}".format(
            self._message, self._progress_str(30))

    def to_json(self):
        return {
            "id": self.id,
            "message": self.message,
            "refs": self._refs
        }
78
79
class GhostEvent(Event):
    """
    A completed event reduced to the fields we persist: id, message and
    refs.  Progress is always 100%.
    """

    def __init__(self, my_id, message, refs):
        # Parent sets id to None; overwrite it with the persisted id.
        super(GhostEvent, self).__init__(message, refs)
        self.id = my_id

    @property
    def progress(self):
        # Finished by definition.
        return 1.0
93
94
class RemoteEvent(Event):
    """
    An event owned by another mgr module: we know nothing about its
    internals, so we simply mirror whatever progress the remote module
    reports via set_progress().
    """

    def __init__(self, my_id, message, refs):
        super(RemoteEvent, self).__init__(message, refs)
        self.id = my_id
        self._progress = 0.0
        # Announce ourselves immediately at 0%.
        self._refresh()

    def set_progress(self, progress):
        # Remote caller pushes a new 0.0-1.0 fraction.
        self._progress = progress
        self._refresh()

    @property
    def progress(self):
        return self._progress
116
class PgRecoveryEvent(Event):
    """
    An event whose completion is determined by the recovery of a set of
    PGs to a healthy state.

    Always call update() immediately after construction.
    """

    def __init__(self, message, refs, which_pgs, evacuate_osds):
        # which_pgs: list of PgId instances still awaiting recovery
        # evacuate_osds: OSD ids whose data is being moved away
        super(PgRecoveryEvent, self).__init__(message, refs)

        self._pgs = which_pgs

        self._evacuate_osds = evacuate_osds

        self._original_pg_count = len(self._pgs)

        # Baseline num_bytes_recovered per PG, captured on the first
        # pg_update() call so progress is relative to the event's start.
        self._original_bytes_recovered = None

        self._progress = 0.0

        self.id = str(uuid.uuid4())
        self._refresh()

    @property
    def evacuating_osds(self):
        return self._evacuate_osds

    def pg_update(self, pg_dump, log):
        # Recompute self._progress from a fresh pg dump and push the new
        # value to the mgr via _refresh().
        # FIXME: O(pg_num) in python
        # FIXME: far more fields getting pythonized than we really care about
        pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])

        if self._original_bytes_recovered is None:
            # First update: record the starting recovery byte counts.
            self._original_bytes_recovered = {}
            missing_pgs = []
            for pg in self._pgs:
                pg_str = str(pg)
                if pg_str in pg_to_state:
                    self._original_bytes_recovered[pg] = \
                        pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
                else:
                    missing_pgs.append(pg)
            # Only drop unknown PGs if the dump claims to be complete
            # ('pg_ready'); otherwise they may just not be reported yet.
            if pg_dump.get('pg_ready', False):
                for pg in missing_pgs:
                    self._pgs.remove(pg)

        complete_accumulate = 0.0

        # Calculating progress as the number of PGs recovered divided by the
        # original where partially completed PGs count for something
        # between 0.0-1.0. This is perhaps less faithful than looking at the
        # total number of bytes recovered, but it does a better job of
        # representing the work still to do if there are a number of very
        # few-bytes PGs that still need the housekeeping of their recovery
        # to be done. This is subjective...

        complete = set()
        for pg in self._pgs:
            pg_str = str(pg)
            try:
                info = pg_to_state[pg_str]
            except KeyError:
                # The PG is gone! Probably a pool was deleted. Drop it.
                complete.add(pg)
                continue

            state = info['state']

            states = state.split("+")

            # A PG is "unmoved" while any evacuating OSD is still present
            # in its up or acting set.
            unmoved = bool(set(self._evacuate_osds) & (
                set(info['up']) | set(info['acting'])))

            if "active" in states and "clean" in states and not unmoved:
                complete.add(pg)
            else:
                if info['stat_sum']['num_bytes'] == 0:
                    # Empty PGs are considered 0% done until they are
                    # in the correct state.
                    pass
                else:
                    recovered = info['stat_sum']['num_bytes_recovered']
                    total_bytes = info['stat_sum']['num_bytes']
                    if total_bytes > 0:
                        ratio = float(recovered -
                                      self._original_bytes_recovered[pg]) / \
                            total_bytes

                        # Since the recovered bytes (over time) could perhaps
                        # exceed the contents of the PG (moment in time), we
                        # must clamp this
                        ratio = min(ratio, 1.0)

                    else:
                        # Dataless PGs (e.g. containing only OMAPs) count
                        # as half done.
                        ratio = 0.5

                    complete_accumulate += ratio

        # Drop finished PGs from the tracked set.  ``complete`` is a subset
        # of self._pgs, so symmetric difference == set difference here.
        self._pgs = list(set(self._pgs) ^ complete)
        completed_pgs = self._original_pg_count - len(self._pgs)
        self._progress = (completed_pgs + complete_accumulate)\
            / self._original_pg_count
        self._refresh()

        log.info("Updated progress to {0} ({1})".format(
            self._progress, self._message
        ))

    @property
    def progress(self):
        return self._progress
231
232
class PgId(object):
    """
    Value type identifying a placement group as ``<pool_id>.<ps>``.

    Instances are stored in sets and used as dict keys by
    PgRecoveryEvent, so equality and hashing must be value-based.
    """

    def __init__(self, pool_id, ps):
        self.pool_id = pool_id
        self.ps = ps

    def __eq__(self, other):
        # This was previously written as __cmp__, which Python 3 ignores
        # entirely (and which must return -1/0/1, not a bool, on Python 2):
        # equality silently fell back to object identity.
        return (self.pool_id, self.ps) == (other.pool_id, other.ps)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Required alongside __eq__ so PgId stays usable in sets/dicts;
        # must be consistent with __eq__.
        return hash((self.pool_id, self.ps))

    def __lt__(self, other):
        return (self.pool_id, self.ps) < (other.pool_id, other.ps)

    def __str__(self):
        # ps is conventionally rendered in hex, e.g. "1.1a"
        return "{0}.{1:x}".format(self.pool_id, self.ps)
246
247
class Module(MgrModule):
    """
    Track long-running cluster operations and expose them via the
    ``progress`` commands: PG recovery events created when OSDs are
    marked out/in, plus arbitrary events published by other mgr modules
    through update()/complete().
    """

    COMMANDS = [
        {"cmd": "progress",
         "desc": "Show progress of recovery operations",
         "perm": "r"},
        {"cmd": "progress json",
         "desc": "Show machine readable progress information",
         "perm": "r"},
        {"cmd": "progress clear",
         "desc": "Reset progress tracking",
         "perm": "rw"}
    ]

    MODULE_OPTIONS = [
        {
            'name': 'max_completed_events',
            'default': 50,
            'type': 'int',
            'desc': 'number of past completed events to remember',
            'runtime': True,
        },
        {
            'name': 'persist_interval',
            'default': 5,
            'type': 'secs',
            'desc': 'how frequently to persist completed events',
            'runtime': True,
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # In-flight events keyed by event id.
        self._events = {}
        # GhostEvents for finished events, oldest first.
        self._completed_events = []

        self._old_osd_map = None

        self._ready = threading.Event()
        self._shutdown = threading.Event()

        self._latest_osdmap = None

        # True when _completed_events needs writing back to the store.
        self._dirty = False

        # Event._refresh() reaches back into the module via this global.
        global _module
        _module = self

    def config_notify(self):
        # Mirror MODULE_OPTIONS onto attributes (max_completed_events,
        # persist_interval) whenever the configuration changes.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def _osd_out(self, old_map, old_dump, new_map, osd_id):
        """
        Create a PgRecoveryEvent covering the PGs that must move because
        osd_id was marked out.  No event is created if no PG was
        assigned a new location.
        """
        affected_pgs = []
        unmoved_pgs = []
        for pool in old_dump['pools']:
            pool_id = pool['pool']
            for ps in range(0, pool['pg_num']):
                up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)

                # Was this OSD affected by the OSD going out?
                old_osds = set(up_acting['up']) | set(up_acting['acting'])
                was_on_out_osd = osd_id in old_osds
                if not was_on_out_osd:
                    continue

                self.log.debug("pool_id, ps = {0}, {1}".format(
                    pool_id, ps
                ))

                self.log.debug(
                    "up_acting: {0}".format(json.dumps(up_acting, indent=2)))

                new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
                new_osds = set(new_up_acting['up']) | set(new_up_acting['acting'])

                # Has this OSD been assigned a new location?
                # (it might not be if there is no suitable place to move
                # after an OSD failure)
                is_relocated = len(new_osds - old_osds) > 0

                self.log.debug(
                    "new_up_acting: {0}".format(json.dumps(new_up_acting,
                                                           indent=2)))

                if was_on_out_osd and is_relocated:
                    # This PG is now in motion, track its progress
                    affected_pgs.append(PgId(pool_id, ps))
                elif not is_relocated:
                    # This PG didn't get a new location, we'll log it
                    unmoved_pgs.append(PgId(pool_id, ps))

        # In the case that we ignored some PGs, log the reason why (we may
        # not end up creating a progress event)
        if len(unmoved_pgs):
            self.log.warn("{0} PGs were on osd.{1}, but didn't get new locations".format(
                len(unmoved_pgs), osd_id))

        self.log.warn("{0} PGs affected by osd.{1} going out".format(
            len(affected_pgs), osd_id))

        if len(affected_pgs) == 0:
            # Don't emit events if there were no PGs
            return

        # TODO: reconcile with existing events referring to this OSD going out
        ev = PgRecoveryEvent(
            "Rebalancing after osd.{0} marked out".format(osd_id),
            refs=[("osd", osd_id)],
            which_pgs=affected_pgs,
            evacuate_osds=[osd_id]
        )
        ev.pg_update(self.get("pg_dump"), self.log)
        self._events[ev.id] = ev

    def _osd_in(self, osd_id):
        """Cancel recovery events whose evacuated OSD came back in."""
        # Iterate over a snapshot: _complete() deletes entries from
        # self._events, and mutating a dict while iterating it raises
        # RuntimeError on Python 3.
        for ev in list(self._events.values()):
            if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
                self.log.info("osd.{0} came back in, cancelling event".format(
                    osd_id
                ))
                self._complete(ev)

    def _osdmap_changed(self, old_osdmap, new_osdmap):
        """Compare two OSDMaps and react to OSDs going out or in."""
        old_dump = old_osdmap.dump()
        new_dump = new_osdmap.dump()

        old_osds = dict([(o['osd'], o) for o in old_dump['osds']])

        for osd in new_dump['osds']:
            osd_id = osd['osd']
            new_weight = osd['in']
            if osd_id in old_osds:
                old_weight = old_osds[osd_id]['in']

                if new_weight == 0.0 and old_weight > new_weight:
                    self.log.warn("osd.{0} marked out".format(osd_id))
                    self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
                elif new_weight >= 1.0 and old_weight == 0.0:
                    # Only consider weight>=1.0 as "in" to avoid spawning
                    # individual recovery events on every adjustment
                    # in a gradual weight-in
                    self.log.warn("osd.{0} marked in".format(osd_id))
                    self._osd_in(osd_id)

    def notify(self, notify_type, notify_data):
        # Wait until serve() has finished loading state.
        self._ready.wait()

        if notify_type == "osd_map":
            old_osdmap = self._latest_osdmap
            self._latest_osdmap = self.get_osdmap()

            self.log.info("Processing OSDMap change {0}..{1}".format(
                old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
            ))
            self._osdmap_changed(old_osdmap, self._latest_osdmap)
        elif notify_type == "pg_summary":
            data = self.get("pg_dump")
            # Snapshot the values: maybe_complete() may delete finished
            # events from self._events while we iterate.
            for ev in list(self._events.values()):
                if isinstance(ev, PgRecoveryEvent):
                    ev.pg_update(data, self.log)
                    self.maybe_complete(ev)

    def maybe_complete(self, event):
        # Retire the event once it reports fully done.
        if event.progress >= 1.0:
            self._complete(event)

    def _save(self):
        """Persist the completed-event list to the mgr key/value store."""
        self.log.info("Writing back {0} completed events".format(
            len(self._completed_events)
        ))
        # TODO: bound the number we store.
        encoded = json.dumps({
            "events": [ev.to_json() for ev in self._completed_events],
            "version": ENCODING_VERSION,
            "compat_version": ENCODING_VERSION
        })
        self.set_store("completed", encoded)

    def _load(self):
        """Load previously persisted completed events, if any."""
        stored = self.get_store("completed")

        if stored is None:
            self.log.info("No stored events to load")
            return

        decoded = json.loads(stored)
        if decoded['compat_version'] > ENCODING_VERSION:
            raise RuntimeError("Cannot decode version {0}".format(
                decoded['compat_version']))

        for ev in decoded['events']:
            self._completed_events.append(GhostEvent(ev['id'], ev['message'], ev['refs']))

        self._prune_completed_events()

    def _prune_completed_events(self):
        # Keep only the newest max_completed_events entries.
        length = len(self._completed_events)
        if length > self.max_completed_events:
            self._completed_events = self._completed_events[length - self.max_completed_events : length]
            self._dirty = True

    def serve(self):
        """Module main loop: load state, then lazily persist changes."""
        self.config_notify()
        self.clear_all_progress_events()
        self.log.info("Loading...")

        self._load()
        self.log.info("Loaded {0} historic events".format(self._completed_events))

        self._latest_osdmap = self.get_osdmap()
        self.log.info("Loaded OSDMap, ready.")

        self._ready.set()

        while not self._shutdown.is_set():
            # Lazy periodic write back of completed events
            if self._dirty:
                self._save()
                self._dirty = False

            self._shutdown.wait(timeout=self.persist_interval)

        self._shutdown.wait()

    def shutdown(self):
        self._shutdown.set()
        self.clear_all_progress_events()

    def update(self, ev_id, ev_msg, ev_progress, refs=None):
        """
        For calling from other mgr modules
        """
        if refs is None:
            refs = []
        try:
            ev = self._events[ev_id]
        except KeyError:
            # First report of this event id: start tracking it.
            ev = RemoteEvent(ev_id, ev_msg, refs)
            self._events[ev_id] = ev
            self.log.info("update: starting ev {0} ({1})".format(
                ev_id, ev_msg))
        else:
            self.log.debug("update: {0} on {1}".format(
                ev_progress, ev_msg))

        ev.set_progress(ev_progress)
        ev._refresh()

    def _complete(self, ev):
        """Retire a live event into the completed list and persist it."""
        duration = (datetime.datetime.utcnow() - ev.started_at)
        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
            ev.id, ev.message, duration.seconds
        ))
        self.complete_progress_event(ev.id)

        self._completed_events.append(
            GhostEvent(ev.id, ev.message, ev.refs))
        del self._events[ev.id]
        self._prune_completed_events()
        self._dirty = True

    def complete(self, ev_id):
        """
        For calling from other mgr modules
        """
        # Keep the try body minimal: only the lookup can legitimately
        # raise KeyError.  The old code wrapped the whole sequence, so a
        # KeyError raised inside _complete() would be silently swallowed.
        try:
            ev = self._events[ev_id]
        except KeyError:
            self.log.warn("complete: ev {0} does not exist".format(ev_id))
            return

        ev.set_progress(1.0)
        self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                               ev.message))
        self._complete(ev)

    def _handle_ls(self):
        """Render all live and completed events for ``progress``."""
        if len(self._events) or len(self._completed_events):
            out = ""
            chrono_order = sorted(self._events.values(),
                                  key=lambda x: x.started_at, reverse=True)
            for ev in chrono_order:
                out += ev.twoline_progress()
                out += "\n"

            if len(self._completed_events):
                # TODO: limit number of completed events to show
                out += "\n"
                for ev in self._completed_events:
                    out += "[Complete]: {0}\n".format(ev.twoline_progress())

            return 0, out, ""
        else:
            return 0, "", "Nothing in progress"

    def _json(self):
        # Machine-readable snapshot for ``progress json``.
        return {
            'events': [ev.to_json() for ev in self._events.values()],
            'completed': [ev.to_json() for ev in self._completed_events]
        }

    def _handle_clear(self):
        # Drop all tracked state and persist the (now empty) list.
        self._events = {}
        self._completed_events = []
        self._dirty = True
        self._save()

        return 0, "", ""

    def handle_command(self, _, cmd):
        if cmd['prefix'] == "progress":
            return self._handle_ls()
        elif cmd['prefix'] == "progress clear":
            # The clear command isn't usually needed - it's to enable
            # the admin to "kick" this module if it seems to have done
            # something wrong (e.g. we have a bug causing a progress event
            # that never finishes)
            return self._handle_clear()
        elif cmd['prefix'] == "progress json":
            return 0, json.dumps(self._json(), indent=2), ""
        else:
            raise NotImplementedError(cmd['prefix'])