6 from datetime
import datetime
7 from threading
import Condition
, Lock
, Thread
8 from typing
import Any
, Dict
, List
, Optional
, Tuple
10 from .common
import get_rbd_pools
11 from .schedule
import LevelSpec
, Schedules
14 class TrashPurgeScheduleHandler
:
15 MODULE_OPTION_NAME
= "trash_purge_schedule"
16 SCHEDULE_OID
= "rbd_trash_purge_schedule"
17 REFRESH_DELAY_SECONDS
= 60.0
19 def __init__(self
, module
: Any
) -> None:
21 self
.condition
= Condition(self
.lock
)
24 self
.last_refresh_pools
= datetime(1970, 1, 1)
26 self
.stop_thread
= False
27 self
.thread
= Thread(target
=self
.run
)
29 def setup(self
) -> None:
30 self
.init_schedule_queue()
def shutdown(self) -> None:
    """Ask the worker thread to stop and wait until it has exited."""
    self.log.info("TrashPurgeScheduleHandler: shutting down")
    # run() checks this flag at the top of every loop iteration.
    self.stop_thread = True
    if self.thread.is_alive():
        self.log.debug("TrashPurgeScheduleHandler: joining thread")
        # May block until the worker's current condition wait times out.
        self.thread.join()
    self.log.info("TrashPurgeScheduleHandler: shut down")
def run(self) -> None:
    """Worker loop: purge whatever is due until ``stop_thread`` is set.

    Each iteration refreshes the pool list, takes the next due
    (pool_id, namespace) pair off the queue, purges it and schedules its
    next run. When nothing is due the thread sleeps on the condition.

    A rados/rbd ``ConnectionShutdown`` ends the loop and sets
    ``module.client_blocklisted``; any other exception is logged and also
    ends the loop.
    """
    try:
        self.log.info("TrashPurgeScheduleHandler: starting")
        while not self.stop_thread:
            refresh_delay = self.refresh_pools()
            # Condition.wait() requires the underlying lock to be held.
            with self.lock:
                (ns_spec, wait_time) = self.dequeue()
                if not ns_spec:
                    # Nothing due: sleep until the next scheduled entry or
                    # the next pool refresh, whichever comes first.
                    self.condition.wait(min(wait_time, refresh_delay))
                    continue
            pool_id, namespace = ns_spec
            # The purge itself runs without the lock held.
            self.trash_purge(pool_id, namespace)
            with self.lock:
                self.enqueue(datetime.now(), pool_id, namespace)

    except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
        self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
        self.module.client_blocklisted.set()
    except Exception as ex:
        self.log.fatal("Fatal runtime error: {}\n{}".format(
            ex, traceback.format_exc()))
def trash_purge(self, pool_id: str, namespace: str) -> None:
    """Run an RBD trash purge for one namespace of one pool.

    :param pool_id: pool id as a string; converted to ``int`` to open the
        I/O context.
    :param namespace: namespace to select on the I/O context.
    :raises rados.ConnectionShutdown, rbd.ConnectionShutdown: re-raised so
        that run() can mark the client as blocklisted.

    Any other exception is logged and swallowed, so one failing namespace
    does not stop the worker.
    """
    try:
        with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            ioctx.set_namespace(namespace)
            # NOTE(review): the timestamp is presumably the expiry
            # threshold for trash entries -- confirm against the rbd
            # binding.
            rbd.RBD().trash_purge(ioctx, datetime.now())
    except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
        # run() has a dedicated handler for these; do not swallow them.
        raise
    except Exception as e:
        self.log.error("exception when purging {}/{}: {}".format(
            pool_id, namespace, e))
def init_schedule_queue(self) -> None:
    """Create empty queue and pool maps and a fresh ``Schedules`` object."""
    # pool_id => {namespace => pool_name}
    self.pools: Dict[str, Dict[str, str]] = {}
    # schedule time string => (pool_id, namespace) pairs due at that time
    self.queue: Dict[str, List[Tuple[str, str]]] = {}
    self.schedules = Schedules(self)

    self.log.debug("TrashPurgeScheduleHandler: queue is initialized")
def load_schedules(self) -> None:
    """Log that schedules are being loaded.

    NOTE(review): only the log call is visible in this block; no statement
    that actually (re)loads ``self.schedules`` appears here -- confirm
    whether one is missing.
    """
    self.log.info("TrashPurgeScheduleHandler: load_schedules")
def refresh_pools(self) -> float:
    """Rescan the RBD pools and sync the queue, rate-limited.

    Does nothing if the previous refresh was less than
    ``REFRESH_DELAY_SECONDS`` ago. Otherwise it collects the namespaces of
    every pool that some schedule applies to, updates the queue to match
    and records the new pool map.

    Must be called without ``self.lock`` held; the lock is taken here.

    :return: seconds until the next refresh is due.
    """
    elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
    if elapsed < self.REFRESH_DELAY_SECONDS:
        return self.REFRESH_DELAY_SECONDS - elapsed

    self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

    with self.lock:
        self.load_schedules()
        if not self.schedules:
            self.log.debug("TrashPurgeScheduleHandler: no schedules")
            # Without schedules nothing can be due; drop stale entries so
            # they are not dequeued and purged later.
            self.pools = {}
            self.queue = {}
            self.last_refresh_pools = datetime.now()
            return self.REFRESH_DELAY_SECONDS

    pools: Dict[str, Dict[str, str]] = {}

    for pool_id, pool_name in get_rbd_pools(self.module).items():
        # Skip pools that no schedule applies to.
        if not self.schedules.intersects(
                LevelSpec.from_pool_spec(pool_id, pool_name)):
            continue
        with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            self.load_pool(ioctx, pools)

    # refresh_queue() notifies the condition, which requires the lock.
    with self.lock:
        self.refresh_queue(pools)
        # Remember the new map so that the next refresh diffs against it.
        self.pools = pools

    self.last_refresh_pools = datetime.now()
    return self.REFRESH_DELAY_SECONDS
def load_pool(self, ioctx: rados.Ioctx,
              pools: Dict[str, Dict[str, str]]) -> None:
    """Record the namespaces of the pool behind ``ioctx`` into ``pools``.

    :param ioctx: open I/O context of the pool to scan.
    :param pools: output map, pool_id => {namespace => pool_name}; updated
        in place.
    :raises rbd.ConnectionShutdown: re-raised to the caller.

    The default namespace ``''`` is always recorded. Additional namespaces
    are added when listing them succeeds; other listing errors are logged
    and the pool is recorded with whatever was collected.
    """
    pool_id = str(ioctx.get_pool_id())
    pool_name = ioctx.get_pool_name()
    pool_namespaces = ['']

    self.log.debug("load_pool: {}".format(pool_name))

    try:
        pool_namespaces += rbd.RBD().namespace_list(ioctx)
    except rbd.OperationNotSupported:
        self.log.debug("namespaces not supported")
    except rbd.ConnectionShutdown:
        # Not a per-pool problem; let the caller deal with it.
        raise
    except Exception as e:
        self.log.error("exception when scanning pool {}: {}".format(
            pool_name, e))

    # Create the per-pool map on first use so the assignments below cannot
    # fail with KeyError.
    namespaces = pools.setdefault(pool_id, {})
    for namespace in pool_namespaces:
        namespaces[namespace] = pool_name
def rebuild_queue(self) -> None:
    """Drop all future queue entries and re-enqueue every known namespace.

    Entries whose time is not after the start of the current minute are
    kept, so work that is already due is not lost. If there are no
    schedules the method stops after the cleanup.

    The caller must hold ``self.lock``: the condition is notified at the
    end.
    """
    now = datetime.now()

    # don't remove from queue entries that are already "due"
    now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

    # Iterate over a snapshot of the keys: the dict is modified in the loop.
    for schedule_time in list(self.queue):
        if schedule_time > now_string:
            del self.queue[schedule_time]

    if not self.schedules:
        return

    for pool_id, namespaces in self.pools.items():
        for namespace in namespaces:
            self.enqueue(now, pool_id, namespace)

    # Wake the worker so it recomputes how long to wait.
    self.condition.notify()
def refresh_queue(self,
                  current_pools: Dict[str, Dict[str, str]]) -> None:
    """Bring the queue in line with a freshly scanned pool map.

    Namespaces present in ``self.pools`` but not in ``current_pools`` are
    descheduled; namespaces that are new in ``current_pools`` are enqueued.
    ``self.pools`` itself is not modified here.

    The caller must hold ``self.lock``: the condition is notified at the
    end.

    :param current_pools: pool_id => {namespace => pool_name} from the
        latest scan.
    """
    now = datetime.now()

    # Namespaces that disappeared since the previous scan.
    for pool_id, namespaces in self.pools.items():
        for namespace in namespaces:
            if pool_id not in current_pools or \
               namespace not in current_pools[pool_id]:
                self.remove_from_queue(pool_id, namespace)

    # Namespaces that appeared since the previous scan.
    for pool_id, namespaces in current_pools.items():
        for namespace in namespaces:
            if pool_id not in self.pools or \
               namespace not in self.pools[pool_id]:
                self.enqueue(now, pool_id, namespace)

    # Wake the worker so it recomputes how long to wait.
    self.condition.notify()
def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
    """Queue the next purge of ``pool_id``/``namespace``.

    Looks up the schedule for the namespace and files the pair under the
    schedule's next run time after ``now``. Does nothing (apart from
    logging) when no schedule applies. A pair is never added twice to the
    same time slot.

    :param now: reference time passed to the schedule's ``next_run``.
    :param pool_id: pool id as a string.
    :param namespace: namespace name; ``''`` is the default namespace.
    """
    schedule = self.schedules.find(pool_id, namespace)
    if not schedule:
        self.log.debug(
            "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                pool_id, namespace))
        return

    schedule_time = schedule.next_run(now)
    slot = self.queue.setdefault(schedule_time, [])
    self.log.debug(
        "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
            pool_id, namespace, schedule_time))
    ns_spec = (pool_id, namespace)
    if ns_spec not in slot:
        slot.append(ns_spec)
def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
    """Take the next due (pool_id, namespace) pair off the queue.

    :return: ``(ns_spec, 0.0)`` when an entry is due, otherwise
        ``(None, seconds_to_wait)``.
    """
    if not self.queue:
        # Nothing scheduled at all. The value is only an upper bound:
        # run() waits for min(wait_time, refresh_delay).
        return None, 1000.0

    now = datetime.now()
    # Keys are "%Y-%m-%d %H:%M:%S" strings, so the lexicographic minimum
    # is also the earliest time.
    schedule_time = min(self.queue)

    if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
        wait_time = (datetime.strptime(schedule_time,
                                       "%Y-%m-%d %H:%M:%S") - now)
        return None, wait_time.total_seconds()

    namespaces = self.queue[schedule_time]
    namespace = namespaces.pop(0)
    # Only drop the slot once it is empty; other pairs due at the same
    # time must stay queued.
    if not namespaces:
        del self.queue[schedule_time]
    return namespace, 0.0
def remove_from_queue(self, pool_id: str, namespace: str) -> None:
    """Remove ``pool_id``/``namespace`` from every time slot of the queue.

    Time slots left without any entry are deleted from the queue.
    """
    self.log.debug(
        "TrashPurgeScheduleHandler: descheduling {}/{}".format(
            pool_id, namespace))

    ns_spec = (pool_id, namespace)
    # Collect emptied slots first: the dict cannot be resized while it is
    # being iterated.
    empty_slots = []
    for schedule_time, namespaces in self.queue.items():
        if ns_spec in namespaces:
            namespaces.remove(ns_spec)
            if not namespaces:
                empty_slots.append(schedule_time)
    for schedule_time in empty_slots:
        del self.queue[schedule_time]
def add_schedule(self,
                 level_spec: LevelSpec,
                 interval: str,
                 start_time: Optional[str]) -> Tuple[int, str, str]:
    """Add a schedule at ``level_spec`` and rebuild the queue.

    :param level_spec: level the schedule applies to.
    :param interval: schedule interval, passed through to
        ``Schedules.add``.
    :param start_time: optional start time, passed through to
        ``Schedules.add``.
    :return: ``(0, "", "")``, the same (code, output, error) shape that
        ``list`` returns.
    """
    self.log.debug(
        "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
            level_spec.name, interval, start_time))

    # TODO: optimize to rebuild only affected part of the queue
    # rebuild_queue() notifies the condition, so the lock must be held.
    with self.lock:
        self.schedules.add(level_spec, interval, start_time)
        self.rebuild_queue()

    return 0, "", ""
def remove_schedule(self,
                    level_spec: LevelSpec,
                    interval: Optional[str],
                    start_time: Optional[str]) -> Tuple[int, str, str]:
    """Remove a schedule at ``level_spec`` and rebuild the queue.

    :param level_spec: level the schedule applies to.
    :param interval: optional interval, passed through to
        ``Schedules.remove``.
    :param start_time: optional start time, passed through to
        ``Schedules.remove``.
    :return: ``(0, "", "")``, the same (code, output, error) shape that
        ``list`` returns.
    """
    self.log.debug(
        "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
            level_spec.name, interval, start_time))

    # TODO: optimize to rebuild only affected part of the queue
    # rebuild_queue() notifies the condition, so the lock must be held.
    with self.lock:
        self.schedules.remove(level_spec, interval, start_time)
        self.rebuild_queue()

    return 0, "", ""
def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
    """Return the schedules matching ``level_spec`` as a JSON document.

    :param level_spec: level to list schedules for.
    :return: ``(0, json_text, "")`` where ``json_text`` is the result of
        ``Schedules.to_list`` dumped with 4-space indent and sorted keys.
    """
    self.log.debug(
        "TrashPurgeScheduleHandler: list: level_spec={}".format(
            level_spec.name))

    # Schedules are modified under this lock elsewhere in the class; read
    # them under the same lock.
    with self.lock:
        result = self.schedules.to_list(level_spec)

    return 0, json.dumps(result, indent=4, sort_keys=True), ""
263 def status(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
265 "TrashPurgeScheduleHandler: status: level_spec={}".format(
270 for schedule_time
in sorted(self
.queue
):
271 for pool_id
, namespace
in self
.queue
[schedule_time
]:
272 if not level_spec
.matches(pool_id
, namespace
):
274 pool_name
= self
.pools
[pool_id
][namespace
]
276 'schedule_time': schedule_time
,
278 'pool_name': pool_name
,
279 'namespace': namespace
281 return 0, json
.dumps({'scheduled': scheduled
}, indent
=4,