]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import json |
2 | import rados | |
3 | import rbd | |
9f95a23c TL |
4 | import traceback |
5 | ||
6 | from datetime import datetime | |
7 | from threading import Condition, Lock, Thread | |
20effc67 | 8 | from typing import Any, Dict, List, Optional, Tuple |
9f95a23c TL |
9 | |
10 | from .common import get_rbd_pools | |
1e59de90 | 11 | from .schedule import LevelSpec, Schedules |
9f95a23c TL |
12 | |
13 | ||
class TrashPurgeScheduleHandler:
    """Run scheduled RBD trash purges for pool namespaces on a worker thread.

    The handler keeps a queue mapping a schedule time string
    ("%Y-%m-%d %H:%M:%S") to the (pool_id, namespace) pairs due at that
    time.  The worker thread pops due entries, purges the trash of the
    corresponding namespace and re-enqueues the entry for its next run.
    """

    # Not referenced inside this class; presumably read by Schedules(self)
    # to locate the persisted schedules -- TODO confirm.
    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"
    # Minimum number of seconds between two pool/schedule refreshes.
    REFRESH_DELAY_SECONDS = 60.0

    # Class attributes: the lock and condition are shared by all instances.
    # The lock guards queue, pools and schedules; the condition is used to
    # wake the worker thread when the queue changes or on shutdown.
    lock = Lock()
    condition = Condition(lock)

    def __init__(self, module: Any) -> None:
        """Store the owning module and create (but do not start) the worker.

        :param module: owning module object; must expose ``log`` and is
            later used for ``rados`` and ``client_blocklisted``.
        """
        self.module = module
        self.log = module.log
        # Epoch start guarantees the first refresh_pools() call does a scan.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Build the initial queue and start the worker thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Ask the worker thread to stop and wait until it has exited."""
        self.log.info("TrashPurgeScheduleHandler: shutting down")
        # Set the flag and notify under the lock: the worker checks the flag
        # under the same lock before waiting, so the wakeup cannot be lost
        # and join() does not have to sit out the worker's wait timeout.
        with self.lock:
            self.stop_thread = True
            self.condition.notify_all()
        if self.thread.is_alive():
            self.log.debug("TrashPurgeScheduleHandler: joining thread")
            self.thread.join()
        self.log.info("TrashPurgeScheduleHandler: shut down")

    def run(self) -> None:
        """Worker loop: refresh pools, purge due namespaces, reschedule them.

        Exits when ``stop_thread`` is set.  A connection shutdown is reported
        through ``module.client_blocklisted``; any other exception is logged
        as fatal and ends the thread.
        """
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_pools()
                with self.lock:
                    # Re-check under the lock so a shutdown() issued while
                    # we were refreshing is seen before we go to sleep.
                    if self.stop_thread:
                        break
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        # Nothing due: sleep until the next slot or the next
                        # refresh, whichever comes first (or until notified).
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace = ns_spec
                # The purge itself runs without holding the lock.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id: str, namespace: str) -> None:
        """Purge the RBD trash of ``namespace`` in pool ``pool_id``.

        Connection shutdowns are re-raised for run() to handle; every other
        error is logged and swallowed so one failing namespace does not stop
        the worker.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            raise
        except Exception as e:
            self.log.error("exception when purging {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self) -> None:
        """Create empty queue/pools state, load schedules and scan pools."""
        # schedule time string => [(pool_id, namespace), ...]
        self.queue: Dict[str, List[Tuple[str, str]]] = {}
        # pool_id => {namespace => pool_name}
        self.pools: Dict[str, Dict[str, str]] = {}
        self.schedules = Schedules(self)
        self.refresh_pools()
        self.log.debug("TrashPurgeScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload the schedules via ``self.schedules.load()``."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")
        self.schedules.load()

    def refresh_pools(self) -> float:
        """Reload schedules and rescan pools if the refresh delay has elapsed.

        :return: number of seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                # Without schedules there is nothing to track or run.
                self.log.debug("TrashPurgeScheduleHandler: no schedules")
                self.pools = {}
                self.queue = {}
                self.last_refresh_pools = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        pools: Dict[str, Dict[str, str]] = {}

        # Pool scanning is done without holding the lock.
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None:
        """Record the namespaces of the pool behind ``ioctx`` into ``pools``.

        The default namespace ('') is always recorded.  If listing the
        namespaces fails for a reason other than a connection shutdown, the
        error is logged and only the namespaces gathered so far are kept.

        :param ioctx: open I/O context of the pool to scan.
        :param pools: pool_id => {namespace => pool_name}, updated in place.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            # Same blocklist signal as in run()/trash_purge(): never swallow.
            raise
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

    def rebuild_queue(self) -> None:
        """Drop all future queue slots and re-enqueue every known namespace.

        Slots that are already due are kept.  Called with the lock held
        (see add_schedule/remove_schedule).
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        # Iterate over a copy of the keys because entries are deleted.
        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None:
        """Sync the queue with a fresh pool scan.

        Namespaces present in ``self.pools`` but missing from
        ``current_pools`` are descheduled; namespaces new in
        ``current_pools`` are enqueued.  Called with the lock held.

        :param current_pools: pool_id => {namespace => pool_name}.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                        namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                        namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
        """Queue ``pool_id``/``namespace`` at its schedule's next run time.

        Does nothing when no schedule is found for the pair, and never adds
        the same pair twice to one time slot.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            self.log.debug(
                "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                    pool_id, namespace))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
                pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(ns_spec)

    def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
        """Pop the next due (pool_id, namespace) pair from the queue.

        :return: ``(None, 1000.0)`` if the queue is empty;
            ``(None, seconds_until_earliest_slot)`` if nothing is due yet;
            otherwise ``((pool_id, namespace), 0.0)``.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Keys share one zero-padded format, so string order is time order.
        schedule_time = sorted(self.queue)[0]

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str) -> None:
        """Remove ``pool_id``/``namespace`` from every slot of the queue."""
        self.log.debug(
            "TrashPurgeScheduleHandler: descheduling {}/{}".format(
                pool_id, namespace))

        # Collect emptied slots first; deleting while iterating the dict
        # is not allowed.
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if (pool_id, namespace) in namespaces:
                namespaces.remove((pool_id, namespace))
                if not namespaces:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule from ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules for ``level_spec`` as pretty-printed JSON.

        :return: ``(0, json_text, "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the queued purges matching ``level_spec`` as JSON.

        The output is ``{"scheduled": [...]}`` with one entry per queued
        (pool_id, namespace) pair, ordered by schedule time.

        :return: ``(0, json_text, "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time': schedule_time,
                        'pool_id': pool_id,
                        'pool_name': pool_name,
                        'namespace': namespace
                    })
        return 0, json.dumps({'scheduled': scheduled}, indent=4,
                             sort_keys=True), ""