]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import errno |
2 | import json | |
3 | import rados | |
4 | import rbd | |
5 | import re | |
6 | import traceback | |
7 | ||
8 | from datetime import datetime | |
9 | from threading import Condition, Lock, Thread | |
20effc67 | 10 | from typing import Any, Dict, List, Optional, Tuple |
9f95a23c TL |
11 | |
12 | from .common import get_rbd_pools | |
13 | from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules | |
14 | ||
15 | ||
16 | class TrashPurgeScheduleHandler: | |
17 | MODULE_OPTION_NAME = "trash_purge_schedule" | |
e306af50 | 18 | SCHEDULE_OID = "rbd_trash_purge_schedule" |
9f95a23c TL |
19 | |
20 | lock = Lock() | |
21 | condition = Condition(lock) | |
22 | thread = None | |
23 | ||
20effc67 | 24 | def __init__(self, module: Any) -> None: |
9f95a23c TL |
25 | self.module = module |
26 | self.log = module.log | |
27 | self.last_refresh_pools = datetime(1970, 1, 1) | |
28 | ||
29 | self.init_schedule_queue() | |
30 | ||
31 | self.thread = Thread(target=self.run) | |
32 | self.thread.start() | |
33 | ||
20effc67 | 34 | def run(self) -> None: |
9f95a23c TL |
35 | try: |
36 | self.log.info("TrashPurgeScheduleHandler: starting") | |
37 | while True: | |
38 | self.refresh_pools() | |
39 | with self.lock: | |
40 | (ns_spec, wait_time) = self.dequeue() | |
41 | if not ns_spec: | |
42 | self.condition.wait(min(wait_time, 60)) | |
43 | continue | |
44 | pool_id, namespace = ns_spec | |
45 | self.trash_purge(pool_id, namespace) | |
46 | with self.lock: | |
47 | self.enqueue(datetime.now(), pool_id, namespace) | |
48 | ||
49 | except Exception as ex: | |
50 | self.log.fatal("Fatal runtime error: {}\n{}".format( | |
51 | ex, traceback.format_exc())) | |
52 | ||
20effc67 | 53 | def trash_purge(self, pool_id: str, namespace: str) -> None: |
9f95a23c TL |
54 | try: |
55 | with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: | |
56 | ioctx.set_namespace(namespace) | |
57 | rbd.RBD().trash_purge(ioctx, datetime.now()) | |
58 | except Exception as e: | |
59 | self.log.error("exception when purgin {}/{}: {}".format( | |
60 | pool_id, namespace, e)) | |
61 | ||
20effc67 TL |
62 | def init_schedule_queue(self) -> None: |
63 | self.queue: Dict[str, List[Tuple[str, str]]] = {} | |
64 | # pool_id => {namespace => pool_name} | |
65 | self.pools: Dict[str, Dict[str, str]] = {} | |
9f95a23c TL |
66 | self.refresh_pools() |
67 | self.log.debug("scheduler queue is initialized") | |
68 | ||
20effc67 | 69 | def load_schedules(self) -> None: |
9f95a23c TL |
70 | self.log.info("TrashPurgeScheduleHandler: load_schedules") |
71 | ||
72 | schedules = Schedules(self) | |
73 | schedules.load() | |
74 | with self.lock: | |
75 | self.schedules = schedules | |
76 | ||
20effc67 | 77 | def refresh_pools(self) -> None: |
9f95a23c TL |
78 | if (datetime.now() - self.last_refresh_pools).seconds < 60: |
79 | return | |
80 | ||
81 | self.log.debug("TrashPurgeScheduleHandler: refresh_pools") | |
82 | ||
83 | self.load_schedules() | |
84 | ||
20effc67 | 85 | pools: Dict[str, Dict[str, str]] = {} |
9f95a23c TL |
86 | |
87 | for pool_id, pool_name in get_rbd_pools(self.module).items(): | |
88 | if not self.schedules.intersects( | |
89 | LevelSpec.from_pool_spec(pool_id, pool_name)): | |
90 | continue | |
91 | with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: | |
92 | self.load_pool(ioctx, pools) | |
93 | ||
94 | with self.lock: | |
95 | self.refresh_queue(pools) | |
96 | self.pools = pools | |
97 | ||
98 | self.last_refresh_pools = datetime.now() | |
99 | ||
20effc67 | 100 | def load_pool(self, ioctx: rados.Ioctx, pools: Dict[str, Dict[str, str]]) -> None: |
9f95a23c TL |
101 | pool_id = str(ioctx.get_pool_id()) |
102 | pool_name = ioctx.get_pool_name() | |
103 | pools[pool_id] = {} | |
104 | pool_namespaces = [''] | |
105 | ||
106 | self.log.debug("load_pool: {}".format(pool_name)) | |
107 | ||
108 | try: | |
109 | pool_namespaces += rbd.RBD().namespace_list(ioctx) | |
110 | except rbd.OperationNotSupported: | |
111 | self.log.debug("namespaces not supported") | |
112 | except Exception as e: | |
113 | self.log.error("exception when scanning pool {}: {}".format( | |
114 | pool_name, e)) | |
115 | ||
116 | for namespace in pool_namespaces: | |
117 | pools[pool_id][namespace] = pool_name | |
118 | ||
20effc67 | 119 | def rebuild_queue(self) -> None: |
9f95a23c TL |
120 | with self.lock: |
121 | now = datetime.now() | |
122 | ||
123 | # don't remove from queue "due" images | |
124 | now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00") | |
125 | ||
126 | for schedule_time in list(self.queue): | |
127 | if schedule_time > now_string: | |
128 | del self.queue[schedule_time] | |
129 | ||
130 | if not self.schedules: | |
131 | return | |
132 | ||
133 | for pool_id, namespaces in self.pools.items(): | |
134 | for namespace in namespaces: | |
135 | self.enqueue(now, pool_id, namespace) | |
136 | ||
137 | self.condition.notify() | |
138 | ||
20effc67 | 139 | def refresh_queue(self, current_pools: Dict[str, Dict[str, str]]) -> None: |
9f95a23c TL |
140 | now = datetime.now() |
141 | ||
142 | for pool_id, namespaces in self.pools.items(): | |
143 | for namespace in namespaces: | |
144 | if pool_id not in current_pools or \ | |
145 | namespace not in current_pools[pool_id]: | |
146 | self.remove_from_queue(pool_id, namespace) | |
147 | ||
148 | for pool_id, namespaces in current_pools.items(): | |
149 | for namespace in namespaces: | |
150 | if pool_id not in self.pools or \ | |
151 | namespace not in self.pools[pool_id]: | |
152 | self.enqueue(now, pool_id, namespace) | |
153 | ||
154 | self.condition.notify() | |
155 | ||
20effc67 | 156 | def enqueue(self, now: datetime, pool_id: str, namespace: str) -> None: |
9f95a23c TL |
157 | |
158 | schedule = self.schedules.find(pool_id, namespace) | |
159 | if not schedule: | |
160 | return | |
161 | ||
162 | schedule_time = schedule.next_run(now) | |
163 | if schedule_time not in self.queue: | |
164 | self.queue[schedule_time] = [] | |
165 | self.log.debug("schedule {}/{} at {}".format( | |
166 | pool_id, namespace, schedule_time)) | |
167 | ns_spec = (pool_id, namespace) | |
168 | if ns_spec not in self.queue[schedule_time]: | |
169 | self.queue[schedule_time].append((pool_id, namespace)) | |
170 | ||
20effc67 | 171 | def dequeue(self) -> Tuple[Optional[Tuple[str, str]], float]: |
9f95a23c | 172 | if not self.queue: |
20effc67 | 173 | return None, 1000.0 |
9f95a23c TL |
174 | |
175 | now = datetime.now() | |
176 | schedule_time = sorted(self.queue)[0] | |
177 | ||
178 | if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time: | |
179 | wait_time = (datetime.strptime(schedule_time, | |
180 | "%Y-%m-%d %H:%M:%S") - now) | |
181 | return None, wait_time.total_seconds() | |
182 | ||
183 | namespaces = self.queue[schedule_time] | |
184 | namespace = namespaces.pop(0) | |
185 | if not namespaces: | |
186 | del self.queue[schedule_time] | |
20effc67 | 187 | return namespace, 0.0 |
9f95a23c | 188 | |
20effc67 | 189 | def remove_from_queue(self, pool_id: str, namespace: str) -> None: |
9f95a23c TL |
190 | empty_slots = [] |
191 | for schedule_time, namespaces in self.queue.items(): | |
192 | if (pool_id, namespace) in namespaces: | |
193 | namespaces.remove((pool_id, namespace)) | |
194 | if not namespaces: | |
195 | empty_slots.append(schedule_time) | |
196 | for schedule_time in empty_slots: | |
197 | del self.queue[schedule_time] | |
198 | ||
20effc67 TL |
199 | def add_schedule(self, |
200 | level_spec: LevelSpec, | |
201 | interval: str, | |
202 | start_time: Optional[str]) -> Tuple[int, str, str]: | |
9f95a23c TL |
203 | self.log.debug( |
204 | "add_schedule: level_spec={}, interval={}, start_time={}".format( | |
205 | level_spec.name, interval, start_time)) | |
206 | ||
207 | with self.lock: | |
208 | self.schedules.add(level_spec, interval, start_time) | |
209 | ||
210 | # TODO: optimize to rebuild only affected part of the queue | |
211 | self.rebuild_queue() | |
212 | return 0, "", "" | |
213 | ||
20effc67 TL |
214 | def remove_schedule(self, |
215 | level_spec: LevelSpec, | |
216 | interval: Optional[str], | |
217 | start_time: Optional[str]) -> Tuple[int, str, str]: | |
9f95a23c TL |
218 | self.log.debug( |
219 | "remove_schedule: level_spec={}, interval={}, start_time={}".format( | |
220 | level_spec.name, interval, start_time)) | |
221 | ||
222 | with self.lock: | |
223 | self.schedules.remove(level_spec, interval, start_time) | |
224 | ||
225 | # TODO: optimize to rebuild only affected part of the queue | |
226 | self.rebuild_queue() | |
227 | return 0, "", "" | |
228 | ||
20effc67 | 229 | def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]: |
9f95a23c TL |
230 | self.log.debug("list: level_spec={}".format(level_spec.name)) |
231 | ||
232 | with self.lock: | |
233 | result = self.schedules.to_list(level_spec) | |
234 | ||
235 | return 0, json.dumps(result, indent=4, sort_keys=True), "" | |
236 | ||
20effc67 | 237 | def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]: |
9f95a23c TL |
238 | self.log.debug("status: level_spec={}".format(level_spec.name)) |
239 | ||
240 | scheduled = [] | |
241 | with self.lock: | |
242 | for schedule_time in sorted(self.queue): | |
243 | for pool_id, namespace in self.queue[schedule_time]: | |
244 | if not level_spec.matches(pool_id, namespace): | |
245 | continue | |
246 | pool_name = self.pools[pool_id][namespace] | |
247 | scheduled.append({ | |
248 | 'schedule_time' : schedule_time, | |
249 | 'pool_id' : pool_id, | |
250 | 'pool_name' : pool_name, | |
251 | 'namespace' : namespace | |
252 | }) | |
253 | return 0, json.dumps({'scheduled' : scheduled}, indent=4, | |
254 | sort_keys=True), "" |