]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/progress/module.py
c37d7f2ce8fd2f96d1210593a3dd8d306f185aa0
2 from typing
import List
, Dict
, Union
, Any
, Optional
3 from typing
import TYPE_CHECKING
7 from mgr_module
import MgrModule
, OSDMap
8 from mgr_util
import to_pretty_timedelta
9 from datetime
import timedelta
# Module-level handle to the running Module instance, so that Event
# methods can reach mgr facilities (logging, progress event updates).
_module = None  # type: Optional["Module"]

# Under unit tests there is no live ceph-mgr, so stub out the MgrModule
# base class to let this file import cleanly.
if 'UNITTEST' in os.environ:
    MgrModule = object  # type: ignore
30 A generic "event" that has a start time, completion percentage,
31 and a list of "refs" that are (type, id) tuples describing which
32 objects (osds, pools) this relates to.
35 def __init__(self
, message
, refs
, started_at
=None):
36 # type: (str, List[str], Optional[float]) -> None
37 self
._message
= message
39 self
.started_at
= started_at
if started_at
else time
.time()
40 self
.id = None # type: Optional[str]
45 _module
.log
.debug('refreshing mgr for %s (%s) at %f' % (self
.id, self
._message
,
47 _module
.update_progress_event(
48 self
.id, self
.twoline_progress(6), self
.progress
)
57 # type: () -> List[str]
63 raise NotImplementedError()
66 def duration_str(self
):
67 duration
= time
.time() - self
.started_at
69 to_pretty_timedelta(timedelta(seconds
=duration
)))
76 def failure_message(self
):
81 return "{0} {1} {2}".format(self
.progress
, self
.message
,
84 def _progress_str(self
, width
):
85 inner_width
= width
- 2
87 done_chars
= int(self
.progress
* inner_width
)
88 out
+= done_chars
* '='
89 out
+= (inner_width
- done_chars
) * '.'
94 def twoline_progress(self
, indent
=4):
98 - Eating my delicious strudel (since: 30s)
99 [===============..............] (remaining: 04m)
102 time_remaining
= self
.estimated_time_remaining()
104 remaining
= "(remaining: %s)" % (
105 to_pretty_timedelta(timedelta(seconds
=time_remaining
)))
108 return "{0} {1}\n{2}{3} {4}".format(self
._message
,
111 self
._progress
_str
(30),
115 # type: () -> Dict[str, Any]
118 "message": self
.message
,
119 "duration": self
.duration_str
,
121 "progress": self
.progress
,
122 "started_at": self
.started_at
,
123 "time_remaining": self
.estimated_time_remaining()
def estimated_time_remaining(self):
    # type: () -> Optional[int]
    """Extrapolate a completion ETA, in whole seconds, from progress so far.

    Scales the elapsed wall-clock time since ``started_at`` by the
    remaining fraction of work.  Returns ``None`` when progress is still
    zero, because the extrapolation would otherwise divide by zero.
    NOTE(review): the guard lines were elided in this view of the file;
    returning None matches callers that format the ETA conditionally --
    confirm against the full source.
    """
    elapsed = time.time() - self.started_at
    progress = self.progress
    if progress == 0.0:
        # No work done yet: no basis for extrapolation (avoids ZeroDivisionError).
        return None
    return int(elapsed * (1 - progress) / progress)
133 class GhostEvent(Event
):
135 The ghost of a completed event: these are the fields that we persist
136 after the event is complete.
139 def __init__(self
, my_id
, message
, refs
, started_at
, finished_at
=None,
140 failed
=False, failure_message
=None):
141 super(GhostEvent
, self
).__init
__(message
, refs
, started_at
)
142 self
.finished_at
= finished_at
if finished_at
else time
.time()
147 self
._failure
_message
= failure_message
def failure_message(self):
    # type: () -> Optional[str]
    """Return the recorded failure description, or None if the event succeeded."""
    if self._failed:
        return self._failure_message
    return None
166 "message": self
.message
,
168 "started_at": self
.started_at
,
169 "finished_at": self
.finished_at
173 d
["failure_message"] = self
._failure
_message
177 class RemoteEvent(Event
):
179 An event that was published by another module: we know nothing about
180 this, rely on the other module to continuously update us with
181 progress information as it emerges.
184 def __init__(self
, my_id
, message
, refs
):
185 # type: (str, str, List[str]) -> None
186 super(RemoteEvent
, self
).__init
__(message
, refs
)
def set_progress(self, progress):
    # type: (float) -> None
    """Record the latest progress fraction (0.0-1.0) reported by the remote module."""
    self._progress = progress
197 def set_failed(self
, message
):
200 self
._failure
_message
= message
def set_message(self, message):
    # type: (str) -> None
    """Replace the human-readable description shown for this event."""
    self._message = message
209 return self
._progress
def failure_message(self):
    # type: () -> Optional[str]
    """Failure text supplied via set_failed(), or None while not failed."""
    if not self._failed:
        return None
    return self._failure_message
220 class PgRecoveryEvent(Event
):
222 An event whose completion is determined by the recovery of a set of
223 PGs to a healthy state.
225 Always call update() immediately after construction.
228 def __init__(self
, message
, refs
, which_pgs
, which_osds
, start_epoch
):
229 # type: (str, List[Any], List[PgId], List[str], int) -> None
230 super(PgRecoveryEvent
, self
).__init
__(message
, refs
)
232 self
._pgs
= which_pgs
234 self
._which
_osds
= which_osds
236 self
._original
_pg
_count
= len(self
._pgs
)
238 self
._original
_bytes
_recovered
= None # type: Optional[Dict[PgId, float]]
242 # self._start_epoch = _module.get_osdmap().get_epoch()
243 self
._start
_epoch
= start_epoch
245 self
.id = str(uuid
.uuid4()) # type: str
def which_osds(self):
    """OSD ids associated with this recovery event (set at construction)."""
    return self._which_osds
252 def pg_update(self
, raw_pg_stats
, pg_ready
, log
):
253 # type: (Dict, bool, Any) -> None
254 # FIXME: O(pg_num) in python
255 # FIXME: far more fields getting pythonized than we really care about
256 # Sanity check to see if there are any missing PGs and to assign
257 # empty array and dictionary if there hasn't been any recovery
258 pg_to_state
= dict([(p
['pgid'], p
) for p
in raw_pg_stats
['pg_stats']]) # type: Dict[str, Any]
259 if self
._original
_bytes
_recovered
is None:
260 self
._original
_bytes
_recovered
= {}
264 if pg_str
in pg_to_state
:
265 self
._original
_bytes
_recovered
[pg
] = \
266 pg_to_state
[pg_str
]['stat_sum']['num_bytes_recovered']
268 missing_pgs
.append(pg
)
270 for pg
in missing_pgs
:
273 complete_accumulate
= 0.0
275 # Calculating progress as the number of PGs recovered divided by the
276 # original where partially completed PGs count for something
277 # between 0.0-1.0. This is perhaps less faithful than looking at the
278 # total number of bytes recovered, but it does a better job of
279 # representing the work still to do if there are a number of very
280 # few-bytes PGs that still need the housekeeping of their recovery
281 # to be done. This is subjective...
287 info
= pg_to_state
[pg_str
]
289 # The PG is gone! Probably a pool was deleted. Drop it.
292 # Only checks the state of each PGs when it's epoch >= the OSDMap's epoch
293 if int(info
['reported_epoch']) < int(self
._start
_epoch
):
296 state
= info
['state']
298 states
= state
.split("+")
300 if "active" in states
and "clean" in states
:
303 if info
['stat_sum']['num_bytes'] == 0:
304 # Empty PGs are considered 0% done until they are
305 # in the correct state.
308 recovered
= info
['stat_sum']['num_bytes_recovered']
309 total_bytes
= info
['stat_sum']['num_bytes']
311 ratio
= float(recovered
-
312 self
._original
_bytes
_recovered
[pg
]) / \
314 # Since the recovered bytes (over time) could perhaps
315 # exceed the contents of the PG (moment in time), we
317 ratio
= min(ratio
, 1.0)
318 ratio
= max(ratio
, 0.0)
321 # Dataless PGs (e.g. containing only OMAPs) count
324 complete_accumulate
+= ratio
326 self
._pgs
= list(set(self
._pgs
) ^ complete
)
327 completed_pgs
= self
._original
_pg
_count
- len(self
._pgs
)
328 self
._progress
= (completed_pgs
+ complete_accumulate
)\
329 / self
._original
_pg
_count
332 log
.info("Updated progress to %s", self
.summary())
337 return self
._progress
341 def __init__(self
, pool_id
, ps
):
342 # type: (str, int) -> None
343 self
.pool_id
= pool_id
346 def __cmp__(self
, other
):
347 return (self
.pool_id
, self
.ps
) == (other
.pool_id
, other
.ps
)
349 def __lt__(self
, other
):
350 return (self
.pool_id
, self
.ps
) < (other
.pool_id
, other
.ps
)
353 return "{0}.{1:x}".format(self
.pool_id
, self
.ps
)
356 class Module(MgrModule
):
359 "desc": "Show progress of recovery operations",
361 {"cmd": "progress json",
362 "desc": "Show machine readable progress information",
364 {"cmd": "progress clear",
365 "desc": "Reset progress tracking",
371 'name': 'max_completed_events',
374 'desc': 'number of past completed events to remember',
378 'name': 'persist_interval',
381 'desc': 'how frequently to persist completed events',
384 ] # type: List[Dict[str, Any]]
386 def __init__(self
, *args
, **kwargs
):
387 super(Module
, self
).__init
__(*args
, **kwargs
)
389 self
._events
= {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
390 self
._completed
_events
= [] # type: List[GhostEvent]
392 self
._old
_osd
_map
= None # type: Optional[OSDMap]
394 self
._ready
= threading
.Event()
395 self
._shutdown
= threading
.Event()
397 self
._latest
_osdmap
= None # type: Optional[OSDMap]
406 self
.max_completed_events
= 0
407 self
.persist_interval
= 0
409 def config_notify(self
):
410 for opt
in self
.MODULE_OPTIONS
:
413 self
.get_module_option(opt
['name']))
414 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
416 def _osd_in_out(self
, old_map
, old_dump
, new_map
, osd_id
, marked
):
417 # type: (OSDMap, Dict, OSDMap, str, str) -> None
418 # A function that will create or complete an event when an
419 # OSD is marked in or out according to the affected PGs
422 for pool
in old_dump
['pools']:
423 pool_id
= pool
['pool'] # type: str
424 for ps
in range(0, pool
['pg_num']):
426 # Was this OSD affected by the OSD coming in/out?
427 # Compare old and new osds using
428 # data from the json dump
429 old_up_acting
= old_map
.pg_to_up_acting_osds(pool
['pool'], ps
)
430 old_osds
= set(old_up_acting
['acting'])
431 new_up_acting
= new_map
.pg_to_up_acting_osds(pool
['pool'], ps
)
432 new_osds
= set(new_up_acting
['acting'])
434 # Check the osd_id being in the acting set for both old
435 # and new maps to cover both out and in cases
436 was_on_out_or_in_osd
= osd_id
in old_osds
or osd_id
in new_osds
437 if not was_on_out_or_in_osd
:
440 self
.log
.debug("pool_id, ps = {0}, {1}".format(
445 "old_up_acting: {0}".format(json
.dumps(old_up_acting
, indent
=4, sort_keys
=True)))
447 # Has this OSD been assigned a new location?
448 # (it might not be if there is no suitable place to move
449 # after an OSD is marked in/out)
451 is_relocated
= len(old_osds
- new_osds
) > 0
453 is_relocated
= len(new_osds
- old_osds
) > 0
456 "new_up_acting: {0}".format(json
.dumps(new_up_acting
,
460 if was_on_out_or_in_osd
and is_relocated
:
461 # This PG is now in motion, track its progress
462 affected_pgs
.append(PgId(pool_id
, ps
))
463 elif not is_relocated
:
464 # This PG didn't get a new location, we'll log it
465 unmoved_pgs
.append(PgId(pool_id
, ps
))
467 # In the case that we ignored some PGs, log the reason why (we may
468 # not end up creating a progress event)
470 self
.log
.warn("{0} PGs were on osd.{1}, but didn't get new locations".format(
471 len(unmoved_pgs
), osd_id
))
473 self
.log
.warn("{0} PGs affected by osd.{1} being marked {2}".format(
474 len(affected_pgs
), osd_id
, marked
))
477 # In the case of the osd coming back in, we might need to cancel
478 # previous recovery event for that osd
480 for ev_id
in list(self
._events
):
481 ev
= self
._events
[ev_id
]
482 if isinstance(ev
, PgRecoveryEvent
) and osd_id
in ev
.which_osds
:
483 self
.log
.info("osd.{0} came back in, cancelling event".format(
488 if len(affected_pgs
) > 0:
489 r_ev
= PgRecoveryEvent(
490 "Rebalancing after osd.{0} marked {1}".format(osd_id
, marked
),
491 refs
=[("osd", osd_id
)],
492 which_pgs
=affected_pgs
,
494 start_epoch
=self
.get_osdmap().get_epoch()
496 r_ev
.pg_update(self
.get("pg_stats"), self
.get("pg_ready"), self
.log
)
497 self
._events
[r_ev
.id] = r_ev
499 def _osdmap_changed(self
, old_osdmap
, new_osdmap
):
500 # type: (OSDMap, OSDMap) -> None
501 old_dump
= old_osdmap
.dump()
502 new_dump
= new_osdmap
.dump()
504 old_osds
= dict([(o
['osd'], o
) for o
in old_dump
['osds']])
506 for osd
in new_dump
['osds']:
508 new_weight
= osd
['in']
509 if osd_id
in old_osds
:
510 old_weight
= old_osds
[osd_id
]['in']
512 if new_weight
== 0.0 and old_weight
> new_weight
:
513 self
.log
.warn("osd.{0} marked out".format(osd_id
))
514 self
._osd
_in
_out
(old_osdmap
, old_dump
, new_osdmap
, osd_id
, "out")
515 elif new_weight
>= 1.0 and old_weight
== 0.0:
516 # Only consider weight>=1.0 as "in" to avoid spawning
517 # individual recovery events on every adjustment
518 # in a gradual weight-in
519 self
.log
.warn("osd.{0} marked in".format(osd_id
))
520 self
._osd
_in
_out
(old_osdmap
, old_dump
, new_osdmap
, osd_id
, "in")
522 def notify(self
, notify_type
, notify_data
):
525 if notify_type
== "osd_map":
526 old_osdmap
= self
._latest
_osdmap
527 self
._latest
_osdmap
= self
.get_osdmap()
529 assert self
._latest
_osdmap
531 self
.log
.info("Processing OSDMap change {0}..{1}".format(
532 old_osdmap
.get_epoch(), self
._latest
_osdmap
.get_epoch()
534 self
._osdmap
_changed
(old_osdmap
, self
._latest
_osdmap
)
535 elif notify_type
== "pg_summary":
536 data
= self
.get("pg_stats")
537 ready
= self
.get("pg_ready")
538 for ev_id
in list(self
._events
):
539 ev
= self
._events
[ev_id
]
540 if isinstance(ev
, PgRecoveryEvent
):
541 ev
.pg_update(data
, ready
, self
.log
)
542 self
.maybe_complete(ev
)
def maybe_complete(self, event):
    # type: (Event) -> None
    """Finalize *event* if it has reached 100% progress; otherwise do nothing."""
    if event.progress < 1.0:
        return
    self._complete(event)
550 self
.log
.info("Writing back {0} completed events".format(
551 len(self
._completed
_events
)
553 # TODO: bound the number we store.
554 encoded
= json
.dumps({
555 "events": [ev
.to_json() for ev
in self
._completed
_events
],
556 "version": ENCODING_VERSION
,
557 "compat_version": ENCODING_VERSION
559 self
.set_store("completed", encoded
)
562 stored
= self
.get_store("completed")
565 self
.log
.info("No stored events to load")
568 decoded
= json
.loads(stored
)
569 if decoded
['compat_version'] > ENCODING_VERSION
:
570 raise RuntimeError("Cannot decode version {0}".format(
571 decoded
['compat_version']))
573 if decoded
['compat_version'] < ENCODING_VERSION
:
574 # we need to add the "started_at" and "finished_at" attributes to the events
575 for ev
in decoded
['events']:
576 ev
['started_at'] = None
577 ev
['finished_at'] = None
579 for ev
in decoded
['events']:
580 self
._completed
_events
.append(GhostEvent(ev
['id'], ev
['message'],
581 ev
['refs'], ev
['started_at'],
583 ev
.get('failed', False),
584 ev
.get('failure_message')))
586 self
._prune
_completed
_events
()
588 def _prune_completed_events(self
):
589 length
= len(self
._completed
_events
)
590 if length
> self
.max_completed_events
:
591 self
._completed
_events
= self
._completed
_events
[length
- self
.max_completed_events
: length
]
596 self
.clear_all_progress_events()
597 self
.log
.info("Loading...")
600 self
.log
.info("Loaded {0} historic events".format(self
._completed
_events
))
602 self
._latest
_osdmap
= self
.get_osdmap()
603 self
.log
.info("Loaded OSDMap, ready.")
607 while not self
._shutdown
.is_set():
608 # Lazy periodic write back of completed events
613 self
._shutdown
.wait(timeout
=self
.persist_interval
)
615 self
._shutdown
.wait()
619 self
.clear_all_progress_events()
621 def update(self
, ev_id
, ev_msg
, ev_progress
, refs
=None):
622 # type: (str, str, float, Optional[list]) -> None
624 For calling from other mgr modules
630 ev
= self
._events
[ev_id
]
631 assert isinstance(ev
, RemoteEvent
)
633 ev
= RemoteEvent(ev_id
, ev_msg
, refs
)
634 self
._events
[ev_id
] = ev
635 self
.log
.info("update: starting ev {0} ({1})".format(
638 self
.log
.debug("update: {0} on {1}".format(
639 ev_progress
, ev_msg
))
641 ev
.set_progress(ev_progress
)
642 ev
.set_message(ev_msg
)
644 def _complete(self
, ev
):
645 # type: (Event) -> None
646 duration
= (time
.time() - ev
.started_at
)
647 self
.log
.info("Completed event {0} ({1}) in {2} seconds".format(
648 ev
.id, ev
.message
, int(round(duration
))
650 self
.complete_progress_event(ev
.id)
652 self
._completed
_events
.append(
653 GhostEvent(ev
.id, ev
.message
, ev
.refs
, ev
.started_at
,
654 failed
=ev
.failed
, failure_message
=ev
.failure_message
))
656 del self
._events
[ev
.id]
657 self
._prune
_completed
_events
()
660 def complete(self
, ev_id
):
662 For calling from other mgr modules
665 ev
= self
._events
[ev_id
]
666 assert isinstance(ev
, RemoteEvent
)
668 self
.log
.info("complete: finished ev {0} ({1})".format(ev_id
,
672 self
.log
.warn("complete: ev {0} does not exist".format(ev_id
))
675 def fail(self
, ev_id
, message
):
677 For calling from other mgr modules to mark an event as failed (and
681 ev
= self
._events
[ev_id
]
682 assert isinstance(ev
, RemoteEvent
)
683 ev
.set_failed(message
)
684 self
.log
.info("fail: finished ev {0} ({1}): {2}".format(ev_id
,
689 self
.log
.warn("fail: ev {0} does not exist".format(ev_id
))
691 def _handle_ls(self
):
692 if len(self
._events
) or len(self
._completed
_events
):
694 chrono_order
= sorted(self
._events
.values(),
695 key
=lambda x
: x
.started_at
, reverse
=True)
696 for ev
in chrono_order
:
697 out
+= ev
.twoline_progress()
700 if len(self
._completed
_events
):
701 # TODO: limit number of completed events to show
703 for ghost_ev
in self
._completed
_events
:
704 out
+= "[{0}]: {1}\n".format("Complete" if not ghost_ev
.failed
else "Failed",
705 ghost_ev
.twoline_progress())
709 return 0, "", "Nothing in progress"
713 'events': [ev
.to_json() for ev
in self
._events
.values()],
714 'completed': [ev
.to_json() for ev
in self
._completed
_events
]
717 def _handle_clear(self
):
719 self
._completed
_events
= []
722 self
.clear_all_progress_events()
def handle_command(self, _, cmd):
    """Dispatch the `progress` family of CLI commands.

    Returns the (retval, stdout, stderr) triple used by mgr command
    handlers; raises NotImplementedError for any prefix this module
    never registered.
    """
    prefix = cmd['prefix']
    if prefix == "progress":
        return self._handle_ls()
    if prefix == "progress clear":
        # An escape hatch for the admin: normally unneeded, it "kicks"
        # this module if a bug left a progress event that never finishes.
        return self._handle_clear()
    if prefix == "progress json":
        return 0, json.dumps(self._json(), indent=4, sort_keys=True), ""
    raise NotImplementedError(cmd['prefix'])