]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import errno |
2 | import json | |
3 | import rados | |
4 | import rbd | |
5 | import re | |
6 | import traceback | |
7 | ||
8 | from datetime import datetime | |
9 | from threading import Condition, Lock, Thread | |
10 | ||
11 | from .common import get_rbd_pools | |
12 | from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules | |
13 | ||
14 | ||
15 | class TrashPurgeScheduleHandler: | |
16 | MODULE_OPTION_NAME = "trash_purge_schedule" | |
e306af50 | 17 | SCHEDULE_OID = "rbd_trash_purge_schedule" |
9f95a23c TL |
18 | |
19 | lock = Lock() | |
20 | condition = Condition(lock) | |
21 | thread = None | |
22 | ||
23 | def __init__(self, module): | |
24 | self.module = module | |
25 | self.log = module.log | |
26 | self.last_refresh_pools = datetime(1970, 1, 1) | |
27 | ||
28 | self.init_schedule_queue() | |
29 | ||
30 | self.thread = Thread(target=self.run) | |
31 | self.thread.start() | |
32 | ||
33 | def run(self): | |
34 | try: | |
35 | self.log.info("TrashPurgeScheduleHandler: starting") | |
36 | while True: | |
37 | self.refresh_pools() | |
38 | with self.lock: | |
39 | (ns_spec, wait_time) = self.dequeue() | |
40 | if not ns_spec: | |
41 | self.condition.wait(min(wait_time, 60)) | |
42 | continue | |
43 | pool_id, namespace = ns_spec | |
44 | self.trash_purge(pool_id, namespace) | |
45 | with self.lock: | |
46 | self.enqueue(datetime.now(), pool_id, namespace) | |
47 | ||
48 | except Exception as ex: | |
49 | self.log.fatal("Fatal runtime error: {}\n{}".format( | |
50 | ex, traceback.format_exc())) | |
51 | ||
52 | def trash_purge(self, pool_id, namespace): | |
53 | try: | |
54 | with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: | |
55 | ioctx.set_namespace(namespace) | |
56 | rbd.RBD().trash_purge(ioctx, datetime.now()) | |
57 | except Exception as e: | |
58 | self.log.error("exception when purgin {}/{}: {}".format( | |
59 | pool_id, namespace, e)) | |
60 | ||
61 | ||
62 | def init_schedule_queue(self): | |
63 | self.queue = {} | |
64 | self.pools = {} | |
65 | self.refresh_pools() | |
66 | self.log.debug("scheduler queue is initialized") | |
67 | ||
68 | def load_schedules(self): | |
69 | self.log.info("TrashPurgeScheduleHandler: load_schedules") | |
70 | ||
71 | schedules = Schedules(self) | |
72 | schedules.load() | |
73 | with self.lock: | |
74 | self.schedules = schedules | |
75 | ||
76 | def refresh_pools(self): | |
77 | if (datetime.now() - self.last_refresh_pools).seconds < 60: | |
78 | return | |
79 | ||
80 | self.log.debug("TrashPurgeScheduleHandler: refresh_pools") | |
81 | ||
82 | self.load_schedules() | |
83 | ||
84 | pools = {} | |
85 | ||
86 | for pool_id, pool_name in get_rbd_pools(self.module).items(): | |
87 | if not self.schedules.intersects( | |
88 | LevelSpec.from_pool_spec(pool_id, pool_name)): | |
89 | continue | |
90 | with self.module.rados.open_ioctx2(int(pool_id)) as ioctx: | |
91 | self.load_pool(ioctx, pools) | |
92 | ||
93 | with self.lock: | |
94 | self.refresh_queue(pools) | |
95 | self.pools = pools | |
96 | ||
97 | self.last_refresh_pools = datetime.now() | |
98 | ||
99 | def load_pool(self, ioctx, pools): | |
100 | pool_id = str(ioctx.get_pool_id()) | |
101 | pool_name = ioctx.get_pool_name() | |
102 | pools[pool_id] = {} | |
103 | pool_namespaces = [''] | |
104 | ||
105 | self.log.debug("load_pool: {}".format(pool_name)) | |
106 | ||
107 | try: | |
108 | pool_namespaces += rbd.RBD().namespace_list(ioctx) | |
109 | except rbd.OperationNotSupported: | |
110 | self.log.debug("namespaces not supported") | |
111 | except Exception as e: | |
112 | self.log.error("exception when scanning pool {}: {}".format( | |
113 | pool_name, e)) | |
114 | ||
115 | for namespace in pool_namespaces: | |
116 | pools[pool_id][namespace] = pool_name | |
117 | ||
118 | def rebuild_queue(self): | |
119 | with self.lock: | |
120 | now = datetime.now() | |
121 | ||
122 | # don't remove from queue "due" images | |
123 | now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00") | |
124 | ||
125 | for schedule_time in list(self.queue): | |
126 | if schedule_time > now_string: | |
127 | del self.queue[schedule_time] | |
128 | ||
129 | if not self.schedules: | |
130 | return | |
131 | ||
132 | for pool_id, namespaces in self.pools.items(): | |
133 | for namespace in namespaces: | |
134 | self.enqueue(now, pool_id, namespace) | |
135 | ||
136 | self.condition.notify() | |
137 | ||
138 | def refresh_queue(self, current_pools): | |
139 | now = datetime.now() | |
140 | ||
141 | for pool_id, namespaces in self.pools.items(): | |
142 | for namespace in namespaces: | |
143 | if pool_id not in current_pools or \ | |
144 | namespace not in current_pools[pool_id]: | |
145 | self.remove_from_queue(pool_id, namespace) | |
146 | ||
147 | for pool_id, namespaces in current_pools.items(): | |
148 | for namespace in namespaces: | |
149 | if pool_id not in self.pools or \ | |
150 | namespace not in self.pools[pool_id]: | |
151 | self.enqueue(now, pool_id, namespace) | |
152 | ||
153 | self.condition.notify() | |
154 | ||
155 | def enqueue(self, now, pool_id, namespace): | |
156 | ||
157 | schedule = self.schedules.find(pool_id, namespace) | |
158 | if not schedule: | |
159 | return | |
160 | ||
161 | schedule_time = schedule.next_run(now) | |
162 | if schedule_time not in self.queue: | |
163 | self.queue[schedule_time] = [] | |
164 | self.log.debug("schedule {}/{} at {}".format( | |
165 | pool_id, namespace, schedule_time)) | |
166 | ns_spec = (pool_id, namespace) | |
167 | if ns_spec not in self.queue[schedule_time]: | |
168 | self.queue[schedule_time].append((pool_id, namespace)) | |
169 | ||
170 | def dequeue(self): | |
171 | if not self.queue: | |
172 | return None, 1000 | |
173 | ||
174 | now = datetime.now() | |
175 | schedule_time = sorted(self.queue)[0] | |
176 | ||
177 | if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time: | |
178 | wait_time = (datetime.strptime(schedule_time, | |
179 | "%Y-%m-%d %H:%M:%S") - now) | |
180 | return None, wait_time.total_seconds() | |
181 | ||
182 | namespaces = self.queue[schedule_time] | |
183 | namespace = namespaces.pop(0) | |
184 | if not namespaces: | |
185 | del self.queue[schedule_time] | |
186 | return namespace, 0 | |
187 | ||
188 | def remove_from_queue(self, pool_id, namespace): | |
189 | empty_slots = [] | |
190 | for schedule_time, namespaces in self.queue.items(): | |
191 | if (pool_id, namespace) in namespaces: | |
192 | namespaces.remove((pool_id, namespace)) | |
193 | if not namespaces: | |
194 | empty_slots.append(schedule_time) | |
195 | for schedule_time in empty_slots: | |
196 | del self.queue[schedule_time] | |
197 | ||
198 | def add_schedule(self, level_spec, interval, start_time): | |
199 | self.log.debug( | |
200 | "add_schedule: level_spec={}, interval={}, start_time={}".format( | |
201 | level_spec.name, interval, start_time)) | |
202 | ||
203 | with self.lock: | |
204 | self.schedules.add(level_spec, interval, start_time) | |
205 | ||
206 | # TODO: optimize to rebuild only affected part of the queue | |
207 | self.rebuild_queue() | |
208 | return 0, "", "" | |
209 | ||
210 | def remove_schedule(self, level_spec, interval, start_time): | |
211 | self.log.debug( | |
212 | "remove_schedule: level_spec={}, interval={}, start_time={}".format( | |
213 | level_spec.name, interval, start_time)) | |
214 | ||
215 | with self.lock: | |
216 | self.schedules.remove(level_spec, interval, start_time) | |
217 | ||
218 | # TODO: optimize to rebuild only affected part of the queue | |
219 | self.rebuild_queue() | |
220 | return 0, "", "" | |
221 | ||
222 | def list(self, level_spec): | |
223 | self.log.debug("list: level_spec={}".format(level_spec.name)) | |
224 | ||
225 | with self.lock: | |
226 | result = self.schedules.to_list(level_spec) | |
227 | ||
228 | return 0, json.dumps(result, indent=4, sort_keys=True), "" | |
229 | ||
230 | def status(self, level_spec): | |
231 | self.log.debug("status: level_spec={}".format(level_spec.name)) | |
232 | ||
233 | scheduled = [] | |
234 | with self.lock: | |
235 | for schedule_time in sorted(self.queue): | |
236 | for pool_id, namespace in self.queue[schedule_time]: | |
237 | if not level_spec.matches(pool_id, namespace): | |
238 | continue | |
239 | pool_name = self.pools[pool_id][namespace] | |
240 | scheduled.append({ | |
241 | 'schedule_time' : schedule_time, | |
242 | 'pool_id' : pool_id, | |
243 | 'pool_name' : pool_name, | |
244 | 'namespace' : namespace | |
245 | }) | |
246 | return 0, json.dumps({'scheduled' : scheduled}, indent=4, | |
247 | sort_keys=True), "" | |
248 | ||
249 | def handle_command(self, inbuf, prefix, cmd): | |
250 | level_spec_name = cmd.get('level_spec', "") | |
251 | ||
252 | try: | |
253 | level_spec = LevelSpec.from_name(self, level_spec_name, | |
254 | allow_image_level=False) | |
255 | except ValueError as e: | |
256 | return -errno.EINVAL, '', "Invalid level spec {}: {}".format( | |
257 | level_spec_name, e) | |
258 | ||
259 | if prefix == 'add': | |
260 | return self.add_schedule(level_spec, cmd['interval'], | |
261 | cmd.get('start_time')) | |
262 | elif prefix == 'remove': | |
263 | return self.remove_schedule(level_spec, cmd.get('interval'), | |
264 | cmd.get('start_time')) | |
265 | elif prefix == 'list': | |
266 | return self.list(level_spec) | |
267 | elif prefix == 'status': | |
268 | return self.status(level_spec) | |
269 | ||
270 | raise NotImplementedError(cmd['prefix']) |