-from mgr_module import MgrModule
+try:
+ from typing import List, Dict, Union, Any, Optional
+ from typing import TYPE_CHECKING
+except ImportError:
+ TYPE_CHECKING = False
+
+from mgr_module import MgrModule, OSDMap
+from mgr_util import to_pretty_timedelta
+from datetime import timedelta
+import os
import threading
import datetime
import uuid
+import time
import json
-ENCODING_VERSION = 1
+ENCODING_VERSION = 2
# keep a global reference to the module so we can use it from Event methods
-_module = None
+_module = None # type: Optional["Module"]
+# When running under unit tests, replace MgrModule with a plain object
+if 'UNITTEST' in os.environ:
+ MgrModule = object # type: ignore
class Event(object):
"""
objects (osds, pools) this relates to.
"""
- def __init__(self, message, refs):
+ def __init__(self, message, refs, started_at=None):
+ # type: (str, List[str], Optional[float]) -> None
self._message = message
self._refs = refs
-
- self.started_at = datetime.datetime.utcnow()
-
- self.id = None
+ self.started_at = started_at if started_at else time.time()
+ self.id = None # type: Optional[str]
def _refresh(self):
global _module
+ assert _module
_module.log.debug('refreshing mgr for %s (%s) at %f' % (self.id, self._message,
- self._progress))
- _module.update_progress_event(self.id, self._message, self._progress)
+ self.progress))
+ _module.update_progress_event(
+ self.id, self.twoline_progress(6), self.progress)
@property
def message(self):
+ # type: () -> str
return self._message
@property
def refs(self):
+ # type: () -> List[str]
return self._refs
@property
def progress(self):
+ # type: () -> float
raise NotImplementedError()
+ @property
+ def duration_str(self):
+ duration = time.time() - self.started_at
+ return "(%s)" % (
+ to_pretty_timedelta(timedelta(seconds=duration)))
+
+ @property
+ def failed(self):
+ return False
+
+ @property
+ def failure_message(self):
+ return None
+
def summary(self):
- return "{0} {1}".format(self.progress, self._message)
+ # type: () -> str
+ return "{0} {1} {2}".format(self.progress, self.message,
+ self.duration_str)
def _progress_str(self, width):
inner_width = width - 2
return out
- def twoline_progress(self):
+ def twoline_progress(self, indent=4):
"""
e.g.
- - Eating my delicious strudel
- [===============..............]
+ - Eating my delicious strudel (since: 30s)
+ [===============..............] (remaining: 04m)
"""
- return "{0}\n {1}".format(
- self._message, self._progress_str(30))
+ time_remaining = self.estimated_time_remaining()
+ if time_remaining:
+ remaining = "(remaining: %s)" % (
+ to_pretty_timedelta(timedelta(seconds=time_remaining)))
+ else:
+ remaining = ''
+ return "{0} {1}\n{2}{3} {4}".format(self._message,
+ self.duration_str,
+ " " * indent,
+ self._progress_str(30),
+ remaining)
def to_json(self):
+ # type: () -> Dict[str, Any]
return {
"id": self.id,
"message": self.message,
- "refs": self._refs
+ "duration": self.duration_str,
+ "refs": self._refs,
+ "progress": self.progress,
+ "started_at": self.started_at,
+ "time_remaining": self.estimated_time_remaining()
}
+ def estimated_time_remaining(self):
+ elapsed = time.time() - self.started_at
+ progress = self.progress
+ if progress == 0.0:
+ return None
+ return int(elapsed * (1 - progress) / progress)
class GhostEvent(Event):
"""
after the event is complete.
"""
- def __init__(self, my_id, message, refs):
- super(GhostEvent, self).__init__(message, refs)
+ def __init__(self, my_id, message, refs, started_at, finished_at=None,
+ failed=False, failure_message=None):
+ super(GhostEvent, self).__init__(message, refs, started_at)
+ self.finished_at = finished_at if finished_at else time.time()
self.id = my_id
+ if failed:
+ self._failed = True
+ self._failure_message = failure_message
+ else:
+ self._failed = False
+
@property
def progress(self):
return 1.0
+ @property
+ def failed(self):
+ return self._failed
+
+ @property
+ def failure_message(self):
+ return self._failure_message if self._failed else None
+
+ def to_json(self):
+ d = {
+ "id": self.id,
+ "message": self.message,
+ "refs": self._refs,
+ "started_at": self.started_at,
+ "finished_at": self.finished_at
+ }
+ if self._failed:
+ d["failed"] = True
+ d["failure_message"] = self._failure_message
+ return d
+
class RemoteEvent(Event):
"""
"""
def __init__(self, my_id, message, refs):
+ # type: (str, str, List[str]) -> None
super(RemoteEvent, self).__init__(message, refs)
self.id = my_id
self._progress = 0.0
+ self._failed = False
self._refresh()
def set_progress(self, progress):
+ # type: (float) -> None
self._progress = progress
self._refresh()
+ def set_failed(self, message):
+ self._progress = 1.0
+ self._failed = True
+ self._failure_message = message
+ self._refresh()
+
+ def set_message(self, message):
+ self._message = message
+ self._refresh()
+
@property
def progress(self):
return self._progress
+ @property
+ def failed(self):
+ return self._failed
+
+ @property
+ def failure_message(self):
+ return self._failure_message if self._failed else None
+
class PgRecoveryEvent(Event):
"""
Always call update() immediately after construction.
"""
- def __init__(self, message, refs, which_pgs, evactuate_osds):
+ def __init__(self, message, refs, which_pgs, which_osds, start_epoch):
+ # type: (str, List[Any], List[PgId], List[str], int) -> None
super(PgRecoveryEvent, self).__init__(message, refs)
self._pgs = which_pgs
- self._evacuate_osds = evactuate_osds
+ self._which_osds = which_osds
self._original_pg_count = len(self._pgs)
- self._original_bytes_recovered = None
+ self._original_bytes_recovered = None # type: Optional[Dict[PgId, float]]
self._progress = 0.0
- self.id = str(uuid.uuid4())
+ # self._start_epoch = _module.get_osdmap().get_epoch()
+ self._start_epoch = start_epoch
+
+ self.id = str(uuid.uuid4()) # type: str
self._refresh()
@property
- def evacuating_osds(self):
- return self. _evacuate_osds
+ def which_osds(self):
+ return self. _which_osds
- def pg_update(self, pg_dump, log):
+ def pg_update(self, raw_pg_stats, pg_ready, log):
+ # type: (Dict, bool, Any) -> None
# FIXME: O(pg_num) in python
# FIXME: far more fields getting pythonized than we really care about
- pg_to_state = dict([(p['pgid'], p) for p in pg_dump['pg_stats']])
-
+        # Sanity check for missing PGs, and initialize the baseline
+        # recovered-bytes counters if no recovery has been recorded yet
+ pg_to_state = dict([(p['pgid'], p) for p in raw_pg_stats['pg_stats']]) # type: Dict[str, Any]
if self._original_bytes_recovered is None:
self._original_bytes_recovered = {}
+ missing_pgs = []
for pg in self._pgs:
pg_str = str(pg)
- self._original_bytes_recovered[pg] = \
- pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
+ if pg_str in pg_to_state:
+ self._original_bytes_recovered[pg] = \
+ pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
+ else:
+ missing_pgs.append(pg)
+ if pg_ready:
+ for pg in missing_pgs:
+ self._pgs.remove(pg)
complete_accumulate = 0.0
# The PG is gone! Probably a pool was deleted. Drop it.
complete.add(pg)
continue
+            # Only check the state of each PG when its reported epoch is >= the start OSDMap's epoch
+ if int(info['reported_epoch']) < int(self._start_epoch):
+ continue
state = info['state']
states = state.split("+")
- unmoved = bool(set(self._evacuate_osds) & (
- set(info['up']) | set(info['acting'])))
-
- if "active" in states and "clean" in states and not unmoved:
+ if "active" in states and "clean" in states:
complete.add(pg)
else:
if info['stat_sum']['num_bytes'] == 0:
ratio = float(recovered -
self._original_bytes_recovered[pg]) / \
total_bytes
-
# Since the recovered bytes (over time) could perhaps
# exceed the contents of the PG (moment in time), we
# must clamp this
ratio = min(ratio, 1.0)
+ ratio = max(ratio, 0.0)
else:
# Dataless PGs (e.g. containing only OMAPs) count
# as half done.
ratio = 0.5
-
complete_accumulate += ratio
self._pgs = list(set(self._pgs) ^ complete)
completed_pgs = self._original_pg_count - len(self._pgs)
self._progress = (completed_pgs + complete_accumulate)\
/ self._original_pg_count
- self._refresh()
- log.info("Updated progress to {0} ({1})".format(
- self._progress, self._message
- ))
+ self._refresh()
+ log.info("Updated progress to %s", self.summary())
@property
def progress(self):
+ # type: () -> float
return self._progress
class PgId(object):
def __init__(self, pool_id, ps):
+ # type: (str, int) -> None
self.pool_id = pool_id
self.ps = ps
'desc': 'how frequently to persist completed events',
'runtime': True,
},
- ]
+ ] # type: List[Dict[str, Any]]
def __init__(self, *args, **kwargs):
super(Module, self).__init__(*args, **kwargs)
- self._events = {}
- self._completed_events = []
+ self._events = {} # type: Dict[str, Union[RemoteEvent, PgRecoveryEvent]]
+ self._completed_events = [] # type: List[GhostEvent]
- self._old_osd_map = None
+ self._old_osd_map = None # type: Optional[OSDMap]
self._ready = threading.Event()
self._shutdown = threading.Event()
- self._latest_osdmap = None
+ self._latest_osdmap = None # type: Optional[OSDMap]
self._dirty = False
global _module
_module = self
+ # only for mypy
+ if TYPE_CHECKING:
+ self.max_completed_events = 0
+ self.persist_interval = 0
+
def config_notify(self):
for opt in self.MODULE_OPTIONS:
setattr(self,
self.get_module_option(opt['name']))
self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))
- def _osd_out(self, old_map, old_dump, new_map, osd_id):
+ def _osd_in_out(self, old_map, old_dump, new_map, osd_id, marked):
+ # type: (OSDMap, Dict, OSDMap, str, str) -> None
+        # Create or complete a progress event when an OSD is marked
+        # in or out, based on the PGs affected by the change
affected_pgs = []
unmoved_pgs = []
for pool in old_dump['pools']:
- pool_id = pool['pool']
+ pool_id = pool['pool'] # type: str
for ps in range(0, pool['pg_num']):
- up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
- # Was this OSD affected by the OSD going out?
- old_osds = set(up_acting['up']) | set(up_acting['acting'])
- was_on_out_osd = osd_id in old_osds
- if not was_on_out_osd:
+ # Was this OSD affected by the OSD coming in/out?
+ # Compare old and new osds using
+ # data from the json dump
+ old_up_acting = old_map.pg_to_up_acting_osds(pool['pool'], ps)
+ old_osds = set(old_up_acting['acting'])
+ new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
+ new_osds = set(new_up_acting['acting'])
+
+ # Check the osd_id being in the acting set for both old
+ # and new maps to cover both out and in cases
+ was_on_out_or_in_osd = osd_id in old_osds or osd_id in new_osds
+ if not was_on_out_or_in_osd:
continue
self.log.debug("pool_id, ps = {0}, {1}".format(
))
self.log.debug(
- "up_acting: {0}".format(json.dumps(up_acting, indent=2)))
-
- new_up_acting = new_map.pg_to_up_acting_osds(pool['pool'], ps)
- new_osds = set(new_up_acting['up']) | set(new_up_acting['acting'])
+ "old_up_acting: {0}".format(json.dumps(old_up_acting, indent=4, sort_keys=True)))
# Has this OSD been assigned a new location?
# (it might not be if there is no suitable place to move
- # after an OSD failure)
- is_relocated = len(new_osds - old_osds) > 0
+ # after an OSD is marked in/out)
+ if marked == "in":
+ is_relocated = len(old_osds - new_osds) > 0
+ else:
+ is_relocated = len(new_osds - old_osds) > 0
self.log.debug(
"new_up_acting: {0}".format(json.dumps(new_up_acting,
- indent=2)))
+ indent=4,
+ sort_keys=True)))
- if was_on_out_osd and is_relocated:
+ if was_on_out_or_in_osd and is_relocated:
# This PG is now in motion, track its progress
affected_pgs.append(PgId(pool_id, ps))
elif not is_relocated:
# In the case that we ignored some PGs, log the reason why (we may
# not end up creating a progress event)
if len(unmoved_pgs):
- self.log.warn("{0} PGs were on osd.{1}, but didn't get new locations".format(
+ self.log.warning("{0} PGs were on osd.{1}, but didn't get new locations".format(
len(unmoved_pgs), osd_id))
- self.log.warn("{0} PGs affected by osd.{1} going out".format(
- len(affected_pgs), osd_id))
-
- if len(affected_pgs) == 0:
- # Don't emit events if there were no PGs
- return
-
- # TODO: reconcile with existing events referring to this OSD going out
- ev = PgRecoveryEvent(
- "Rebalancing after osd.{0} marked out".format(osd_id),
- refs=[("osd", osd_id)],
- which_pgs=affected_pgs,
- evactuate_osds=[osd_id]
- )
- ev.pg_update(self.get("pg_dump"), self.log)
- self._events[ev.id] = ev
-
- def _osd_in(self, osd_id):
- for ev_id, ev in self._events.items():
- if isinstance(ev, PgRecoveryEvent) and osd_id in ev.evacuating_osds:
- self.log.info("osd.{0} came back in, cancelling event".format(
- osd_id
- ))
- self._complete(ev)
+ self.log.warning("{0} PGs affected by osd.{1} being marked {2}".format(
+ len(affected_pgs), osd_id, marked))
+
+
+ # In the case of the osd coming back in, we might need to cancel
+ # previous recovery event for that osd
+ if marked == "in":
+ for ev_id in list(self._events):
+ ev = self._events[ev_id]
+ if isinstance(ev, PgRecoveryEvent) and osd_id in ev.which_osds:
+ self.log.info("osd.{0} came back in, cancelling event".format(
+ osd_id
+ ))
+ self._complete(ev)
+
+ if len(affected_pgs) > 0:
+ r_ev = PgRecoveryEvent(
+ "Rebalancing after osd.{0} marked {1}".format(osd_id, marked),
+ refs=[("osd", osd_id)],
+ which_pgs=affected_pgs,
+ which_osds=[osd_id],
+ start_epoch=self.get_osdmap().get_epoch()
+ )
+ r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log)
+ self._events[r_ev.id] = r_ev
def _osdmap_changed(self, old_osdmap, new_osdmap):
+ # type: (OSDMap, OSDMap) -> None
old_dump = old_osdmap.dump()
new_dump = new_osdmap.dump()
old_weight = old_osds[osd_id]['in']
if new_weight == 0.0 and old_weight > new_weight:
- self.log.warn("osd.{0} marked out".format(osd_id))
- self._osd_out(old_osdmap, old_dump, new_osdmap, osd_id)
+ self.log.warning("osd.{0} marked out".format(osd_id))
+ self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "out")
elif new_weight >= 1.0 and old_weight == 0.0:
# Only consider weight>=1.0 as "in" to avoid spawning
# individual recovery events on every adjustment
# in a gradual weight-in
- self.log.warn("osd.{0} marked in".format(osd_id))
- self._osd_in(osd_id)
+ self.log.warning("osd.{0} marked in".format(osd_id))
+ self._osd_in_out(old_osdmap, old_dump, new_osdmap, osd_id, "in")
def notify(self, notify_type, notify_data):
self._ready.wait()
if notify_type == "osd_map":
old_osdmap = self._latest_osdmap
self._latest_osdmap = self.get_osdmap()
+ assert old_osdmap
+ assert self._latest_osdmap
self.log.info("Processing OSDMap change {0}..{1}".format(
old_osdmap.get_epoch(), self._latest_osdmap.get_epoch()
))
self._osdmap_changed(old_osdmap, self._latest_osdmap)
elif notify_type == "pg_summary":
- data = self.get("pg_dump")
- for ev_id, ev in self._events.items():
+            # Skip when there are no events, to avoid the expensive
+            # get calls below
+ if len(self._events) == 0:
+ return
+ data = self.get("pg_stats")
+ ready = self.get("pg_ready")
+ for ev_id in list(self._events):
+ ev = self._events[ev_id]
if isinstance(ev, PgRecoveryEvent):
- ev.pg_update(data, self.log)
+ ev.pg_update(data, ready, self.log)
self.maybe_complete(ev)
def maybe_complete(self, event):
+ # type: (Event) -> None
if event.progress >= 1.0:
self._complete(event)
raise RuntimeError("Cannot decode version {0}".format(
decoded['compat_version']))
+ if decoded['compat_version'] < ENCODING_VERSION:
+ # we need to add the "started_at" and "finished_at" attributes to the events
+ for ev in decoded['events']:
+ ev['started_at'] = None
+ ev['finished_at'] = None
+
for ev in decoded['events']:
- self._completed_events.append(GhostEvent(ev['id'], ev['message'], ev['refs']))
+ self._completed_events.append(GhostEvent(ev['id'], ev['message'],
+ ev['refs'], ev['started_at'],
+ ev['finished_at'],
+ ev.get('failed', False),
+ ev.get('failure_message')))
self._prune_completed_events()
self._shutdown.set()
self.clear_all_progress_events()
- def update(self, ev_id, ev_msg, ev_progress):
+ def update(self, ev_id, ev_msg, ev_progress, refs=None):
+ # type: (str, str, float, Optional[list]) -> None
"""
For calling from other mgr modules
"""
+ if refs is None:
+ refs = []
try:
+
ev = self._events[ev_id]
+ assert isinstance(ev, RemoteEvent)
except KeyError:
- ev = RemoteEvent(ev_id, ev_msg, [])
+ ev = RemoteEvent(ev_id, ev_msg, refs)
self._events[ev_id] = ev
self.log.info("update: starting ev {0} ({1})".format(
ev_id, ev_msg))
ev_progress, ev_msg))
ev.set_progress(ev_progress)
- ev._refresh()
+ ev.set_message(ev_msg)
def _complete(self, ev):
- duration = (datetime.datetime.utcnow() - ev.started_at)
+ # type: (Event) -> None
+ duration = (time.time() - ev.started_at)
self.log.info("Completed event {0} ({1}) in {2} seconds".format(
- ev.id, ev.message, duration.seconds
+ ev.id, ev.message, int(round(duration))
))
self.complete_progress_event(ev.id)
self._completed_events.append(
- GhostEvent(ev.id, ev.message, ev.refs))
+ GhostEvent(ev.id, ev.message, ev.refs, ev.started_at,
+ failed=ev.failed, failure_message=ev.failure_message))
+ assert ev.id
del self._events[ev.id]
self._prune_completed_events()
self._dirty = True
"""
try:
ev = self._events[ev_id]
+ assert isinstance(ev, RemoteEvent)
ev.set_progress(1.0)
self.log.info("complete: finished ev {0} ({1})".format(ev_id,
ev.message))
self._complete(ev)
except KeyError:
- self.log.warn("complete: ev {0} does not exist".format(ev_id))
+ self.log.warning("complete: ev {0} does not exist".format(ev_id))
pass
+ def fail(self, ev_id, message):
+ """
+ For calling from other mgr modules to mark an event as failed (and
+ complete)
+ """
+ try:
+ ev = self._events[ev_id]
+ assert isinstance(ev, RemoteEvent)
+ ev.set_failed(message)
+ self.log.info("fail: finished ev {0} ({1}): {2}".format(ev_id,
+ ev.message,
+ message))
+ self._complete(ev)
+ except KeyError:
+ self.log.warning("fail: ev {0} does not exist".format(ev_id))
+
def _handle_ls(self):
if len(self._events) or len(self._completed_events):
out = ""
if len(self._completed_events):
# TODO: limit number of completed events to show
out += "\n"
- for ev in self._completed_events:
- out += "[Complete]: {0}\n".format(ev.twoline_progress())
+ for ghost_ev in self._completed_events:
+ out += "[{0}]: {1}\n".format("Complete" if not ghost_ev.failed else "Failed",
+ ghost_ev.twoline_progress())
return 0, out, ""
else:
self._completed_events = []
self._dirty = True
self._save()
+ self.clear_all_progress_events()
return 0, "", ""
# that never finishes)
return self._handle_clear()
elif cmd['prefix'] == "progress json":
- return 0, json.dumps(self._json(), indent=2), ""
+ return 0, json.dumps(self._json(), indent=4, sort_keys=True), ""
else:
raise NotImplementedError(cmd['prefix'])