X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fpybind%2Fmgr%2Frbd_support%2Ftrash_purge_schedule.py;h=d9bf24cf27cd49cd4ed3398c28e02286e861a6e0;hb=2a845540123ad00df2e55947b8080306ebdcf410;hp=2eaad833c9b6f0c13859cd6cd1ae6fe0548d2489;hpb=0948533fc3b372aaa92e1cd3da22f2258220e199;p=ceph.git diff --git a/ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py b/ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py index 2eaad833c..d9bf24cf2 100644 --- a/ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py +++ b/ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py @@ -16,6 +16,7 @@ from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules class TrashPurgeScheduleHandler: MODULE_OPTION_NAME = "trash_purge_schedule" SCHEDULE_OID = "rbd_trash_purge_schedule" + REFRESH_DELAY_SECONDS = 60.0 lock = Lock() condition = Condition(lock) @@ -35,11 +36,11 @@ class TrashPurgeScheduleHandler: try: self.log.info("TrashPurgeScheduleHandler: starting") while True: - self.refresh_pools() + refresh_delay = self.refresh_pools() with self.lock: (ns_spec, wait_time) = self.dequeue() if not ns_spec: - self.condition.wait(min(wait_time, 60)) + self.condition.wait(min(wait_time, refresh_delay)) continue pool_id, namespace = ns_spec self.trash_purge(pool_id, namespace) @@ -64,23 +65,30 @@ class TrashPurgeScheduleHandler: # pool_id => {namespace => pool_name} self.pools: Dict[str, Dict[str, str]] = {} self.refresh_pools() - self.log.debug("scheduler queue is initialized") + self.log.debug("TrashPurgeScheduleHandler: queue is initialized") def load_schedules(self) -> None: self.log.info("TrashPurgeScheduleHandler: load_schedules") schedules = Schedules(self) schedules.load() - with self.lock: - self.schedules = schedules + self.schedules = schedules - def refresh_pools(self) -> None: - if (datetime.now() - self.last_refresh_pools).seconds < 60: - return + def refresh_pools(self) -> float: + elapsed = (datetime.now() - self.last_refresh_pools).total_seconds() + if elapsed < self.REFRESH_DELAY_SECONDS: + return self.REFRESH_DELAY_SECONDS - elapsed self.log.debug("TrashPurgeScheduleHandler: refresh_pools") - self.load_schedules() + with self.lock: + self.load_schedules() + if not self.schedules: + self.log.debug("TrashPurgeScheduleHandler: no schedules") + self.pools = {} + self.queue = {} + self.last_refresh_pools = datetime.now() + return self.REFRESH_DELAY_SECONDS pools: Dict[str, Dict[str, str]] = {} @@ -96,6 +104,7 @@ class TrashPurgeScheduleHandler: self.pools = pools self.last_refresh_pools = datetime.now() + return self.REFRESH_DELAY_SECONDS def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None: pool_id = str(ioctx.get_pool_id()) @@ -117,24 +126,23 @@ class TrashPurgeScheduleHandler: pools[pool_id][namespace] = pool_name def rebuild_queue(self) -> None: - with self.lock: - now = datetime.now() + now = datetime.now() - # don't remove from queue "due" images - now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00") + # don't remove from queue "due" images + now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00") - for schedule_time in list(self.queue): - if schedule_time > now_string: - del self.queue[schedule_time] + for schedule_time in list(self.queue): + if schedule_time > now_string: + del self.queue[schedule_time] - if not self.schedules: - return + if not self.schedules: + return - for pool_id, namespaces in self.pools.items(): - for namespace in namespaces: - self.enqueue(now, pool_id, namespace) + for pool_id, namespaces in self.pools.items(): + for namespace in namespaces: + self.enqueue(now, pool_id, namespace) - self.condition.notify() + self.condition.notify() def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None: now = datetime.now() @@ -154,16 +162,19 @@ class TrashPurgeScheduleHandler: self.condition.notify() def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None: - schedule = self.schedules.find(pool_id, namespace) if not schedule: + self.log.debug( + "TrashPurgeScheduleHandler: no schedule for {}/{}".format( + pool_id, namespace)) return schedule_time = schedule.next_run(now) if schedule_time not in self.queue: self.queue[schedule_time] = [] - self.log.debug("schedule {}/{} at {}".format( - pool_id, namespace, schedule_time)) + self.log.debug( + "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format( + pool_id, namespace, schedule_time)) ns_spec = (pool_id, namespace) if ns_spec not in self.queue[schedule_time]: self.queue[schedule_time].append((pool_id, namespace)) @@ -187,6 +198,10 @@ class TrashPurgeScheduleHandler: return namespace, 0.0 def remove_from_queue(self, pool_id: str, namespace: str) -> None: + self.log.debug( + "TrashPurgeScheduleHandler: descheduling {}/{}".format( + pool_id, namespace)) + empty_slots = [] for schedule_time, namespaces in self.queue.items(): if (pool_id, namespace) in namespaces: @@ -201,14 +216,13 @@ class TrashPurgeScheduleHandler: interval: str, start_time: Optional[str]) -> Tuple[int, str, str]: self.log.debug( - "add_schedule: level_spec={}, interval={}, start_time={}".format( + "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format( level_spec.name, interval, start_time)) + # TODO: optimize to rebuild only affected part of the queue with self.lock: self.schedules.add(level_spec, interval, start_time) - - # TODO: optimize to rebuild only affected part of the queue - self.rebuild_queue() + self.rebuild_queue() return 0, "", "" def remove_schedule(self, @@ -216,18 +230,19 @@ class TrashPurgeScheduleHandler: interval: Optional[str], start_time: Optional[str]) -> Tuple[int, str, str]: self.log.debug( - "remove_schedule: level_spec={}, interval={}, start_time={}".format( + "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format( level_spec.name, interval, start_time)) + # TODO: optimize to rebuild only affected part of the queue with self.lock: self.schedules.remove(level_spec, interval, start_time) - - # TODO: optimize to rebuild only affected part of the queue - self.rebuild_queue() + self.rebuild_queue() return 0, "", "" def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]: - self.log.debug("list: level_spec={}".format(level_spec.name)) + self.log.debug( + "TrashPurgeScheduleHandler: list: level_spec={}".format( + level_spec.name)) with self.lock: result = self.schedules.to_list(level_spec) @@ -235,7 +250,9 @@ class TrashPurgeScheduleHandler: return 0, json.dumps(result, indent=4, sort_keys=True), "" def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]: - self.log.debug("status: level_spec={}".format(level_spec.name)) + self.log.debug( + "TrashPurgeScheduleHandler: status: level_spec={}".format( + level_spec.name)) scheduled = [] with self.lock: