]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/progress/module.py
import 15.2.5
[ceph.git] / ceph / src / pybind / mgr / progress / module.py
index 5db15c716c56509b40ad205b3475f76cd07d948a..470e4b0a61193351ff86d7fde4a3c06b73f81aa4 100644 (file)
@@ -1,16 +1,29 @@
-from mgr_module import MgrModule
+try:
+    from typing import List, Dict, Union, Any, Optional
+    from typing import TYPE_CHECKING
+except ImportError:
+    TYPE_CHECKING = False
+
+from mgr_module import MgrModule, OSDMap
+from mgr_util import to_pretty_timedelta
+from datetime import timedelta
+import os
 import threading
 import datetime
 import uuid
+import time
 
 import json
 
 
-ENCODING_VERSION = 1
+ENCODING_VERSION = 2
 
 # keep a global reference to the module so we can use it from Event methods
-_module = None
+_module = None  # type: Optional["Module"]
 
+# if unit test we want MgrModule to be blank
+if 'UNITTEST' in os.environ:
+    MgrModule = object  # type: ignore
 
 class Event(object):
     """
@@ -19,32 +32,42 @@ class Event(object):
     objects (osds, pools) this relates to.
     """
 
-    def __init__(self, message, refs):
+    def __init__(self, message, refs, started_at=None):
+        # type: (str, List[str], Optional[float]) -> None
         self._message = message
         self._refs = refs
-
-        self.started_at = datetime.datetime.utcnow()
-
-        self.id = None
+        self.started_at = started_at if started_at else time.time()
+        self.id = None  # type: Optional[str]
 
     def _refresh(self):
         global _module
+        assert _module
         _module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
                                                                 self.progress))
-        _module.update_progress_event(self.id, self._message, self.progress)
+        _module.update_progress_event(
+            self.id, self.twoline_progress(6), self.progress)
 
     @property
     def message(self):
+        # type: () -> str
         return self._message
 
     @property
     def refs(self):
+        # type: () -> List[str]
         return self._refs
 
     @property
     def progress(self):
+        # type: () -> float
         raise NotImplementedError()
 
+    @property
+    def duration_str(self):
+        duration = time.time() - self.started_at
+        return "(%s)" % (
+            to_pretty_timedelta(timedelta(seconds=duration)))
+
     @property
     def failed(self):
         return False
@@ -54,7 +77,9 @@ class Event(object):
         return None
 
     def summary(self):
-        return "{0} {1}".format(self.progress, self.message)
+        # type: () -> str
+        return "{0} {1} {2}".format(self.progress, self.message,
+                                    self.duration_str)
 
     def _progress_str(self, width):
         inner_width = width - 2
@@ -66,24 +91,44 @@ class Event(object):
 
         return out
 
-    def twoline_progress(self):
+    def twoline_progress(self, indent=4):
         """
         e.g.
 
-        - Eating my delicious strudel
-            [===============..............]
+        - Eating my delicious strudel (since: 30s)
+            [===============..............] (remaining: 04m)
 
         """
-        return "{0}\n    {1}".format(
-            self._message, self._progress_str(30))
+        time_remaining = self.estimated_time_remaining()
+        if time_remaining:
+            remaining = "(remaining: %s)" % (
+                to_pretty_timedelta(timedelta(seconds=time_remaining)))
+        else:
+            remaining = ''
+        return "{0} {1}\n{2}{3} {4}".format(self._message,
+                                            self.duration_str,
+                                            " " * indent,
+                                            self._progress_str(30),
+                                            remaining)
 
     def to_json(self):
+        # type: () -> Dict[str, Any]
         return {
             "id": self.id,
             "message": self.message,
-            "refs": self._refs
+            "duration": self.duration_str,
+            "refs": self._refs,
+            "progress": self.progress,
+            "started_at": self.started_at,
+            "time_remaining": self.estimated_time_remaining()
         }
 
+    def estimated_time_remaining(self):
+        elapsed = time.time() - self.started_at
+        progress = self.progress
+        if progress == 0.0:
+            return None
+        return int(elapsed * (1 - progress) / progress)
 
 class GhostEvent(Event):
     """
@@ -91,9 +136,10 @@ class GhostEvent(Event):
     after the event is complete.
     """
 
-    def __init__(self, my_id, message, refs,
+    def __init__(self, my_id, message, refs, started_at, finished_at=None,
                  failed=False, failure_message=None):
-        super(GhostEvent, self).__init__(message, refs)
+        super(GhostEvent, self).__init__(message, refs, started_at)
+        self.finished_at = finished_at if finished_at else time.time()
         self.id = my_id
 
         if failed:
@@ -118,7 +164,9 @@ class GhostEvent(Event):
         d = {
             "id": self.id,
             "message": self.message,
-            "refs": self._refs
+            "refs": self._refs,
+            "started_at": self.started_at,
+            "finished_at": self.finished_at
         }
         if self._failed:
             d["failed"] = True
@@ -134,6 +182,7 @@ class RemoteEvent(Event):
     """
 
     def __init__(self, my_id, message, refs):
+        # type: (str, str, List[str]) -> None
         super(RemoteEvent, self).__init__(message, refs)
         self.id = my_id
         self._progress = 0.0
@@ -141,6 +190,7 @@ class RemoteEvent(Event):
         self._refresh()
 
     def set_progress(self, progress):
+        # type: (float) -> None
         self._progress = progress
         self._refresh()
 
@@ -150,6 +200,10 @@ class RemoteEvent(Event):
         self._failure_message = message
         self._refresh()
 
+    def set_message(self, message):
+        self._message = message
+        self._refresh()
+
     @property
     def progress(self):
         return self._progress
@@ -171,31 +225,37 @@ class PgRecoveryEvent(Event):
     Always call update() immediately after construction.
     """
 
-    def __init__(self, message, refs, which_pgs, evacuate_osds):
+    def __init__(self, message, refs, which_pgs, which_osds, start_epoch):
+        # type: (str, List[Any], List[PgId], List[str], int) -> None
         super(PgRecoveryEvent, self).__init__(message, refs)
 
         self._pgs = which_pgs
 
-        self._evacuate_osds = evacuate_osds
+        self._which_osds = which_osds
 
         self._original_pg_count = len(self._pgs)
 
-        self._original_bytes_recovered = None
+        self._original_bytes_recovered = None  # type: Optional[Dict[PgId, float]]
 
         self._progress = 0.0
 
-        self.id = str(uuid.uuid4())
+        # self._start_epoch = _module.get_osdmap().get_epoch()
+        self._start_epoch = start_epoch
+
+        self.id = str(uuid.uuid4())  # type: str
         self._refresh()
 
     @property
-    def evacuating_osds(self):
-        return self. _evacuate_osds
+    def which_osds(self):
+        return self. _which_osds
 
-    def pg_update(self, pg_dump, log):
+    def pg_update(self, raw_pg_stats, pg_ready, log):
+        # type: (Dict, bool, Any) -> None
         # FIXME: O(pg_num) in python
         # FIXME: far more fields getting pythonized than we really care about
-        pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])
-
+        # Sanity check to see if there are any missing PGs and to assign
+        # empty array and dictionary if there hasn't been any recovery
+        pg_to_state = dict([(p['pgid'], p) for p in raw_pg_stats['pg_stats']]) # type: Dict[str, Any]
         if self._original_bytes_recovered is None:
             self._original_bytes_recovered = {}
             missing_pgs = []
@@ -206,7 +266,7 @@ class PgRecoveryEvent(Event):
                         pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
                 else:
                     missing_pgs.append(pg)
-            if pg_dump.get('pg_ready', False):
+            if pg_ready:
                 for pg in missing_pgs:
                     self._pgs.remove(pg)
 
@@ -229,15 +289,15 @@ class PgRecoveryEvent(Event):
                 # The PG is gone!  Probably a pool was deleted. Drop it.
                 complete.add(pg)
                 continue
+            # Only checks the state of each PGs when it's epoch >= the OSDMap's epoch
+            if int(info['reported_epoch']) < int(self._start_epoch):
+                continue
 
             state = info['state']
 
             states = state.split("+")
 
-            unmoved = bool(set(self._evacuate_osds) & (
-                set(info['up']) | set(info['acting'])))
-
-            if "active" in states and "clean" in states and not unmoved:
+            if "active" in states and "clean" in states:
                 complete.add(pg)
             else:
                 if info['stat_sum']['num_bytes'] == 0:
@@ -251,36 +311,35 @@ class PgRecoveryEvent(Event):
                         ratio = float(recovered -
                                       self._original_bytes_recovered[pg]) / \
                             total_bytes
-
                         # Since the recovered bytes (over time) could perhaps
                         # exceed the contents of the PG (moment in time), we
                         # must clamp this
                         ratio = min(ratio, 1.0)
+                        ratio = max(ratio, 0.0)
 
                     else:
                         # Dataless PGs (e.g. containing only OMAPs) count
                         # as half done.
                         ratio = 0.5
-
                     complete_accumulate += ratio
 
         self._pgs = list(set(self._pgs) ^ complete)
         completed_pgs = self._original_pg_count - len(self._pgs)
         self._progress = (completed_pgs + complete_accumulate)\
             / self._original_pg_count
-        self._refresh()
 
-        log.info("Updated progress to {0} ({1})".format(
-            self._progress, self._message
-        ))
+        self._refresh()
+        log.info("Updated progress to %s", self.summary())
 
     @property
     def progress(self):
+        # type: () -> float
         return self._progress
 
 
 class PgId(object):
     def __init__(self, pool_id, ps):
+        # type: (str, int) -> None
         self.pool_id = pool_id
         self.ps = ps
 
@@ -322,26 +381,31 @@ class Module(MgrModule):
             'desc': 'how frequently to persist completed events',
             'runtime': True,
         },
-    ]
+    ]  # type: List[Dict[str, Any]]
 
     def __init__(self, *args, **kwargs):
         super(Module, self).__init__(*args, **kwargs)
 
-        self._events = {}
-        self._completed_events = []
+        self._events = {}  # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
+        self._completed_events = [] # type: List[GhostEvent]
 
-        self._old_osd_map = None
+        self._old_osd_map = None  # type: Optional[OSDMap]
 
         self._ready = threading.Event()
         self._shutdown = threading.Event()
 
-        self._latest_osdmap = None
+        self._latest_osdmap = None  # type: Optional[OSDMap]
 
         self._dirty = False
 
         global _module
         _module = self
 
+        # only for mypy
+        if TYPE_CHECKING:
+            self.max_completed_events = 0
+            self.persist_interval = 0
+
     def config_notify(self):
         for opt in self.MODULE_OPTIONS:
             setattr(self,
@@ -349,18 +413,28 @@ class Module(MgrModule):
                     self.get_module_option(opt['name']))
             self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
 
-    def _osd_out(self, old_map, old_dump, new_map, osd_id):
+    def _osd_in_out(self, old_map, old_dump, new_map, osd_id, marked):
+        # type: (OSDMap, Dict, OSDMap, str, str) -> None
+        # A function that will create or complete an event when an
+        # OSD is marked in or out according to the affected PGs
         affected_pgs = []
         unmoved_pgs = []
         for pool in old_dump['pools']:
-            pool_id = pool['pool']
+            pool_id = pool['pool']  # type: str
             for ps in range(0, pool['pg_num']):
-                up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
 
-                # Was this OSD affected by the OSD going out?
-                old_osds = set(up_acting['up']) | set(up_acting['acting'])
-                was_on_out_osd = osd_id in old_osds
-                if not was_on_out_osd:
+                # Was this OSD affected by the OSD coming in/out?
+                # Compare old and new osds using
+                # data from the json dump
+                old_up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
+                old_osds = set(old_up_acting['acting'])
+                new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
+                new_osds = set(new_up_acting['acting'])
+
+                # Check the osd_id being in the acting set for both old
+                # and new maps to cover both out and in cases
+                was_on_out_or_in_osd = osd_id in old_osds or osd_id in new_osds
+                if not was_on_out_or_in_osd:
                     continue
 
                 self.log.debug("pool_id, ps = {0}, {1}".format(
@@ -368,21 +442,22 @@ class Module(MgrModule):
                 ))
 
                 self.log.debug(
-                    "up_acting: {0}".format(json.dumps(up_acting, indent=2)))
-
-                new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
-                new_osds = set(new_up_acting['up']) | set(new_up_acting['acting'])
+                    "old_up_acting: {0}".format(json.dumps(old_up_acting, indent=4, sort_keys=True)))
 
                 # Has this OSD been assigned a new location?
                 # (it might not be if there is no suitable place to move
-                #  after an OSD failure)
-                is_relocated = len(new_osds - old_osds) > 0
+                #  after an OSD is marked in/out)
+                if marked == "in":
+                    is_relocated = len(old_osds - new_osds) > 0
+                else:
+                    is_relocated = len(new_osds - old_osds) > 0
 
                 self.log.debug(
                     "new_up_acting: {0}".format(json.dumps(new_up_acting,
-                                                           indent=2)))
+                                                           indent=4,
+                                                           sort_keys=True)))
 
-                if was_on_out_osd and is_relocated:
+                if was_on_out_or_in_osd and is_relocated:
                     # This PG is now in motion, track its progress
                     affected_pgs.append(PgId(pool_id, ps))
                 elif not is_relocated:
@@ -392,35 +467,37 @@ class Module(MgrModule):
         # In the case that we ignored some PGs, log the reason why (we may
         # not end up creating a progress event)
         if len(unmoved_pgs):
-            self.log.warn("{0} PGs were on osd.{1}, but didn't get new locations".format(
+            self.log.warning("{0} PGs were on osd.{1}, but didn't get new locations".format(
                 len(unmoved_pgs), osd_id))
 
-        self.log.warn("{0} PGs affected by osd.{1} going out".format(
-            len(affected_pgs), osd_id))
-
-        if len(affected_pgs) == 0:
-            # Don't emit events if there were no PGs
-            return
-
-        # TODO: reconcile with existing events referring to this OSD going out
-        ev = PgRecoveryEvent(
-            "Rebalancing after osd.{0} marked out".format(osd_id),
-            refs=[("osd", osd_id)],
-            which_pgs=affected_pgs,
-            evacuate_osds=[osd_id]
-        )
-        ev.pg_update(self.get("pg_dump"), self.log)
-        self._events[ev.id] = ev
-
-    def _osd_in(self, osd_id):
-        for ev_id, ev in self._events.items():
-            if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
-                self.log.info("osd.{0} came back in, cancelling event".format(
-                    osd_id
-                ))
-                self._complete(ev)
+        self.log.warning("{0} PGs affected by osd.{1} being marked {2}".format(
+            len(affected_pgs), osd_id, marked))
+
+
+        # In the case of the osd coming back in, we might need to cancel
+        # previous recovery event for that osd
+        if marked == "in":
+            for ev_id in list(self._events):
+                ev = self._events[ev_id]
+                if isinstance(ev, PgRecoveryEvent) and osd_id in ev.which_osds:
+                    self.log.info("osd.{0} came back in, cancelling event".format(
+                        osd_id
+                    ))
+                    self._complete(ev)
+
+        if len(affected_pgs) > 0:
+            r_ev = PgRecoveryEvent(
+                    "Rebalancing after osd.{0} marked {1}".format(osd_id, marked),
+                    refs=[("osd", osd_id)],
+                    which_pgs=affected_pgs,
+                    which_osds=[osd_id],
+                    start_epoch=self.get_osdmap().get_epoch()
+                    )
+            r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log)
+            self._events[r_ev.id] = r_ev
 
     def _osdmap_changed(self, old_osdmap, new_osdmap):
+        # type: (OSDMap, OSDMap) -> None
         old_dump = old_osdmap.dump()
         new_dump = new_osdmap.dump()
 
@@ -433,14 +510,14 @@ class Module(MgrModule):
                 old_weight = old_osds[osd_id]['in']
 
                 if new_weight == 0.0 and old_weight > new_weight:
-                    self.log.warn("osd.{0} marked out".format(osd_id))
-                    self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
+                    self.log.warning("osd.{0} marked out".format(osd_id))
+                    self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "out")
                 elif new_weight >= 1.0 and old_weight == 0.0:
                     # Only consider weight>=1.0 as "in" to avoid spawning
                     # individual recovery events on every adjustment
                     # in a gradual weight-in
-                    self.log.warn("osd.{0} marked in".format(osd_id))
-                    self._osd_in(osd_id)
+                    self.log.warning("osd.{0} marked in".format(osd_id))
+                    self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in")
 
     def notify(self, notify_type, notify_data):
         self._ready.wait()
@@ -448,19 +525,28 @@ class Module(MgrModule):
         if notify_type == "osd_map":
             old_osdmap = self._latest_osdmap
             self._latest_osdmap = self.get_osdmap()
+            assert old_osdmap
+            assert self._latest_osdmap
 
             self.log.info("Processing OSDMap change {0}..{1}".format(
                 old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
             ))
             self._osdmap_changed(old_osdmap, self._latest_osdmap)
         elif notify_type == "pg_summary":
-            data = self.get("pg_dump")
-            for ev_id, ev in self._events.items():
+            # if there are no events we will skip this here to avoid 
+            # expensive get calls
+            if len(self._events) == 0:
+                return
+            data = self.get("pg_stats")
+            ready = self.get("pg_ready")
+            for ev_id in list(self._events):
+                ev = self._events[ev_id]
                 if isinstance(ev, PgRecoveryEvent):
-                    ev.pg_update(data, self.log)
+                    ev.pg_update(data, ready, self.log)
                     self.maybe_complete(ev)
 
     def maybe_complete(self, event):
+        # type: (Event) -> None
         if event.progress >= 1.0:
             self._complete(event)
 
@@ -488,9 +574,16 @@ class Module(MgrModule):
             raise RuntimeError("Cannot decode version {0}".format(
                                decoded['compat_version']))
 
+        if decoded['compat_version'] < ENCODING_VERSION:
+            # we need to add the "started_at" and "finished_at" attributes to the events
+            for ev in decoded['events']:
+                ev['started_at'] = None
+                ev['finished_at'] = None
+
         for ev in decoded['events']:
             self._completed_events.append(GhostEvent(ev['id'], ev['message'],
-                                                     ev['refs'],
+                                                     ev['refs'], ev['started_at'],
+                                                     ev['finished_at'],
                                                      ev.get('failed', False),
                                                      ev.get('failure_message')))
 
@@ -530,13 +623,16 @@ class Module(MgrModule):
         self.clear_all_progress_events()
 
     def update(self, ev_id, ev_msg, ev_progress, refs=None):
+        # type: (str, str, float, Optional[list]) -> None
         """
         For calling from other mgr modules
         """
         if refs is None:
             refs = []
         try:
+
             ev = self._events[ev_id]
+            assert isinstance(ev, RemoteEvent)
         except KeyError:
             ev = RemoteEvent(ev_id, ev_msg, refs)
             self._events[ev_id] = ev
@@ -547,18 +643,20 @@ class Module(MgrModule):
                 ev_progress, ev_msg))
 
         ev.set_progress(ev_progress)
-        ev._refresh()
+        ev.set_message(ev_msg)
 
     def _complete(self, ev):
-        duration = (datetime.datetime.utcnow() - ev.started_at)
+        # type: (Event) -> None
+        duration = (time.time() - ev.started_at)
         self.log.info("Completed event {0} ({1}) in {2} seconds".format(
-            ev.id, ev.message, duration.seconds
+            ev.id, ev.message, int(round(duration))
         ))
         self.complete_progress_event(ev.id)
 
         self._completed_events.append(
-            GhostEvent(ev.id, ev.message, ev.refs,
+            GhostEvent(ev.id, ev.message, ev.refs, ev.started_at,
                        failed=ev.failed, failure_message=ev.failure_message))
+        assert ev.id
         del self._events[ev.id]
         self._prune_completed_events()
         self._dirty = True
@@ -569,12 +667,13 @@ class Module(MgrModule):
         """
         try:
             ev = self._events[ev_id]
+            assert isinstance(ev, RemoteEvent)
             ev.set_progress(1.0)
             self.log.info("complete: finished ev {0} ({1})".format(ev_id,
                                                                    ev.message))
             self._complete(ev)
         except KeyError:
-            self.log.warn("complete: ev {0} does not exist".format(ev_id))
+            self.log.warning("complete: ev {0} does not exist".format(ev_id))
             pass
 
     def fail(self, ev_id, message):
@@ -584,13 +683,14 @@ class Module(MgrModule):
         """
         try:
             ev = self._events[ev_id]
+            assert isinstance(ev, RemoteEvent)
             ev.set_failed(message)
             self.log.info("fail: finished ev {0} ({1}): {2}".format(ev_id,
                                                                     ev.message,
                                                                     message))
             self._complete(ev)
         except KeyError:
-            self.log.warn("fail: ev {0} does not exist".format(ev_id))
+            self.log.warning("fail: ev {0} does not exist".format(ev_id))
 
     def _handle_ls(self):
         if len(self._events) or len(self._completed_events):
@@ -604,9 +704,9 @@ class Module(MgrModule):
             if len(self._completed_events):
                 # TODO: limit number of completed events to show
                 out += "\n"
-                for ev in self._completed_events:
-                    out += "[{0}]: {1}\n".format("Complete" if not ev.failed else "Failed",
-                                                 ev.twoline_progress())
+                for ghost_ev in self._completed_events:
+                    out += "[{0}]: {1}\n".format("Complete" if not ghost_ev.failed else "Failed",
+                                                 ghost_ev.twoline_progress())
 
             return 0, out, ""
         else:
@@ -623,6 +723,7 @@ class Module(MgrModule):
         self._completed_events = []
         self._dirty = True
         self._save()
+        self.clear_all_progress_events()
 
         return 0, "", ""
 
@@ -636,6 +737,6 @@ class Module(MgrModule):
             # that never finishes)
             return self._handle_clear()
         elif cmd['prefix'] == "progress json":
-            return 0, json.dumps(self._json(), indent=2), ""
+            return 0, json.dumps(self._json(), indent=4, sort_keys=True), ""
         else:
             raise NotImplementedError(cmd['prefix'])