from mgr_module import MgrModule
import threading
import datetime
import uuid

import json


ENCODING_VERSION = 1

# keep a global reference to the module so we can use it from Event methods
_module = None


class Event(object):
    """
    A generic "event" that has a start time, completion percentage,
    and a list of "refs" that are (type, id) tuples describing which
    objects (osds, pools) this relates to.
    """

    def __init__(self, message, refs):
        self._message = message
        self._refs = refs

        self.started_at = datetime.datetime.utcnow()

        self.id = None

    def _refresh(self):
        global _module
        _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
                                                                self.progress))
        _module.update_progress_event(self.id, self._message, self.progress)

    @property
    def message(self):
        return self._message

    @property
    def refs(self):
        return self._refs

    @property
    def progress(self):
        raise NotImplementedError()

    @property
    def failed(self):
        return False

    @property
    def failure_message(self):
        return None

    def summary(self):
        return "{0} {1}".format(self.progress, self.message)

    def _progress_str(self, width):
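        """
        Render a fixed-width ASCII progress bar; for example, at 50%
        progress with width=12 this yields:

            [=====.....]
        """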
        inner_width = width - 2
        out = "["
        done_chars = int(self.progress * inner_width)
        out += done_chars * '='
        out += (inner_width - done_chars) * '.'
        out += "]"

        return out

    def twoline_progress(self):
        """
        e.g.

        - Eating my delicious strudel
            [===============..............]

        """
        return "{0}\n    {1}".format(
            self._message, self._progress_str(30))

    def to_json(self):
        return {
            "id": self.id,
            "message": self.message,
            "refs": self._refs
        }


class GhostEvent(Event):
    """
    The ghost of a completed event: these are the fields that we persist
    after the event is complete.
    """

    def __init__(self, my_id, message, refs,
                 failed=False, failure_message=None):
        super(GhostEvent, self).__init__(message, refs)
        self.id = my_id

        if failed:
            self._failed = True
            self._failure_message = failure_message
        else:
            self._failed = False

    @property
    def progress(self):
        return 1.0

    @property
    def failed(self):
        return self._failed

    @property
    def failure_message(self):
        return self._failure_message if self._failed else None

    def to_json(self):
        d = {
            "id": self.id,
            "message": self.message,
            "refs": self._refs
        }
        if self._failed:
            d["failed"] = True
            d["failure_message"] = self._failure_message
        return d


class RemoteEvent(Event):
    """
    An event that was published by another module: we know nothing about
    this, rely on the other module to continuously update us with
    progress information as it emerges.
    """

    def __init__(self, my_id, message, refs):
        super(RemoteEvent, self).__init__(message, refs)
        self.id = my_id
        self._progress = 0.0
        self._failed = False
        self._refresh()

    def set_progress(self, progress):
        self._progress = progress
        self._refresh()

    def set_failed(self, message):
        self._progress = 1.0
        self._failed = True
        self._failure_message = message
        self._refresh()

    @property
    def progress(self):
        return self._progress

    @property
    def failed(self):
        return self._failed

    @property
    def failure_message(self):
        return self._failure_message if self._failed else None


class PgRecoveryEvent(Event):
    """
    An event whose completion is determined by the recovery of a set of
    PGs to a healthy state.

    Always call update() immediately after construction.
    """

    def __init__(self, message, refs, which_pgs, evacuate_osds):
        super(PgRecoveryEvent, self).__init__(message, refs)

        self._pgs = which_pgs

        self._evacuate_osds = evacuate_osds

        self._original_pg_count = len(self._pgs)

        self._original_bytes_recovered = None

        self._progress = 0.0

        self.id = str(uuid.uuid4())
        self._refresh()

    @property
    def evacuating_osds(self):
        return self._evacuate_osds

    def pg_update(self, pg_dump, log):
        # FIXME: O(pg_num) in python
        # FIXME: far more fields getting pythonized than we really care about
        pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])

        if self._original_bytes_recovered is None:
            self._original_bytes_recovered = {}
            missing_pgs = []
            for pg in self._pgs:
                pg_str = str(pg)
                if pg_str in pg_to_state:
                    self._original_bytes_recovered[pg] = \
                        pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
                else:
                    missing_pgs.append(pg)
            if pg_dump.get('pg_ready', False):
                for pg in missing_pgs:
                    self._pgs.remove(pg)

        complete_accumulate = 0.0

        # Calculating progress as the number of PGs recovered divided by
        # the original count, where partially completed PGs count for
        # something between 0.0 and 1.0. This is perhaps less faithful
        # than looking at the total number of bytes recovered, but it does
        # a better job of representing the work still to do if there are a
        # number of very small PGs that still need the housekeeping of
        # their recovery to be done. This is subjective...
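        # A worked example (a sketch): with 10 PGs originally, 4 already
        # fully recovered and one of the remainder half recovered by byte
        # count, the progress computed below is (4 + 0.5) / 10 = 0.45.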

        complete = set()
        for pg in self._pgs:
            pg_str = str(pg)
            try:
                info = pg_to_state[pg_str]
            except KeyError:
                # The PG is gone! Probably a pool was deleted. Drop it.
                complete.add(pg)
                continue

            state = info['state']

            states = state.split("+")

            unmoved = bool(set(self._evacuate_osds) & (
                set(info['up']) | set(info['acting'])))

            if "active" in states and "clean" in states and not unmoved:
                complete.add(pg)
            else:
                if info['stat_sum']['num_bytes'] == 0:
                    # Empty PGs are considered 0% done until they are
                    # in the correct state.
                    pass
                else:
                    recovered = info['stat_sum']['num_bytes_recovered']
                    total_bytes = info['stat_sum']['num_bytes']
                    if total_bytes > 0:
                        ratio = float(recovered -
                                      self._original_bytes_recovered[pg]) / \
                            total_bytes

                        # Since the recovered bytes (over time) could perhaps
                        # exceed the contents of the PG (moment in time), we
                        # must clamp this
                        ratio = min(ratio, 1.0)

                    else:
                        # Dataless PGs (e.g. containing only OMAPs) count
                        # as half done.
                        ratio = 0.5

                    complete_accumulate += ratio

        self._pgs = list(set(self._pgs) ^ complete)
        completed_pgs = self._original_pg_count - len(self._pgs)
        self._progress = (completed_pgs + complete_accumulate)\
            / self._original_pg_count
        self._refresh()

        log.info("Updated progress to {0} ({1})".format(
            self._progress, self._message
        ))

    @property
    def progress(self):
        return self._progress


class PgId(object):
    def __init__(self, pool_id, ps):
        self.pool_id = pool_id
        self.ps = ps

    def __eq__(self, other):
        return (self.pool_id, self.ps) == (other.pool_id, other.ps)

    def __hash__(self):
        return hash((self.pool_id, self.ps))

    def __lt__(self, other):
        return (self.pool_id, self.ps) < (other.pool_id, other.ps)

    def __str__(self):
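        # Renders in Ceph's usual pgid form "<pool>.<ps hex>",
        # e.g. PgId(1, 31) -> "1.1f"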
        return "{0}.{1:x}".format(self.pool_id, self.ps)


class Module(MgrModule):
    COMMANDS = [
        {"cmd": "progress",
         "desc": "Show progress of recovery operations",
         "perm": "r"},
        {"cmd": "progress json",
         "desc": "Show machine readable progress information",
         "perm": "r"},
        {"cmd": "progress clear",
         "desc": "Reset progress tracking",
         "perm": "rw"}
    ]

    MODULE_OPTIONS = [
        {
            'name': 'max_completed_events',
            'default': 50,
            'type': 'int',
            'desc': 'number of past completed events to remember',
            'runtime': True,
        },
        {
            'name': 'persist_interval',
            'default': 5,
            'type': 'secs',
            'desc': 'how frequently to persist completed events',
            'runtime': True,
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        self._events = {}
        self._completed_events = []

        self._old_osd_map = None

        self._ready = threading.Event()
        self._shutdown = threading.Event()

        self._latest_osdmap = None

        self._dirty = False

        global _module
        _module = self

    def config_notify(self):
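        # Mirror each MODULE_OPTION onto a module attribute of the same
        # name, so that e.g. after an admin runs
        #   ceph config set mgr mgr/progress/persist_interval 10
        # the serve() loop sees self.persist_interval == 10.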
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def _osd_out(self, old_map, old_dump, new_map, osd_id):
        affected_pgs = []
        unmoved_pgs = []
        for pool in old_dump['pools']:
            pool_id = pool['pool']
            for ps in range(0, pool['pg_num']):
                up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)

                # Was this PG affected by the OSD going out?
                old_osds = set(up_acting['up']) | set(up_acting['acting'])
                was_on_out_osd = osd_id in old_osds
                if not was_on_out_osd:
                    continue

                self.log.debug("pool_id, ps = {0}, {1}".format(
                    pool_id, ps
                ))

                self.log.debug(
                    "up_acting: {0}".format(json.dumps(up_acting, indent=2)))

                new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
                new_osds = set(new_up_acting['up']) | set(new_up_acting['acting'])

                # Has this PG been assigned a new location?
                # (it might not be if there is no suitable place to move
                # to after an OSD failure)
                is_relocated = len(new_osds - old_osds) > 0

                self.log.debug(
                    "new_up_acting: {0}".format(json.dumps(new_up_acting,
                                                           indent=2)))

                if was_on_out_osd and is_relocated:
                    # This PG is now in motion, track its progress
                    affected_pgs.append(PgId(pool_id, ps))
                elif not is_relocated:
                    # This PG didn't get a new location, we'll log it
                    unmoved_pgs.append(PgId(pool_id, ps))

        # In the case that we ignored some PGs, log the reason why (we may
        # not end up creating a progress event)
        if len(unmoved_pgs):
            self.log.warn("{0} PGs were on osd.{1}, but didn't get new locations".format(
                len(unmoved_pgs), osd_id))

        self.log.warn("{0} PGs affected by osd.{1} going out".format(
            len(affected_pgs), osd_id))

        if len(affected_pgs) == 0:
            # Don't emit events if there were no PGs
            return

        # TODO: reconcile with existing events referring to this OSD going out
        ev = PgRecoveryEvent(
            "Rebalancing after osd.{0} marked out".format(osd_id),
            refs=[("osd", osd_id)],
            which_pgs=affected_pgs,
            evacuate_osds=[osd_id]
        )
        ev.pg_update(self.get("pg_dump"), self.log)
        self._events[ev.id] = ev

    def _osd_in(self, osd_id):
        # Iterate over a copy: self._complete() deletes from self._events
        for ev_id, ev in list(self._events.items()):
            if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
                self.log.info("osd.{0} came back in, cancelling event".format(
                    osd_id
                ))
                self._complete(ev)

    def _osdmap_changed(self, old_osdmap, new_osdmap):
        old_dump = old_osdmap.dump()
        new_dump = new_osdmap.dump()

        old_osds = dict([(o['osd'], o) for o in old_dump['osds']])

        for osd in new_dump['osds']:
            osd_id = osd['osd']
            new_weight = osd['in']
            if osd_id in old_osds:
                old_weight = old_osds[osd_id]['in']

                if new_weight == 0.0 and old_weight > new_weight:
                    self.log.warn("osd.{0} marked out".format(osd_id))
                    self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
                elif new_weight >= 1.0 and old_weight == 0.0:
                    # Only consider weight>=1.0 as "in" to avoid spawning
                    # individual recovery events on every adjustment
                    # in a gradual weight-in
                    self.log.warn("osd.{0} marked in".format(osd_id))
                    self._osd_in(osd_id)

    def notify(self, notify_type, notify_data):
        self._ready.wait()

        if notify_type == "osd_map":
            old_osdmap = self._latest_osdmap
            self._latest_osdmap = self.get_osdmap()

            self.log.info("Processing OSDMap change {0}..{1}".format(
                old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
            ))
            self._osdmap_changed(old_osdmap, self._latest_osdmap)
        elif notify_type == "pg_summary":
            data = self.get("pg_dump")
            # Iterate over a copy: maybe_complete() may delete from
            # self._events via self._complete()
            for ev_id, ev in list(self._events.items()):
                if isinstance(ev, PgRecoveryEvent):
                    ev.pg_update(data, self.log)
                    self.maybe_complete(ev)

    def maybe_complete(self, event):
        if event.progress >= 1.0:
            self._complete(event)

    def _save(self):
        self.log.info("Writing back {0} completed events".format(
            len(self._completed_events)
        ))
        # TODO: bound the number we store.
        encoded = json.dumps({
            "events": [ev.to_json() for ev in self._completed_events],
            "version": ENCODING_VERSION,
            "compat_version": ENCODING_VERSION
        })
        self.set_store("completed", encoded)

    def _load(self):
        stored = self.get_store("completed")

        if stored is None:
            self.log.info("No stored events to load")
            return

        decoded = json.loads(stored)
        if decoded['compat_version'] > ENCODING_VERSION:
            raise RuntimeError("Cannot decode version {0}".format(
                decoded['compat_version']))

        for ev in decoded['events']:
            self._completed_events.append(GhostEvent(ev['id'], ev['message'],
                                                     ev['refs'],
                                                     ev.get('failed', False),
                                                     ev.get('failure_message')))

        self._prune_completed_events()

    def _prune_completed_events(self):
        length = len(self._completed_events)
        if length > self.max_completed_events:
            self._completed_events = \
                self._completed_events[length - self.max_completed_events:]
            self._dirty = True

    def serve(self):
        self.config_notify()
        self.clear_all_progress_events()
        self.log.info("Loading...")

        self._load()
        self.log.info("Loaded {0} historic events".format(
            len(self._completed_events)))

        self._latest_osdmap = self.get_osdmap()
        self.log.info("Loaded OSDMap, ready.")

        self._ready.set()

        while not self._shutdown.is_set():
            # Lazy periodic write back of completed events
            if self._dirty:
                self._save()
                self._dirty = False

            self._shutdown.wait(timeout=self.persist_interval)

    def shutdown(self):
        self._shutdown.set()
        self.clear_all_progress_events()

    def update(self, ev_id, ev_msg, ev_progress, refs=None):
        """
        For calling from other mgr modules
        """
        if refs is None:
            refs = []
        try:
            ev = self._events[ev_id]
        except KeyError:
            ev = RemoteEvent(ev_id, ev_msg, refs)
            self._events[ev_id] = ev
            self.log.info("update: starting ev {0} ({1})".format(
                ev_id, ev_msg))
        else:
            self.log.debug("update: {0} on {1}".format(
                ev_progress, ev_msg))

        ev.set_progress(ev_progress)
        ev._refresh()

    def _complete(self, ev):
        duration = (datetime.datetime.utcnow() - ev.started_at)
        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
            ev.id, ev.message, duration.seconds
        ))
        self.complete_progress_event(ev.id)

        self._completed_events.append(
            GhostEvent(ev.id, ev.message, ev.refs,
                       failed=ev.failed, failure_message=ev.failure_message))
        del self._events[ev.id]
        self._prune_completed_events()
        self._dirty = True

    def complete(self, ev_id):
        """
        For calling from other mgr modules
        """
        try:
            ev = self._events[ev_id]
            ev.set_progress(1.0)
            self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                                   ev.message))
            self._complete(ev)
        except KeyError:
            self.log.warn("complete: ev {0} does not exist".format(ev_id))

    def fail(self, ev_id, message):
        """
        For calling from other mgr modules to mark an event as failed (and
        complete)
        """
        try:
            ev = self._events[ev_id]
            ev.set_failed(message)
            self.log.info("fail: finished ev {0} ({1}): {2}".format(ev_id,
                                                                    ev.message,
                                                                    message))
            self._complete(ev)
        except KeyError:
            self.log.warn("fail: ev {0} does not exist".format(ev_id))

    def _handle_ls(self):
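        # Renders the human-readable form, roughly (a sketch):
        #
        #   Rebalancing after osd.0 marked out
        #       [==============..............]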
        if len(self._events) or len(self._completed_events):
            out = ""
            chrono_order = sorted(self._events.values(),
                                  key=lambda x: x.started_at, reverse=True)
            for ev in chrono_order:
                out += ev.twoline_progress()
                out += "\n"

            if len(self._completed_events):
                # TODO: limit number of completed events to show
                out += "\n"
                for ev in self._completed_events:
                    out += "[{0}]: {1}\n".format(
                        "Complete" if not ev.failed else "Failed",
                        ev.twoline_progress())

            return 0, out, ""
        else:
            return 0, "", "Nothing in progress"

    def _json(self):
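        # The machine-readable shape served by `progress json` (a sketch):
        #   {"events": [{"id": ..., "message": ..., "refs": [...]}, ...],
        #    "completed": [...]}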
        return {
            'events': [ev.to_json() for ev in self._events.values()],
            'completed': [ev.to_json() for ev in self._completed_events]
        }

    def _handle_clear(self):
        self._events = {}
        self._completed_events = []
        self._dirty = True
        self._save()
        # Also drop the mgr-side copies of in-flight events, so that stale
        # progress bars don't linger in `ceph status` after a reset
        self.clear_all_progress_events()

        return 0, "", ""

    def handle_command(self, _, cmd):
        if cmd['prefix'] == "progress":
            return self._handle_ls()
        elif cmd['prefix'] == "progress clear":
            # The clear command isn't usually needed - it's to enable
            # the admin to "kick" this module if it seems to have done
            # something wrong (e.g. we have a bug causing a progress event
            # that never finishes)
            return self._handle_clear()
        elif cmd['prefix'] == "progress json":
            return 0, json.dumps(self._json(), indent=2), ""
        else:
            raise NotImplementedError(cmd['prefix'])