]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import json |
2 | import rados | |
3 | import rbd | |
9f95a23c TL |
4 | import traceback |
5 | ||
6 | from datetime import datetime | |
7 | from threading import Condition, Lock, Thread | |
20effc67 | 8 | from typing import Any, Dict, List, Optional, Tuple |
9f95a23c TL |
9 | |
10 | from .common import get_rbd_pools | |
1e59de90 | 11 | from .schedule import LevelSpec, Schedules |
9f95a23c TL |
12 | |
13 | ||
class TrashPurgeScheduleHandler:
    """Run RBD trash purges for pools/namespaces according to schedules.

    A single worker thread (``run``) repeatedly refreshes the set of known
    pools and namespaces, pops the next due ``(pool_id, namespace)`` entry
    from ``self.queue`` and purges its trash, then re-enqueues the entry for
    its next run.

    ``self.queue`` maps a schedule time string (``"%Y-%m-%d %H:%M:%S"``) to
    the list of ``(pool_id, namespace)`` tuples due at that time.
    ``self.pools`` maps ``pool_id`` to ``{namespace: pool_name}``.
    Methods that touch the queue without taking ``self.lock`` themselves
    (``enqueue``, ``dequeue``, ``remove_from_queue``, ``refresh_queue``,
    ``rebuild_queue``) expect the caller to hold it.
    """

    MODULE_OPTION_NAME = "trash_purge_schedule"
    SCHEDULE_OID = "rbd_trash_purge_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    def __init__(self, module: Any) -> None:
        """Create the handler; the worker thread is not started until setup().

        :param module: owning module; must expose ``log`` (and, at runtime,
                       ``rados`` and ``client_blocklisted``).
        """
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.module = module
        self.log = module.log
        # Epoch start forces the very first refresh_pools() call to refresh.
        self.last_refresh_pools = datetime(1970, 1, 1)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Initialize the schedule queue and start the worker thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Ask the worker thread to stop and wait for it to exit."""
        self.log.info("TrashPurgeScheduleHandler: shutting down")
        # Set the flag and notify under the lock so that a worker parked in
        # condition.wait() wakes up immediately instead of sleeping out its
        # timeout (up to REFRESH_DELAY_SECONDS) before noticing the flag.
        with self.lock:
            self.stop_thread = True
            self.condition.notify_all()
        if self.thread.is_alive():
            self.log.debug("TrashPurgeScheduleHandler: joining thread")
            self.thread.join()
        self.log.info("TrashPurgeScheduleHandler: shut down")

    def run(self) -> None:
        """Worker loop: refresh pools, purge whatever is due, reschedule."""
        try:
            self.log.info("TrashPurgeScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_pools()
                with self.lock:
                    # Re-check under the lock: shutdown() sets the flag and
                    # notifies while holding it, so checking here guarantees
                    # the wakeup cannot be lost between the loop test above
                    # and the wait below.
                    if self.stop_thread:
                        break
                    (ns_spec, wait_time) = self.dequeue()
                    if not ns_spec:
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace = ns_spec
                # The purge runs without the lock held.
                self.trash_purge(pool_id, namespace)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("TrashPurgeScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def trash_purge(self, pool_id: str, namespace: str) -> None:
        """Purge the trash of one pool namespace.

        Connection shutdown errors propagate to the caller; any other
        failure is logged and swallowed so the worker keeps running.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                rbd.RBD().trash_purge(ioctx, datetime.now())
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            raise
        except Exception as e:
            self.log.error("exception when purging {}/{}: {}".format(
                pool_id, namespace, e))

    def init_schedule_queue(self) -> None:
        """Create empty queue/pool state and perform the first refresh."""
        # schedule_time => [(pool_id, namespace), ...]
        self.queue: Dict[str, List[Tuple[str, str]]] = {}
        # pool_id => {namespace => pool_name}
        self.pools: Dict[str, Dict[str, str]] = {}
        self.schedules = Schedules(self)
        self.refresh_pools()
        self.log.debug("TrashPurgeScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload the schedules via ``self.schedules.load()``."""
        self.log.info("TrashPurgeScheduleHandler: load_schedules")
        self.schedules.load()

    def refresh_pools(self) -> float:
        """Refresh schedules and pools if the refresh delay has elapsed.

        :return: seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_pools).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("TrashPurgeScheduleHandler: refresh_pools")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("TrashPurgeScheduleHandler: no schedules")
                self.pools = {}
                self.queue = {}
                self.last_refresh_pools = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        pools: Dict[str, Dict[str, str]] = {}

        # Pool scanning happens without the lock held.
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool(ioctx, pools)

        with self.lock:
            # refresh_queue() diffs against the old self.pools, so it must
            # run before self.pools is replaced.
            self.refresh_queue(pools)
            self.pools = pools

        self.last_refresh_pools = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool(self, ioctx: "rados.Ioctx",
                  pools: Dict[str, Dict[str, str]]) -> None:
        """Record the pool behind ``ioctx`` and its namespaces in ``pools``.

        The default namespace (``''``) is always recorded. If listing the
        namespaces fails for a reason other than a connection shutdown, the
        error is logged and only the namespaces gathered so far are kept.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        pools[pool_id] = {}
        pool_namespaces = ['']

        self.log.debug("load_pool: {}".format(pool_name))

        try:
            pool_namespaces += rbd.RBD().namespace_list(ioctx)
        except rbd.OperationNotSupported:
            self.log.debug("namespaces not supported")
        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            # Same shutdown handling as trash_purge(): let run() see it
            # rather than logging it as an ordinary scan error.
            raise
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

        for namespace in pool_namespaces:
            pools[pool_id][namespace] = pool_name

    def rebuild_queue(self) -> None:
        """Drop future queue slots and re-enqueue every known namespace.

        The caller must hold ``self.lock``.
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        # Iterate over a snapshot of the keys because slots are deleted.
        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def refresh_queue(self,
                      current_pools: Dict[str, Dict[str, str]]) -> None:
        """Bring the queue in line with a freshly scanned pool map.

        Namespaces in ``self.pools`` but not in ``current_pools`` are
        descheduled; namespaces only in ``current_pools`` are enqueued.
        The caller must hold ``self.lock``.
        """
        now = datetime.now()

        for pool_id, namespaces in self.pools.items():
            for namespace in namespaces:
                if pool_id not in current_pools or \
                        namespace not in current_pools[pool_id]:
                    self.remove_from_queue(pool_id, namespace)

        for pool_id, namespaces in current_pools.items():
            for namespace in namespaces:
                if pool_id not in self.pools or \
                        namespace not in self.pools[pool_id]:
                    self.enqueue(now, pool_id, namespace)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None:
        """Queue ``(pool_id, namespace)`` at its schedule's next run time.

        Does nothing (beyond a debug log) when no schedule is found, and
        never adds the same entry twice to one time slot.
        """
        schedule = self.schedules.find(pool_id, namespace)
        if not schedule:
            self.log.debug(
                "TrashPurgeScheduleHandler: no schedule for {}/{}".format(
                    pool_id, namespace))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "TrashPurgeScheduleHandler: scheduling {}/{} at {}".format(
                pool_id, namespace, schedule_time))
        ns_spec = (pool_id, namespace)
        if ns_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(ns_spec)

    def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]:
        """Pop the next due entry, if any.

        :return: ``((pool_id, namespace), 0.0)`` when an entry is due;
                 otherwise ``(None, seconds_to_wait)``, where the wait is
                 ``1000.0`` for an empty queue or the time remaining until
                 the earliest slot.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Time strings in this format sort chronologically, so the smallest
        # key is the earliest slot.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        namespaces = self.queue[schedule_time]
        namespace = namespaces.pop(0)
        if not namespaces:
            del self.queue[schedule_time]
        return namespace, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str) -> None:
        """Remove ``(pool_id, namespace)`` from every slot in the queue."""
        self.log.debug(
            "TrashPurgeScheduleHandler: descheduling {}/{}".format(
                pool_id, namespace))

        # Collect emptied slots first; deleting keys while iterating the
        # dict is not allowed.
        empty_slots = []
        for schedule_time, namespaces in self.queue.items():
            if (pool_id, namespace) in namespaces:
                namespaces.remove((pool_id, namespace))
                if not namespaces:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: "LevelSpec",
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: "LevelSpec",
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule for ``level_spec`` and rebuild the queue.

        :return: ``(0, "", "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: "LevelSpec") -> Tuple[int, str, str]:
        """Return the schedules for ``level_spec`` as pretty-printed JSON.

        :return: ``(0, json_text, "")``.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: "LevelSpec") -> Tuple[int, str, str]:
        """Return queued purges matching ``level_spec`` as JSON.

        :return: ``(0, json_text, "")`` where the JSON object has a single
                 ``scheduled`` list ordered by schedule time.
        """
        self.log.debug(
            "TrashPurgeScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace):
                        continue
                    pool_name = self.pools[pool_id][namespace]
                    scheduled.append({
                        'schedule_time': schedule_time,
                        'pool_id': pool_id,
                        'pool_name': pool_name,
                        'namespace': namespace
                    })
        return 0, json.dumps({'scheduled': scheduled}, indent=4,
                             sort_keys=True), ""