]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/rbd_support/trash_purge_schedule.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / rbd_support / trash_purge_schedule.py
index 2eaad833c9b6f0c13859cd6cd1ae6fe0548d2489..d9bf24cf27cd49cd4ed3398c28e02286e861a6e0 100644 (file)
@@ -16,6 +16,7 @@ from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
 class TrashPurgeScheduleHandler:
     MODULE_OPTION_NAME = "trash_purge_schedule"
     SCHEDULE_OID = "rbd_trash_purge_schedule"
+    REFRESH_DELAY_SECONDS = 60.0
 
     lock = Lock()
     condition = Condition(lock)
@@ -35,11 +36,11 @@ class TrashPurgeScheduleHandler:
         try:
             self.log.info("TrashPurgeScheduleHandler: starting")
             while True:
-                self.refresh_pools()
+                refresh_delay = self.refresh_pools()
                 with self.lock:
                     (ns_spec, wait_time) = self.dequeue()
                     if not ns_spec:
-                        self.condition.wait(min(wait_time, 60))
+                        self.condition.wait(min(wait_time, refresh_delay))
                         continue
                 pool_id, namespace = ns_spec
                 self.trash_purge(pool_id, namespace)
@@ -64,23 +65,30 @@ class TrashPurgeScheduleHandler:
         # pool_id => {namespace => pool_name}
         self.pools: Dict[str, Dict[str, str]] = {}
         self.refresh_pools()
-        self.log.debug("scheduler queue is initialized")
+        self.log.debug("TrashPurgeScheduleHandler: queue is initialized")
 
     def load_schedules(self) -> None:
         self.log.info("TrashPurgeScheduleHandler: load_schedules")
 
         schedules = Schedules(self)
         schedules.load()
-        with self.lock:
-            self.schedules = schedules
+        self.schedules = schedules
 
-    def refresh_pools(self) -> None:
-        if (datetime.now() - self.last_refresh_pools).seconds < 60:
-            return
+    def refresh_pools(self) -> float:
+        elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
+        if elapsed < self.REFRESH_DELAY_SECONDS:
+            return self.REFRESH_DELAY_SECONDS - elapsed
 
         self.log.debug("TrashPurgeScheduleHandler: refresh_pools")
 
-        self.load_schedules()
+        with self.lock:
+            self.load_schedules()
+            if not self.schedules:
+                self.log.debug("TrashPurgeScheduleHandler: no schedules")
+                self.pools = {}
+                self.queue = {}
+                self.last_refresh_pools = datetime.now()
+                return self.REFRESH_DELAY_SECONDS
 
         pools: Dict[str, Dict[str, str]] = {}
 
@@ -96,6 +104,7 @@ class TrashPurgeScheduleHandler:
             self.pools = pools
 
         self.last_refresh_pools = datetime.now()
+        return self.REFRESH_DELAY_SECONDS
 
     def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None:
         pool_id = str(ioctx.get_pool_id())
@@ -117,24 +126,23 @@ class TrashPurgeScheduleHandler:
             pools[pool_id][namespace] = pool_name
 
     def rebuild_queue(self) -> None:
-        with self.lock:
-            now = datetime.now()
+        now = datetime.now()
 
-            # don't remove from queue "due" images
-            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+        # don't remove from queue "due" images
+        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
 
-            for schedule_time in list(self.queue):
-                if schedule_time > now_string:
-                    del self.queue[schedule_time]
+        for schedule_time in list(self.queue):
+            if schedule_time > now_string:
+                del self.queue[schedule_time]
 
-            if not self.schedules:
-                return
+        if not self.schedules:
+            return
 
-            for pool_id, namespaces in self.pools.items():
-                for namespace in namespaces:
-                    self.enqueue(now, pool_id, namespace)
+        for pool_id, namespaces in self.pools.items():
+            for namespace in namespaces:
+                self.enqueue(now, pool_id, namespace)
 
-            self.condition.notify()
+        self.condition.notify()
 
     def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None:
         now = datetime.now()
@@ -154,16 +162,19 @@ class TrashPurgeScheduleHandler:
         self.condition.notify()
 
     def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
-
         schedule = self.schedules.find(pool_id, namespace)
         if not schedule:
+            self.log.debug(
+                "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
+                    pool_id, namespace))
             return
 
         schedule_time = schedule.next_run(now)
         if schedule_time not in self.queue:
             self.queue[schedule_time] = []
-        self.log.debug("schedule {}/{} at {}".format(
-            pool_id, namespace, schedule_time))
+        self.log.debug(
+            "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
+                pool_id, namespace, schedule_time))
         ns_spec = (pool_id, namespace)
         if ns_spec not in self.queue[schedule_time]:
             self.queue[schedule_time].append((pool_id, namespace))
@@ -187,6 +198,10 @@ class TrashPurgeScheduleHandler:
         return namespace, 0.0
 
     def remove_from_queue(self, pool_id: str, namespace: str) -> None:
+        self.log.debug(
+            "TrashPurgeScheduleHandler: descheduling {}/{}".format(
+                pool_id, namespace))
+
         empty_slots = []
         for schedule_time, namespaces in self.queue.items():
             if (pool_id, namespace) in namespaces:
@@ -201,14 +216,13 @@ class TrashPurgeScheduleHandler:
                      interval: str,
                      start_time: Optional[str]) -> Tuple[int, str, str]:
         self.log.debug(
-            "add_schedule: level_spec={}, interval={}, start_time={}".format(
+            "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                 level_spec.name, interval, start_time))
 
+        # TODO: optimize to rebuild only affected part of the queue
         with self.lock:
             self.schedules.add(level_spec, interval, start_time)
-
-        # TODO: optimize to rebuild only affected part of the queue
-        self.rebuild_queue()
+            self.rebuild_queue()
         return 0, "", ""
 
     def remove_schedule(self,
@@ -216,18 +230,19 @@ class TrashPurgeScheduleHandler:
                         interval: Optional[str],
                         start_time: Optional[str]) -> Tuple[int, str, str]:
         self.log.debug(
-            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
+            "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                 level_spec.name, interval, start_time))
 
+        # TODO: optimize to rebuild only affected part of the queue
         with self.lock:
             self.schedules.remove(level_spec, interval, start_time)
-
-        # TODO: optimize to rebuild only affected part of the queue
-        self.rebuild_queue()
+            self.rebuild_queue()
         return 0, "", ""
 
     def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
-        self.log.debug("list: level_spec={}".format(level_spec.name))
+        self.log.debug(
+            "TrashPurgeScheduleHandler: list: level_spec={}".format(
+                level_spec.name))
 
         with self.lock:
             result = self.schedules.to_list(level_spec)
@@ -235,7 +250,9 @@ class TrashPurgeScheduleHandler:
         return 0, json.dumps(result, indent=4, sort_keys=True), ""
 
     def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
-        self.log.debug("status: level_spec={}".format(level_spec.name))
+        self.log.debug(
+            "TrashPurgeScheduleHandler: status: level_spec={}".format(
+                level_spec.name))
 
         scheduled = []
         with self.lock: