class TrashPurgeScheduleHandler:
MODULE_OPTION_NAME = "trash_purge_schedule"
SCHEDULE_OID = "rbd_trash_purge_schedule"
+ REFRESH_DELAY_SECONDS = 60.0
lock = Lock()
condition = Condition(lock)
try:
self.log.info("TrashPurgeScheduleHandler: starting")
while True:
- self.refresh_pools()
+ refresh_delay = self.refresh_pools()
with self.lock:
(ns_spec, wait_time) = self.dequeue()
if not ns_spec:
- self.condition.wait(min(wait_time, 60))
+ self.condition.wait(min(wait_time, refresh_delay))
continue
pool_id, namespace = ns_spec
self.trash_purge(pool_id, namespace)
# pool_id => {namespace => pool_name}
self.pools: Dict[str, Dict[str, str]] = {}
self.refresh_pools()
- self.log.debug("scheduler queue is initialized")
+ self.log.debug("TrashPurgeScheduleHandler: queue is initialized")
def load_schedules(self) -> None:
self.log.info("TrashPurgeScheduleHandler: load_schedules")
schedules = Schedules(self)
schedules.load()
- with self.lock:
- self.schedules = schedules
+ self.schedules = schedules
- def refresh_pools(self) -> None:
- if (datetime.now() - self.last_refresh_pools).seconds < 60:
- return
+ def refresh_pools(self) -> float:
+ elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
+ if elapsed < self.REFRESH_DELAY_SECONDS:
+ return self.REFRESH_DELAY_SECONDS - elapsed
self.log.debug("TrashPurgeScheduleHandler: refresh_pools")
- self.load_schedules()
+ with self.lock:
+ self.load_schedules()
+ if not self.schedules:
+ self.log.debug("TrashPurgeScheduleHandler: no schedules")
+ self.pools = {}
+ self.queue = {}
+ self.last_refresh_pools = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
pools: Dict[str, Dict[str, str]] = {}
self.pools = pools
self.last_refresh_pools = datetime.now()
+ return self.REFRESH_DELAY_SECONDS
def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None:
pool_id = str(ioctx.get_pool_id())
pools[pool_id][namespace] = pool_name
def rebuild_queue(self) -> None:
- with self.lock:
- now = datetime.now()
+ now = datetime.now()
- # don't remove from queue "due" images
- now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
+ # don't remove from queue "due" images
+ now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")
- for schedule_time in list(self.queue):
- if schedule_time > now_string:
- del self.queue[schedule_time]
+ for schedule_time in list(self.queue):
+ if schedule_time > now_string:
+ del self.queue[schedule_time]
- if not self.schedules:
- return
+ if not self.schedules:
+ return
- for pool_id, namespaces in self.pools.items():
- for namespace in namespaces:
- self.enqueue(now, pool_id, namespace)
+ for pool_id, namespaces in self.pools.items():
+ for namespace in namespaces:
+ self.enqueue(now, pool_id, namespace)
- self.condition.notify()
+ self.condition.notify()
def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None:
now = datetime.now()
self.condition.notify()
def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
-
schedule = self.schedules.find(pool_id, namespace)
if not schedule:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
+ pool_id, namespace))
return
schedule_time = schedule.next_run(now)
if schedule_time not in self.queue:
self.queue[schedule_time] = []
- self.log.debug("schedule {}/{} at {}".format(
- pool_id, namespace, schedule_time))
+ self.log.debug(
+ "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
+ pool_id, namespace, schedule_time))
ns_spec = (pool_id, namespace)
if ns_spec not in self.queue[schedule_time]:
self.queue[schedule_time].append((pool_id, namespace))
return namespace, 0.0
def remove_from_queue(self, pool_id: str, namespace: str) -> None:
+ self.log.debug(
+ "TrashPurgeScheduleHandler: descheduling {}/{}".format(
+ pool_id, namespace))
+
empty_slots = []
for schedule_time, namespaces in self.queue.items():
if (pool_id, namespace) in namespaces:
interval: str,
start_time: Optional[str]) -> Tuple[int, str, str]:
self.log.debug(
- "add_schedule: level_spec={}, interval={}, start_time={}".format(
+ "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
level_spec.name, interval, start_time))
+ # TODO: optimize to rebuild only affected part of the queue
with self.lock:
self.schedules.add(level_spec, interval, start_time)
-
- # TODO: optimize to rebuild only affected part of the queue
- self.rebuild_queue()
+ self.rebuild_queue()
return 0, "", ""
def remove_schedule(self,
interval: Optional[str],
start_time: Optional[str]) -> Tuple[int, str, str]:
self.log.debug(
- "remove_schedule: level_spec={}, interval={}, start_time={}".format(
+ "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
level_spec.name, interval, start_time))
+ # TODO: optimize to rebuild only affected part of the queue
with self.lock:
self.schedules.remove(level_spec, interval, start_time)
-
- # TODO: optimize to rebuild only affected part of the queue
- self.rebuild_queue()
+ self.rebuild_queue()
return 0, "", ""
def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
- self.log.debug("list: level_spec={}".format(level_spec.name))
+ self.log.debug(
+ "TrashPurgeScheduleHandler: list: level_spec={}".format(
+ level_spec.name))
with self.lock:
result = self.schedules.to_list(level_spec)
return 0, json.dumps(result, indent=4, sort_keys=True), ""
def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
- self.log.debug("status: level_spec={}".format(level_spec.name))
+ self.log.debug(
+ "TrashPurgeScheduleHandler: status: level_spec={}".format(
+ level_spec.name))
scheduled = []
with self.lock: