]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import errno |
2 | import json | |
3 | import rados | |
4 | import rbd | |
5 | import re | |
6 | import traceback | |
7 | ||
8 | from datetime import datetime | |
9 | from threading import Condition, Lock, Thread | |
20effc67 | 10 | from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union |
9f95a23c TL |
11 | |
12 | from .common import get_rbd_pools | |
13 | from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules | |
14 | ||
MIRRORING_OID = "rbd_mirroring"

def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Raise ValueError unless the ioctx's pool/namespace is configured
    for per-image mirroring.
    """
    if rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_IMAGE:
        raise ValueError("namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))
def image_validator(image: rbd.Image) -> None:
    """Raise rbd.InvalidArgument unless the image mirrors via snapshots."""
    if image.mirror_image_get_mode() != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        raise rbd.InvalidArgument("Invalid mirror image mode")
class Watchers:
    """Maintains rados watches on the per-pool/namespace "rbd_mirroring"
    object so the scheduler can cheaply tell whether mirroring metadata
    changed since the previous refresh instead of rescanning everything.

    Fix: corrected the "got errror" typo in the error callback's debug
    message.
    """

    lock = Lock()

    def __init__(self, handler: Any) -> None:
        self.rados = handler.module.rados
        self.log = handler.log
        # (pool_id, namespace) -> rados watch handle
        self.watchers: Dict[Tuple[str, str], rados.Watch] = {}
        # watch_id -> True if a notify arrived since the last check()
        self.updated: Dict[int, bool] = {}
        # watch_id -> error reported by the watch error callback, if any
        self.error: Dict[int, str] = {}
        # watch_id -> refresh epoch at which the watch was last checked
        self.epoch: Dict[int, int] = {}

    def __del__(self) -> None:
        self.unregister_all()

    def _clean_watcher(self, pool_id: str, namespace: str, watch_id: int) -> None:
        """Drop all bookkeeping for one watch.  Caller must hold self.lock."""
        assert self.lock.locked()

        del self.watchers[pool_id, namespace]
        self.updated.pop(watch_id, None)
        self.error.pop(watch_id, None)
        self.epoch.pop(watch_id, None)

    def check(self, pool_id: str, namespace: str, epoch: int) -> bool:
        """Return True if the pool/namespace needs rescanning.

        Registers a watch on demand; a healthy watch reports (and clears)
        its "updated" flag and is stamped with the current epoch.  If no
        watch can be established, conservatively report "updated".
        """
        error = None
        with self.lock:
            watch = self.watchers.get((pool_id, namespace))
            if watch is not None:
                error = self.error.get(watch.get_id())
                if not error:
                    updated = self.updated[watch.get_id()]
                    self.updated[watch.get_id()] = False
                    self.epoch[watch.get_id()] = epoch
                    return updated
        if error:
            # the watch is broken: drop it and re-register below
            self.unregister(pool_id, namespace)

        if self.register(pool_id, namespace):
            return self.check(pool_id, namespace, epoch)
        else:
            # cannot watch (e.g. object not found) -- force a full rescan
            return True

    def register(self, pool_id: str, namespace: str) -> bool:
        """Register a watch on the mirroring object.

        Returns False if the object does not exist (mirroring not set up
        for this pool/namespace yet).
        """

        def callback(notify_id: str, notifier_id: str, watch_id: int, data: str) -> None:
            self.log.debug("watcher {}: got notify {} from {}".format(
                watch_id, notify_id, notifier_id))

            with self.lock:
                self.updated[watch_id] = True

        def error_callback(watch_id: int, error: str) -> None:
            self.log.debug("watcher {}: got error {}".format(
                watch_id, error))

            with self.lock:
                self.error[watch_id] = error

        try:
            ioctx = self.rados.open_ioctx2(int(pool_id))
            ioctx.set_namespace(namespace)
            watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
        except rados.ObjectNotFound:
            self.log.debug(
                "{}/{}/{} watcher not registered: object not found".format(
                    pool_id, namespace, MIRRORING_OID))
            return False

        self.log.debug("{}/{}/{} watcher {} registered".format(
            pool_id, namespace, MIRRORING_OID, watch.get_id()))

        with self.lock:
            self.watchers[pool_id, namespace] = watch
            # treat a fresh watch as updated so the first check() rescans
            self.updated[watch.get_id()] = True
        return True

    def unregister(self, pool_id: str, namespace: str) -> None:
        """Close the watch for pool/namespace and drop its bookkeeping.

        Close errors are logged and swallowed; bookkeeping is cleaned
        either way so a broken watch cannot linger.
        """

        with self.lock:
            watch = self.watchers[pool_id, namespace]

        watch_id = watch.get_id()

        try:
            watch.close()

            self.log.debug("{}/{}/{} watcher {} unregistered".format(
                pool_id, namespace, MIRRORING_OID, watch_id))

        except rados.Error as e:
            self.log.debug(
                "exception when unregistering {}/{} watcher: {}".format(
                    pool_id, namespace, e))

        with self.lock:
            self._clean_watcher(pool_id, namespace, watch_id)

    def unregister_all(self) -> None:
        """Unregister every currently registered watch."""
        with self.lock:
            watchers = list(self.watchers)

        for pool_id, namespace in watchers:
            self.unregister(pool_id, namespace)

    def unregister_stale(self, current_epoch: int) -> None:
        """Unregister watches not stamped with the current refresh epoch,
        i.e. those whose pool/namespace is no longer being scanned.
        """
        with self.lock:
            watchers = list(self.watchers)

        for pool_id, namespace in watchers:
            with self.lock:
                watch = self.watchers[pool_id, namespace]
                if self.epoch.get(watch.get_id()) == current_epoch:
                    continue

            self.log.debug("{}/{}/{} watcher {} stale".format(
                pool_id, namespace, MIRRORING_OID, watch.get_id()))

            self.unregister(pool_id, namespace)
147 | ||
class ImageSpec(NamedTuple):
    """Fully-qualified mirror image identifier: pool id, pool namespace
    ('' for the default namespace) and image id.
    """
    pool_id: str
    namespace: str
    image_id: str
153 | ||
class CreateSnapshotRequests:
    """Creates mirror snapshots for images asynchronously.

    At most ``max_concurrent_snap_create`` requests are in flight at a
    time; the rest wait in ``self.queue``.  Each request walks an aio
    callback chain:

        open_image -> get_mirror_mode -> get_mirror_info
            -> create_snapshot -> close_image -> finish

    Any failure along the chain short-circuits to close_image/finish so
    the pending slot and the shared ioctx are always released.

    Fix: ``finish()`` now notifies ``self.condition`` after removing the
    request from ``self.pending``; previously ``wait_for_pending()``
    (called from ``__del__`` and handler cleanup) could block forever
    because nothing ever woke it up.
    """

    lock = Lock()
    condition = Condition(lock)

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log
        # requests currently in flight or queued
        self.pending: Set[ImageSpec] = set()
        # requests waiting for an in-flight slot to free up
        self.queue: List[ImageSpec] = []
        # (pool_id, namespace) -> (shared ioctx, image specs using it)
        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}

    def __del__(self) -> None:
        self.wait_for_pending()

    def wait_for_pending(self) -> None:
        """Block until every pending request has finished."""
        with self.lock:
            while self.pending:
                self.condition.wait()

    def add(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Schedule a snapshot create for the image, deduplicating against
        a still-running request and respecting the concurrency limit.
        """
        image_spec = ImageSpec(pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            if len(self.pending) > max_concurrent:
                self.queue.append(image_spec)
                return

        self.open_image(image_spec)

    def open_image(self, image_spec: ImageSpec) -> None:
        """Start the chain: asynchronously open the image by id."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp: rados.Completion, image: rbd.Image) -> None:
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_open_image(self,
                          image_spec: ImageSpec,
                          comp: rados.Completion,
                          image: rbd.Image) -> None:
        """Open completed: bail out on error (ENOENT is expected when the
        image was removed), otherwise query the mirror mode.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously query the image's mirror image mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, mode: int) -> None:
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               mode: int) -> None:
        """Mode query completed: continue only for snapshot-mode images."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), mode))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously query the image's mirror info (primary flag)."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, info: Dict[str, Union[str, int]]) -> None:
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               info: Dict[str, Union[str, int]]) -> None:
        """Info query completed: only the primary image takes snapshots."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), info))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously create the mirror snapshot (flags=0)."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion, snap_id: int) -> None:
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               snap_id: int) -> None:
        """Snapshot create completed: log failures, then close the image."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), snap_id))

        if comp.get_return_value() < 0 and \
           comp.get_return_value() != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.close_image(image_spec, image)

    def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously close the image; on failure still finish the
        request so the pending slot is released.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion) -> None:
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self,
                           image_spec: ImageSpec,
                           comp: rados.Completion) -> None:
        """Close completed: log failures and finish the request."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.finish(image_spec)

    def finish(self, image_spec: ImageSpec) -> None:
        """Complete a request: release its ioctx, free the pending slot,
        wake wait_for_pending(), and start the next queued request.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            # wake up wait_for_pending() -- without this, shutdown could
            # block forever on an already-empty pending set
            self.condition.notify()
            if not self.queue:
                return
            image_spec = self.queue.pop(0)

        self.open_image(image_spec)

    def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
        """Return the shared ioctx for the image's pool/namespace,
        creating it on first use and refcounting via the image-spec set.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if not ioctx:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                images = set()
                self.ioctxs[nspec] = (ioctx, images)
            assert images is not None
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec: ImageSpec) -> None:
        """Release the shared ioctx reference; drop the ioctx entirely
        when no image in the pool/namespace uses it anymore.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs[nspec]
            images.remove(image_spec)
            if not images:
                del self.ioctxs[nspec]
444 | ||
class MirrorSnapshotScheduleHandler:
    """Background scheduler that periodically creates mirror snapshots for
    images with a matching schedule.  A worker thread dequeues due images,
    submits snapshot requests via CreateSnapshotRequests, and re-enqueues
    each image for its next run.
    """
    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"

    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module: Any) -> None:
        self.module = module
        self.log = module.log
        # epoch start forces the first refresh_images() to do a full scan
        self.last_refresh_images = datetime(1970, 1, 1)
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def _cleanup(self) -> None:
        # drop watches first, then drain in-flight snapshot requests
        self.watchers.unregister_all()
        self.create_snapshot_requests.wait_for_pending()

    def run(self) -> None:
        """Worker thread main loop: refresh, dequeue due image, request a
        snapshot, re-enqueue for the next run.  Never returns normally.
        """
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while True:
                self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        # cap the sleep so refresh_images() runs regularly
                        self.condition.wait(min(wait_time, 60))
                        continue
                pool_id, namespace, image_id = image_spec
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self) -> None:
        """Initialize queue/images state and do the initial image scan."""
        # schedule_time => image_spec
        self.queue: Dict[str, List[ImageSpec]] = {}
        # pool_id => {namespace => image_id}
        self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
        self.watchers = Watchers(self)
        self.refresh_images()
        self.log.debug("scheduler queue is initialized")

    def load_schedules(self) -> None:
        """Reload all schedules from the cluster under the lock."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load(namespace_validator, image_validator)
        with self.lock:
            self.schedules = schedules

    def refresh_images(self) -> None:
        """Rescan pools for mirrored images, at most once per minute, and
        reconcile the schedule queue with the result.
        """
        if (datetime.now() - self.last_refresh_images).seconds < 60:
            return

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        self.load_schedules()

        with self.lock:
            if not self.schedules:
                # nothing scheduled: drop all watches and cached state
                self.watchers.unregister_all()
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return

        # NOTE(review): '%s' is a platform-dependent strftime extension
        # (unsupported on Windows); int(datetime.now().timestamp()) would
        # be the portable spelling -- confirm before changing.
        epoch = int(datetime.now().strftime('%s'))
        images: Dict[str, Dict[str, Dict[str, str]]] = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, epoch, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        # watches not touched during this epoch belong to pools/namespaces
        # that are no longer scanned
        self.watchers.unregister_stale(epoch)
        self.last_refresh_images = datetime.now()

    def load_pool_images(self,
                         ioctx: rados.Ioctx,
                         epoch: int,
                         images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Populate images[pool_id] with {namespace: {image_id: name}} for
        every primary snapshot-mirrored image covered by a schedule.

        Uses the watcher's updated flag to reuse the previous scan result
        for namespaces whose mirroring metadata did not change.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            # '' is the default namespace
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                updated = self.watchers.check(pool_id, namespace, epoch)
                if not updated:
                    # no notify since last scan: carry over cached result
                    self.log.debug("load_pool_images: {}/{} not updated".format(
                        pool_name, namespace))
                    with self.lock:
                        images[pool_id][namespace] = \
                            self.images[pool_id][namespace]
                    continue
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                # map image id -> name for the mirrored images only
                image_names = dict(
                    [(x['id'], x['name']) for x in filter(
                        lambda x: x['id'] in mirror_images,
                        rbd.RBD().list2(ioctx))])
                for image_id, info in mirror_images.items():
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except Exception as e:
            # best-effort scan: a broken pool must not kill the scheduler
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self) -> None:
        """Rebuild the whole queue from known images after schedules
        changed, keeping entries that are already due.
        """
        with self.lock:
            now = datetime.now()

            # don't remove from queue "due" images
            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

            for schedule_time in list(self.queue):
                if schedule_time > now_string:
                    del self.queue[schedule_time]

            if not self.schedules:
                return

            for pool_id in self.images:
                for namespace in self.images[pool_id]:
                    for image_id in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

            self.condition.notify()

    def refresh_queue(self,
                      current_images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Reconcile the queue with a fresh scan: drop images that
        disappeared, enqueue newly discovered ones.  Caller holds lock.
        """
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                       namespace not in current_images[pool_id] or \
                       image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                       namespace not in self.images[pool_id] or \
                       image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str, image_id: str) -> None:
        """Queue the image at its next scheduled run after `now`, if a
        schedule matches it.  Caller holds lock.
        """

        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug("schedule image {}/{}/{} at {}".format(
            pool_id, namespace, image_id, schedule_time))
        image_spec = ImageSpec(pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
        """Pop the earliest due image, or return (None, seconds_to_wait)
        when nothing is due yet.  Caller holds lock.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # queue keys are "%Y-%m-%d %H:%M:%S" strings, so lexical order is
        # chronological order
        schedule_time = sorted(self.queue)[0]

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Remove every queued occurrence of the image, dropping any
        schedule slots that become empty.  Caller holds lock.
        """
        empty_slots = []
        image_spec = ImageSpec(pool_id, namespace, image_id)
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Command handler: add a schedule.  Returns (retval, out, err)."""
        self.log.debug(
            "add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.add(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Command handler: remove a schedule.  Returns (retval, out, err)."""
        self.log.debug(
            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Command handler: list schedules matching level_spec as JSON."""
        self.log.debug("list: level_spec={}".format(level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Command handler: report queued snapshot times as JSON."""
        self.log.debug("status: level_spec={}".format(level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time' : schedule_time,
                        'image' : image_name
                    })
        return 0, json.dumps({'scheduled_images' : scheduled_images},
                             indent=4, sort_keys=True), ""