# Source: ceph/src/pybind/mgr/progress/module.py (ceph v15.2.5)
1 try:
2 from typing import List, Dict, Union, Any, Optional
3 from typing import TYPE_CHECKING
4 except ImportError:
5 TYPE_CHECKING = False
6
7 from mgr_module import MgrModule, OSDMap
8 from mgr_util import to_pretty_timedelta
9 from datetime import timedelta
10 import os
11 import threading
12 import datetime
13 import uuid
14 import time
15
16 import json
17
18
19 ENCODING_VERSION = 2
20
21 # keep a global reference to the module so we can use it from Event methods
22 _module = None # type: Optional["Module"]
23
24 # if unit test we want MgrModule to be blank
25 if 'UNITTEST' in os.environ:
26 MgrModule = object # type: ignore
27
class Event(object):
    """
    A generic "event" that has a start time, completion percentage,
    and a list of "refs" that are (type, id) tuples describing which
    objects (osds, pools) this relates to.

    Subclasses must implement the ``progress`` property (a float in
    [0, 1]); values outside that range are tolerated and clamped for
    display purposes.
    """

    def __init__(self, message, refs, started_at=None):
        # type: (str, List[str], Optional[float]) -> None
        self._message = message
        self._refs = refs
        # Fall back to "now" when the caller doesn't supply a start time
        # (e.g. events restored from the store may carry None).
        self.started_at = started_at if started_at else time.time()
        # Assigned by subclasses (uuid or caller-supplied id).
        self.id = None  # type: Optional[str]

    def _refresh(self):
        # Push the current message/progress into the mgr's global progress
        # event table so it shows up in `ceph status` etc.
        global _module
        assert _module
        _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
                                                                self.progress))
        _module.update_progress_event(
            self.id, self.twoline_progress(6), self.progress)

    @property
    def message(self):
        # type: () -> str
        return self._message

    @property
    def refs(self):
        # type: () -> List[str]
        return self._refs

    @property
    def progress(self):
        # type: () -> float
        raise NotImplementedError()

    @property
    def duration_str(self):
        # type: () -> str
        """Human-readable elapsed time since the event started."""
        duration = time.time() - self.started_at
        return "(%s)" % (
            to_pretty_timedelta(timedelta(seconds=duration)))

    @property
    def failed(self):
        # type: () -> bool
        return False

    @property
    def failure_message(self):
        # type: () -> Optional[str]
        return None

    def summary(self):
        # type: () -> str
        return "{0} {1} {2}".format(self.progress, self.message,
                                    self.duration_str)

    def _progress_str(self, width):
        # type: (int) -> str
        """Render an ASCII progress bar of exactly ``width`` characters."""
        inner_width = width - 2
        out = "["
        # Clamp to [0, 1] so a misbehaving reporter (RemoteEvent.set_progress
        # accepts arbitrary floats from other modules) cannot produce a bar
        # that under- or overflows the requested width.
        progress = min(max(self.progress, 0.0), 1.0)
        done_chars = int(progress * inner_width)
        out += done_chars * '='
        out += (inner_width - done_chars) * '.'
        out += "]"

        return out

    def twoline_progress(self, indent=4):
        """
        e.g.

        - Eating my delicious strudel (since: 30s)
            [===============..............] (remaining: 04m)

        """
        time_remaining = self.estimated_time_remaining()
        if time_remaining:
            remaining = "(remaining: %s)" % (
                to_pretty_timedelta(timedelta(seconds=time_remaining)))
        else:
            remaining = ''
        return "{0} {1}\n{2}{3} {4}".format(self._message,
                                            self.duration_str,
                                            " " * indent,
                                            self._progress_str(30),
                                            remaining)

    def to_json(self):
        # type: () -> Dict[str, Any]
        return {
            "id": self.id,
            "message": self.message,
            "duration": self.duration_str,
            "refs": self._refs,
            "progress": self.progress,
            "started_at": self.started_at,
            "time_remaining": self.estimated_time_remaining()
        }

    def estimated_time_remaining(self):
        # type: () -> Optional[int]
        """
        Naive linear extrapolation of remaining seconds from elapsed time
        and progress so far.  Returns None when no forward progress has
        been made (nothing to extrapolate from).
        """
        elapsed = time.time() - self.started_at
        # Clamp so progress > 1.0 yields 0 remaining rather than a
        # negative (and nonsensical) time estimate.
        progress = min(self.progress, 1.0)
        if progress <= 0.0:
            return None
        return int(elapsed * (1 - progress) / progress)
132
class GhostEvent(Event):
    """
    The ghost of a completed event: these are the fields that we persist
    after the event is complete.
    """

    def __init__(self, my_id, message, refs, started_at, finished_at=None,
                 failed=False, failure_message=None):
        # type: (str, str, List[str], Optional[float], Optional[float], bool, Optional[str]) -> None
        super(GhostEvent, self).__init__(message, refs, started_at)
        # Default to "now" for events completed in this process; stored
        # events from older encodings may legitimately pass None here.
        self.finished_at = finished_at if finished_at else time.time()
        self.id = my_id

        if failed:
            self._failed = True
            self._failure_message = failure_message
        else:
            self._failed = False

    @property
    def duration_str(self):
        # type: () -> str
        # A completed event's duration is fixed: measure start->finish,
        # not start->now as the base class does, otherwise the reported
        # duration keeps growing forever after completion.
        duration = self.finished_at - self.started_at
        return "(%s)" % (
            to_pretty_timedelta(timedelta(seconds=duration)))

    @property
    def progress(self):
        # Completed events are by definition 100% done.
        return 1.0

    @property
    def failed(self):
        return self._failed

    @property
    def failure_message(self):
        return self._failure_message if self._failed else None

    def to_json(self):
        # type: () -> Dict[str, Any]
        d = {
            "id": self.id,
            "message": self.message,
            "refs": self._refs,
            "started_at": self.started_at,
            "finished_at": self.finished_at
        }
        # Only emit failure fields for failed events, matching what
        # Module._load expects (it uses .get() with defaults).
        if self._failed:
            d["failed"] = True
            d["failure_message"] = self._failure_message
        return d
175
176
class RemoteEvent(Event):
    """
    An event that was published by another module: we know nothing about
    this, rely on the other module to continuously update us with
    progress information as it emerges.
    """

    def __init__(self, my_id, message, refs):
        # type: (str, str, List[str]) -> None
        super(RemoteEvent, self).__init__(message, refs)
        self.id = my_id
        self._progress = 0.0
        self._failed = False
        self._refresh()

    @property
    def progress(self):
        return self._progress

    @property
    def failed(self):
        return self._failed

    @property
    def failure_message(self):
        # Only meaningful once set_failed() has run; None otherwise.
        return self._failure_message if self._failed else None

    def set_progress(self, progress):
        # type: (float) -> None
        """Record a new completion fraction reported by the publisher."""
        self._progress = progress
        self._refresh()

    def set_failed(self, message):
        """Mark the event as failed; a failed event counts as finished."""
        self._progress = 1.0
        self._failed = True
        self._failure_message = message
        self._refresh()

    def set_message(self, message):
        """Replace the displayed message for this event."""
        self._message = message
        self._refresh()
218
219
class PgRecoveryEvent(Event):
    """
    An event whose completion is determined by the recovery of a set of
    PGs to a healthy state.

    Always call update() immediately after construction.
    """

    def __init__(self, message, refs, which_pgs, which_osds, start_epoch):
        # type: (str, List[Any], List[PgId], List[str], int) -> None
        super(PgRecoveryEvent, self).__init__(message, refs)

        # PGs still being tracked; shrinks as PGs recover or disappear.
        self._pgs = which_pgs

        # OSDs whose in/out transition created this event (used by
        # Module._osd_in_out to cancel the event if the OSD comes back).
        self._which_osds = which_osds

        # Denominator for the progress fraction; fixed at creation time.
        self._original_pg_count = len(self._pgs)

        # Per-PG baseline of num_bytes_recovered, captured lazily on the
        # first pg_update() call (None until then).
        self._original_bytes_recovered = None  # type: Optional[Dict[PgId, float]]

        self._progress = 0.0

        # self._start_epoch = _module.get_osdmap().get_epoch()
        self._start_epoch = start_epoch

        self.id = str(uuid.uuid4())  # type: str
        self._refresh()

    @property
    def which_osds(self):
        # type: () -> List[str]
        return self._which_osds

    def pg_update(self, raw_pg_stats, pg_ready, log):
        # type: (Dict, bool, Any) -> None
        """
        Recompute this event's progress from a fresh pg_stats dump.

        ``raw_pg_stats`` is the mgr's "pg_stats" blob; ``pg_ready``
        indicates whether the stats are authoritative (PGs absent from
        an authoritative dump are treated as gone).
        """
        # FIXME: O(pg_num) in python
        # FIXME: far more fields getting pythonized than we really care about
        # Sanity check to see if there are any missing PGs and to assign
        # empty array and dictionary if there hasn't been any recovery
        pg_to_state = dict([(p['pgid'], p) for p in raw_pg_stats['pg_stats']])  # type: Dict[str, Any]
        if self._original_bytes_recovered is None:
            # First call: snapshot the recovery-bytes baseline per PG so
            # later progress is measured relative to this point in time.
            self._original_bytes_recovered = {}
            missing_pgs = []
            for pg in self._pgs:
                pg_str = str(pg)
                if pg_str in pg_to_state:
                    self._original_bytes_recovered[pg] = \
                        pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
                else:
                    missing_pgs.append(pg)
            if pg_ready:
                # Stats are authoritative: stop tracking PGs that no
                # longer exist (e.g. their pool was deleted).
                for pg in missing_pgs:
                    self._pgs.remove(pg)

        complete_accumulate = 0.0

        # Calculating progress as the number of PGs recovered divided by the
        # original where partially completed PGs count for something
        # between 0.0-1.0. This is perhaps less faithful than looking at the
        # total number of bytes recovered, but it does a better job of
        # representing the work still to do if there are a number of very
        # few-bytes PGs that still need the housekeeping of their recovery
        # to be done. This is subjective...

        complete = set()
        for pg in self._pgs:
            pg_str = str(pg)
            try:
                info = pg_to_state[pg_str]
            except KeyError:
                # The PG is gone! Probably a pool was deleted. Drop it.
                complete.add(pg)
                continue
            # Only checks the state of each PGs when it's epoch >= the OSDMap's epoch
            if int(info['reported_epoch']) < int(self._start_epoch):
                continue

            state = info['state']

            states = state.split("+")

            if "active" in states and "clean" in states:
                complete.add(pg)
            else:
                if info['stat_sum']['num_bytes'] == 0:
                    # Empty PGs are considered 0% done until they are
                    # in the correct state.
                    pass
                else:
                    recovered = info['stat_sum']['num_bytes_recovered']
                    total_bytes = info['stat_sum']['num_bytes']
                    if total_bytes > 0:
                        ratio = float(recovered -
                                      self._original_bytes_recovered[pg]) / \
                            total_bytes
                        # Since the recovered bytes (over time) could perhaps
                        # exceed the contents of the PG (moment in time), we
                        # must clamp this
                        ratio = min(ratio, 1.0)
                        ratio = max(ratio, 0.0)

                    else:
                        # Dataless PGs (e.g. containing only OMAPs) count
                        # as half done.
                        ratio = 0.5
                    complete_accumulate += ratio

        # Drop finished/vanished PGs, then recompute the overall fraction:
        # each fully-done PG counts 1.0, in-flight PGs contribute their
        # byte ratio via complete_accumulate.
        self._pgs = list(set(self._pgs) ^ complete)
        completed_pgs = self._original_pg_count - len(self._pgs)
        self._progress = (completed_pgs + complete_accumulate)\
            / self._original_pg_count

        self._refresh()
        log.info("Updated progress to %s", self.summary())

    @property
    def progress(self):
        # type: () -> float
        return self._progress
338
339
class PgId(object):
    """
    A placement group identifier (pool id + placement seed).

    Instances are used as set members and dict keys in PgRecoveryEvent,
    so value-based __eq__/__hash__ are required.  The original code
    defined ``__cmp__``, which Python 3 ignores entirely, silently
    leaving equality and hashing at object identity.
    """

    def __init__(self, pool_id, ps):
        # type: (str, int) -> None
        self.pool_id = pool_id
        self.ps = ps

    def __eq__(self, other):
        return (self.pool_id, self.ps) == (other.pool_id, other.ps)

    def __hash__(self):
        # Must accompany __eq__ so PgId stays usable in sets/dicts.
        return hash((self.pool_id, self.ps))

    def __lt__(self, other):
        return (self.pool_id, self.ps) < (other.pool_id, other.ps)

    def __str__(self):
        # Matches ceph's canonical "<pool>.<hex ps>" pgid rendering.
        return "{0}.{1:x}".format(self.pool_id, self.ps)
354
355
class Module(MgrModule):
    """
    The progress module: tracks long-running operations and exposes them
    to the user.  Two kinds of events are tracked: PG recovery events
    synthesized from OSD in/out transitions observed in OSDMap updates,
    and arbitrary events published by other mgr modules via
    update()/complete()/fail().  Completed events are persisted (bounded
    by ``max_completed_events``) in the mgr's KV store.
    """

    COMMANDS = [
        {"cmd": "progress",
         "desc": "Show progress of recovery operations",
         "perm": "r"},
        {"cmd": "progress json",
         "desc": "Show machine readable progress information",
         "perm": "r"},
        {"cmd": "progress clear",
         "desc": "Reset progress tracking",
         "perm": "rw"}
    ]

    MODULE_OPTIONS = [
        {
            'name': 'max_completed_events',
            'default': 50,
            'type': 'int',
            'desc': 'number of past completed events to remember',
            'runtime': True,
        },
        {
            'name': 'persist_interval',
            'default': 5,
            'type': 'secs',
            'desc': 'how frequently to persist completed events',
            'runtime': True,
        },
    ]  # type: List[Dict[str, Any]]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)

        # Live events by id; they move to _completed_events on completion.
        self._events = {}  # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
        self._completed_events = []  # type: List[GhostEvent]

        self._old_osd_map = None  # type: Optional[OSDMap]

        # _ready gates notify() until serve() has loaded state;
        # _shutdown terminates serve()'s persistence loop.
        self._ready = threading.Event()
        self._shutdown = threading.Event()

        self._latest_osdmap = None  # type: Optional[OSDMap]

        # True when completed events need writing back to the store.
        self._dirty = False

        global _module
        _module = self

        # only for mypy
        if TYPE_CHECKING:
            self.max_completed_events = 0
            self.persist_interval = 0

    def config_notify(self):
        """Mirror all MODULE_OPTIONS onto attributes of self."""
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']))
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def _osd_in_out(self, old_map, old_dump, new_map, osd_id, marked):
        # type: (OSDMap, Dict, OSDMap, str, str) -> None
        # A function that will create or complete an event when an
        # OSD is marked in or out according to the affected PGs
        affected_pgs = []
        unmoved_pgs = []
        for pool in old_dump['pools']:
            pool_id = pool['pool']  # type: str
            for ps in range(0, pool['pg_num']):

                # Was this OSD affected by the OSD coming in/out?
                # Compare old and new osds using
                # data from the json dump
                old_up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
                old_osds = set(old_up_acting['acting'])
                new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
                new_osds = set(new_up_acting['acting'])

                # Check the osd_id being in the acting set for both old
                # and new maps to cover both out and in cases
                was_on_out_or_in_osd = osd_id in old_osds or osd_id in new_osds
                if not was_on_out_or_in_osd:
                    continue

                self.log.debug("pool_id, ps = {0}, {1}".format(
                    pool_id, ps
                ))

                self.log.debug(
                    "old_up_acting: {0}".format(json.dumps(old_up_acting, indent=4, sort_keys=True)))

                # Has this OSD been assigned a new location?
                # (it might not be if there is no suitable place to move
                # after an OSD is marked in/out)
                if marked == "in":
                    is_relocated = len(old_osds - new_osds) > 0
                else:
                    is_relocated = len(new_osds - old_osds) > 0

                self.log.debug(
                    "new_up_acting: {0}".format(json.dumps(new_up_acting,
                                                           indent=4,
                                                           sort_keys=True)))

                if was_on_out_or_in_osd and is_relocated:
                    # This PG is now in motion, track its progress
                    affected_pgs.append(PgId(pool_id, ps))
                elif not is_relocated:
                    # This PG didn't get a new location, we'll log it
                    unmoved_pgs.append(PgId(pool_id, ps))

        # In the case that we ignored some PGs, log the reason why (we may
        # not end up creating a progress event)
        if len(unmoved_pgs):
            self.log.warning("{0} PGs were on osd.{1}, but didn't get new locations".format(
                len(unmoved_pgs), osd_id))

        self.log.warning("{0} PGs affected by osd.{1} being marked {2}".format(
            len(affected_pgs), osd_id, marked))

        # In the case of the osd coming back in, we might need to cancel
        # previous recovery event for that osd
        if marked == "in":
            for ev_id in list(self._events):
                ev = self._events[ev_id]
                if isinstance(ev, PgRecoveryEvent) and osd_id in ev.which_osds:
                    self.log.info("osd.{0} came back in, cancelling event".format(
                        osd_id
                    ))
                    self._complete(ev)

        if len(affected_pgs) > 0:
            r_ev = PgRecoveryEvent(
                "Rebalancing after osd.{0} marked {1}".format(osd_id, marked),
                refs=[("osd", osd_id)],
                which_pgs=affected_pgs,
                which_osds=[osd_id],
                start_epoch=self.get_osdmap().get_epoch()
            )
            r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log)
            self._events[r_ev.id] = r_ev

    def _osdmap_changed(self, old_osdmap, new_osdmap):
        # type: (OSDMap, OSDMap) -> None
        # Compare 'in' weights between successive OSDMaps and start a
        # recovery event for any OSD that transitioned out (-> 0.0) or
        # fully in (0.0 -> >= 1.0).
        old_dump = old_osdmap.dump()
        new_dump = new_osdmap.dump()

        old_osds = dict([(o['osd'], o) for o in old_dump['osds']])

        for osd in new_dump['osds']:
            osd_id = osd['osd']
            new_weight = osd['in']
            if osd_id in old_osds:
                old_weight = old_osds[osd_id]['in']

                if new_weight == 0.0 and old_weight > new_weight:
                    self.log.warning("osd.{0} marked out".format(osd_id))
                    self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "out")
                elif new_weight >= 1.0 and old_weight == 0.0:
                    # Only consider weight>=1.0 as "in" to avoid spawning
                    # individual recovery events on every adjustment
                    # in a gradual weight-in
                    self.log.warning("osd.{0} marked in".format(osd_id))
                    self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in")

    def notify(self, notify_type, notify_data):
        """Dispatch cluster notifications once serve() has finished loading."""
        self._ready.wait()

        if notify_type == "osd_map":
            old_osdmap = self._latest_osdmap
            self._latest_osdmap = self.get_osdmap()
            # serve() populates _latest_osdmap before setting _ready, so
            # both maps must exist by the time we get here.
            assert old_osdmap
            assert self._latest_osdmap

            self.log.info("Processing OSDMap change {0}..{1}".format(
                old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
            ))
            self._osdmap_changed(old_osdmap, self._latest_osdmap)
        elif notify_type == "pg_summary":
            # if there are no events we will skip this here to avoid
            # expensive get calls
            if len(self._events) == 0:
                return
            data = self.get("pg_stats")
            ready = self.get("pg_ready")
            for ev_id in list(self._events):
                ev = self._events[ev_id]
                if isinstance(ev, PgRecoveryEvent):
                    ev.pg_update(data, ready, self.log)
                    self.maybe_complete(ev)

    def maybe_complete(self, event):
        # type: (Event) -> None
        """Retire the event once it reports full progress."""
        if event.progress >= 1.0:
            self._complete(event)

    def _save(self):
        """Serialize completed events into the mgr KV store."""
        self.log.info("Writing back {0} completed events".format(
            len(self._completed_events)
        ))
        # TODO: bound the number we store.
        encoded = json.dumps({
            "events": [ev.to_json() for ev in self._completed_events],
            "version": ENCODING_VERSION,
            "compat_version": ENCODING_VERSION
        })
        self.set_store("completed", encoded)

    def _load(self):
        """
        Restore completed events from the KV store, upgrading older
        encodings as needed.

        Raises RuntimeError if the stored compat_version is newer than
        this module understands.
        """
        stored = self.get_store("completed")

        if stored is None:
            self.log.info("No stored events to load")
            return

        decoded = json.loads(stored)
        if decoded['compat_version'] > ENCODING_VERSION:
            raise RuntimeError("Cannot decode version {0}".format(
                decoded['compat_version']))

        if decoded['compat_version'] < ENCODING_VERSION:
            # we need to add the "started_at" and "finished_at" attributes to the events
            for ev in decoded['events']:
                ev['started_at'] = None
                ev['finished_at'] = None

        for ev in decoded['events']:
            self._completed_events.append(GhostEvent(ev['id'], ev['message'],
                                                     ev['refs'], ev['started_at'],
                                                     ev['finished_at'],
                                                     ev.get('failed', False),
                                                     ev.get('failure_message')))

        self._prune_completed_events()

    def _prune_completed_events(self):
        """Trim the completed-event list to max_completed_events, keeping the newest."""
        length = len(self._completed_events)
        if length > self.max_completed_events:
            self._completed_events = self._completed_events[length - self.max_completed_events : length]
            self._dirty = True

    def serve(self):
        """Module main loop: load state, then lazily persist dirty state."""
        self.config_notify()
        self.clear_all_progress_events()
        self.log.info("Loading...")

        self._load()
        # Log the count, not the list itself: the original formatted the
        # whole GhostEvent list into the message.
        self.log.info("Loaded {0} historic events".format(
            len(self._completed_events)))

        self._latest_osdmap = self.get_osdmap()
        self.log.info("Loaded OSDMap, ready.")

        self._ready.set()

        while not self._shutdown.is_set():
            # Lazy periodic write back of completed events
            if self._dirty:
                self._save()
                self._dirty = False

            self._shutdown.wait(timeout=self.persist_interval)

        self._shutdown.wait()

    def shutdown(self):
        self._shutdown.set()
        self.clear_all_progress_events()

    def update(self, ev_id, ev_msg, ev_progress, refs=None):
        # type: (str, str, float, Optional[list]) -> None
        """
        For calling from other mgr modules
        """
        if refs is None:
            refs = []
        try:
            ev = self._events[ev_id]
            assert isinstance(ev, RemoteEvent)
        except KeyError:
            # First sighting of this id: create the remote event.
            ev = RemoteEvent(ev_id, ev_msg, refs)
            self._events[ev_id] = ev
            self.log.info("update: starting ev {0} ({1})".format(
                ev_id, ev_msg))
        else:
            self.log.debug("update: {0} on {1}".format(
                ev_progress, ev_msg))

        ev.set_progress(ev_progress)
        ev.set_message(ev_msg)

    def _complete(self, ev):
        # type: (Event) -> None
        """Move a live event into the completed list and mark state dirty."""
        duration = (time.time() - ev.started_at)
        self.log.info("Completed event {0} ({1}) in {2} seconds".format(
            ev.id, ev.message, int(round(duration))
        ))
        self.complete_progress_event(ev.id)

        self._completed_events.append(
            GhostEvent(ev.id, ev.message, ev.refs, ev.started_at,
                       failed=ev.failed, failure_message=ev.failure_message))
        assert ev.id
        del self._events[ev.id]
        self._prune_completed_events()
        self._dirty = True

    def complete(self, ev_id):
        """
        For calling from other mgr modules
        """
        try:
            ev = self._events[ev_id]
            assert isinstance(ev, RemoteEvent)
            ev.set_progress(1.0)
            self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                                   ev.message))
            self._complete(ev)
        except KeyError:
            # Unknown id is best-effort: just warn, don't raise.
            self.log.warning("complete: ev {0} does not exist".format(ev_id))

    def fail(self, ev_id, message):
        """
        For calling from other mgr modules to mark an event as failed (and
        complete)
        """
        try:
            ev = self._events[ev_id]
            assert isinstance(ev, RemoteEvent)
            ev.set_failed(message)
            self.log.info("fail: finished ev {0} ({1}): {2}".format(ev_id,
                                                                    ev.message,
                                                                    message))
            self._complete(ev)
        except KeyError:
            self.log.warning("fail: ev {0} does not exist".format(ev_id))

    def _handle_ls(self):
        """Render live and completed events for the `progress` command."""
        if len(self._events) or len(self._completed_events):
            out = ""
            chrono_order = sorted(self._events.values(),
                                  key=lambda x: x.started_at, reverse=True)
            for ev in chrono_order:
                out += ev.twoline_progress()
                out += "\n"

            if len(self._completed_events):
                # TODO: limit number of completed events to show
                out += "\n"
                for ghost_ev in self._completed_events:
                    out += "[{0}]: {1}\n".format("Complete" if not ghost_ev.failed else "Failed",
                                                 ghost_ev.twoline_progress())

            return 0, out, ""
        else:
            return 0, "", "Nothing in progress"

    def _json(self):
        """Machine-readable dump of live and completed events."""
        return {
            'events': [ev.to_json() for ev in self._events.values()],
            'completed': [ev.to_json() for ev in self._completed_events]
        }

    def _handle_clear(self):
        """Drop all tracked state, in memory, in the store, and in the mgr."""
        self._events = {}
        self._completed_events = []
        self._dirty = True
        self._save()
        self.clear_all_progress_events()

        return 0, "", ""

    def handle_command(self, _, cmd):
        if cmd['prefix'] == "progress":
            return self._handle_ls()
        elif cmd['prefix'] == "progress clear":
            # The clear command isn't usually needed - it's to enable
            # the admin to "kick" this module if it seems to have done
            # something wrong (e.g. we have a bug causing a progress event
            # that never finishes)
            return self._handle_clear()
        elif cmd['prefix'] == "progress json":
            return 0, json.dumps(self._json(), indent=4, sort_keys=True), ""
        else:
            raise NotImplementedError(cmd['prefix'])