def which_osds(self):
    """Return the value stored in ``self._which_osds``.

    Other code in this file reads it as a plain attribute
    (``osd_id in ev.which_osds``), so the result is used as a container
    of OSD ids supporting membership tests.
    NOTE(review): that attribute-style access implies a ``@property``
    decorator sits directly above this definition -- confirm.
    """
    return self._which_osds
- def pg_update(self, raw_pg_stats, pg_ready, log):
- # type: (Dict, bool, Any) -> None
+ def pg_update(self, pg_progress: Dict, log: Any) -> None:
# FIXME: O(pg_num) in python
- # FIXME: far more fields getting pythonized than we really care about
# Sanity check to see if there are any missing PGs and to assign
# empty array and dictionary if there hasn't been any recovery
- pg_to_state = dict((p['pgid'], p) for p in raw_pg_stats['pg_stats']) # type: Dict[str, Any]
+ pg_to_state: Dict[str, Any] = pg_progress["pgs"]
+ pg_ready: bool = pg_progress["pg_ready"]
+
if self._original_bytes_recovered is None:
self._original_bytes_recovered = {}
missing_pgs = []
pg_str = str(pg)
if pg_str in pg_to_state:
self._original_bytes_recovered[pg] = \
- pg_to_state[pg_str]['stat_sum']['num_bytes_recovered']
+ pg_to_state[pg_str]['num_bytes_recovered']
else:
missing_pgs.append(pg)
if pg_ready:
if "active" in states and "clean" in states:
complete.add(pg)
else:
- if info['stat_sum']['num_bytes'] == 0:
+ if info['num_bytes'] == 0:
# Empty PGs are considered 0% done until they are
# in the correct state.
pass
else:
- recovered = info['stat_sum']['num_bytes_recovered']
- total_bytes = info['stat_sum']['num_bytes']
+ recovered = info['num_bytes_recovered']
+ total_bytes = info['num_bytes']
if total_bytes > 0:
ratio = float(recovered -
self._original_bytes_recovered[pg]) / \
'enabled',
default=True,
type='bool',
+ ),
+ Option(
+ 'allow_pg_recovery_event',
+ default=False,
+ type='bool',
+ desc='allow the module to show pg recovery progress',
+ runtime=True
)
]
self.max_completed_events = 0
self.sleep_interval = 0
self.enabled = True
+ self.allow_pg_recovery_event = False
def config_notify(self):
for opt in self.MODULE_OPTIONS:
# previous recovery event for that osd
if marked == "in":
for ev_id in list(self._events):
- ev = self._events[ev_id]
- if isinstance(ev, PgRecoveryEvent) and osd_id in ev.which_osds:
- self.log.info("osd.{0} came back in, cancelling event".format(
- osd_id
- ))
- self._complete(ev)
+ try:
+ ev = self._events[ev_id]
+ if isinstance(ev, PgRecoveryEvent) and osd_id in ev.which_osds:
+ self.log.info("osd.{0} came back in, cancelling event".format(
+ osd_id
+ ))
+ self._complete(ev)
+ except KeyError:
+ self.log.warning("_osd_in_out: ev {0} does not exist".format(ev_id))
if len(affected_pgs) > 0:
r_ev = PgRecoveryEvent(
start_epoch=self.get_osdmap().get_epoch(),
add_to_ceph_s=False
)
- r_ev.pg_update(self.get("pg_stats"), self.get("pg_ready"), self.log)
+ r_ev.pg_update(self.get("pg_progress"), self.log)
self._events[r_ev.id] = r_ev
def _osdmap_changed(self, old_osdmap, new_osdmap):
return
global_event = False
- data = self.get("pg_stats")
- ready = self.get("pg_ready")
+ data = self.get("pg_progress")
for ev_id in list(self._events):
- ev = self._events[ev_id]
- # Check for types of events
- # we have to update
- if isinstance(ev, PgRecoveryEvent):
- ev.pg_update(data, ready, self.log)
- self.maybe_complete(ev)
- elif isinstance(ev, GlobalRecoveryEvent):
- global_event = True
- ev.global_event_update_progress(self.log)
- self.maybe_complete(ev)
+ try:
+ ev = self._events[ev_id]
+ # Check for types of events
+ # we have to update
+ if isinstance(ev, PgRecoveryEvent):
+ ev.pg_update(data, self.log)
+ self.maybe_complete(ev)
+ elif isinstance(ev, GlobalRecoveryEvent):
+ global_event = True
+ ev.global_event_update_progress(self.log)
+ self.maybe_complete(ev)
+ except KeyError:
+ self.log.warning("_process_pg_summary: ev {0} does not exist".format(ev_id))
+ continue
if not global_event:
# If there is no global event
self._dirty = False
if self.enabled:
- self._process_osdmap()
+ if self.allow_pg_recovery_event:
+ self._process_osdmap()
self._process_pg_summary()
self._shutdown.wait(timeout=self.sleep_interval)
ev = self._events[ev_id]
assert isinstance(ev, RemoteEvent)
except KeyError:
+ # if key doesn't exist we create an event
ev = RemoteEvent(ev_id, ev_msg, refs, add_to_ceph_s)
self._events[ev_id] = ev
self.log.info("update: starting ev {0} ({1})".format(