# Source: ceph/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
# (quincy 17.1.0 beta; recovered from a git-blame capture)
1import errno
2import json
3import rados
4import rbd
5import re
6import traceback
7
8from datetime import datetime
9from threading import Condition, Lock, Thread
20effc67 10from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
9f95a23c
TL
11
12from .common import get_rbd_pools
13from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
14
MIRRORING_OID = "rbd_mirroring"


def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Raise ValueError unless the namespace mirrors in per-image mode."""
    if rbd.RBD().mirror_mode_get(ioctx) != rbd.RBD_MIRROR_MODE_IMAGE:
        raise ValueError("namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))
def image_validator(image: rbd.Image) -> None:
    """Raise rbd.InvalidArgument unless the image uses snapshot mirroring."""
    if image.mirror_image_get_mode() != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        raise rbd.InvalidArgument("Invalid mirror image mode")
class Watchers:
    """Tracks rados watches on per-pool/namespace "rbd_mirroring" objects.

    A notify on the watched object signals that mirroring state changed in
    that pool/namespace; check() exposes this as a cheap "was anything
    updated since last time?" poll so the scheduler can avoid rescanning
    unchanged namespaces.
    """

    # class-level lock guarding all the per-watch bookkeeping dicts below
    lock = Lock()

    def __init__(self, handler: Any) -> None:
        self.rados = handler.module.rados
        self.log = handler.log
        # (pool_id, namespace) -> rados.Watch
        self.watchers: Dict[Tuple[str, str], rados.Watch] = {}
        # watch_id -> True if a notify arrived since the last check()
        self.updated: Dict[int, bool] = {}
        # watch_id -> error reported by librados for that watch
        self.error: Dict[int, str] = {}
        # watch_id -> refresh epoch in which the watch was last checked
        self.epoch: Dict[int, int] = {}

    def __del__(self) -> None:
        self.unregister_all()

    def _clean_watcher(self, pool_id: str, namespace: str, watch_id: int) -> None:
        """Drop all bookkeeping for one watch.  Caller must hold self.lock."""
        assert self.lock.locked()

        del self.watchers[pool_id, namespace]
        self.updated.pop(watch_id, None)
        self.error.pop(watch_id, None)
        self.epoch.pop(watch_id, None)

    def check(self, pool_id: str, namespace: str, epoch: int) -> bool:
        """Return True if this pool/namespace saw a notify since last check.

        Registers a watch on first use and re-registers after an error.
        If registration fails (the rbd_mirroring object does not exist yet)
        returns True so the caller performs a full rescan.
        """
        error = None
        with self.lock:
            watch = self.watchers.get((pool_id, namespace))
            if watch is not None:
                error = self.error.get(watch.get_id())
                if not error:
                    updated = self.updated[watch.get_id()]
                    # consume the notification and stamp the current epoch
                    self.updated[watch.get_id()] = False
                    self.epoch[watch.get_id()] = epoch
                    return updated
        if error:
            self.unregister(pool_id, namespace)

        if self.register(pool_id, namespace):
            return self.check(pool_id, namespace, epoch)
        else:
            return True

    def register(self, pool_id: str, namespace: str) -> bool:
        """Register a watch on the pool/namespace rbd_mirroring object.

        Returns False if the object does not exist (nothing mirrored yet).
        """

        def callback(notify_id: str, notifier_id: str, watch_id: int, data: str) -> None:
            self.log.debug("watcher {}: got notify {} from {}".format(
                watch_id, notify_id, notifier_id))

            with self.lock:
                self.updated[watch_id] = True

        def error_callback(watch_id: int, error: str) -> None:
            # fixed typo in log message ("errror" -> "error")
            self.log.debug("watcher {}: got error {}".format(
                watch_id, error))

            with self.lock:
                self.error[watch_id] = error

        try:
            ioctx = self.rados.open_ioctx2(int(pool_id))
            ioctx.set_namespace(namespace)
            watch = ioctx.watch(MIRRORING_OID, callback, error_callback)
        except rados.ObjectNotFound:
            self.log.debug(
                "{}/{}/{} watcher not registered: object not found".format(
                    pool_id, namespace, MIRRORING_OID))
            return False

        self.log.debug("{}/{}/{} watcher {} registered".format(
            pool_id, namespace, MIRRORING_OID, watch.get_id()))

        with self.lock:
            self.watchers[pool_id, namespace] = watch
            # treat a fresh watch as updated to force an initial scan
            self.updated[watch.get_id()] = True
        return True

    def unregister(self, pool_id: str, namespace: str) -> None:
        """Close the watch for the pool/namespace and forget its state."""

        with self.lock:
            watch = self.watchers[pool_id, namespace]

        watch_id = watch.get_id()

        try:
            watch.close()

            self.log.debug("{}/{}/{} watcher {} unregistered".format(
                pool_id, namespace, MIRRORING_OID, watch_id))

        except rados.Error as e:
            # best-effort close; still purge our bookkeeping below
            self.log.debug(
                "exception when unregistering {}/{} watcher: {}".format(
                    pool_id, namespace, e))

        with self.lock:
            self._clean_watcher(pool_id, namespace, watch_id)

    def unregister_all(self) -> None:
        """Unregister every currently registered watch."""
        with self.lock:
            watchers = list(self.watchers)

        for pool_id, namespace in watchers:
            self.unregister(pool_id, namespace)

    def unregister_stale(self, current_epoch: int) -> None:
        """Unregister watches that were not touched in the current epoch
        (their pool/namespace disappeared from the schedule scan)."""
        with self.lock:
            watchers = list(self.watchers)

        for pool_id, namespace in watchers:
            with self.lock:
                watch = self.watchers[pool_id, namespace]
                if self.epoch.get(watch.get_id()) == current_epoch:
                    continue

            self.log.debug("{}/{}/{} watcher {} stale".format(
                pool_id, namespace, MIRRORING_OID, watch.get_id()))

            self.unregister(pool_id, namespace)
147
class ImageSpec(NamedTuple):
    """Uniquely identifies one RBD image as (pool_id, namespace, image_id)."""
    pool_id: str
    namespace: str
    image_id: str
class CreateSnapshotRequests:
    """Creates mirror snapshots asynchronously via the rados/rbd AIO API.

    Each request walks a completion-callback chain: open image -> query
    mirror mode -> query mirror info (must be primary) -> create snapshot
    -> close image -> finish.  At most max_concurrent_snap_create requests
    are in flight at once; the overflow waits in a FIFO queue and is
    serviced as requests finish.
    """

    lock = Lock()
    condition = Condition(lock)

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log
        # requests currently in flight or queued
        self.pending: Set[ImageSpec] = set()
        # overflow beyond the concurrency limit, serviced FIFO by finish()
        self.queue: List[ImageSpec] = []
        # (pool_id, namespace) -> (shared ioctx, image specs using it)
        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}

    def __del__(self) -> None:
        self.wait_for_pending()

    def wait_for_pending(self) -> None:
        """Block until every outstanding snapshot request has finished."""
        with self.lock:
            while self.pending:
                self.condition.wait()

    def add(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Request a mirror snapshot of the image.

        Deduplicates against an in-progress request for the same image and
        throttles to the configured max_concurrent_snap_create limit.
        """
        image_spec = ImageSpec(pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            if len(self.pending) > max_concurrent:
                self.queue.append(image_spec)
                return

        self.open_image(image_spec)

    def open_image(self, image_spec: ImageSpec) -> None:
        """Start the callback chain: asynchronously open the image."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp: rados.Completion, image: rbd.Image) -> None:
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_open_image(self,
                          image_spec: ImageSpec,
                          comp: rados.Completion,
                          image: rbd.Image) -> None:
        """Completion for open_image: on success query the mirror mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            # ENOENT (image deleted meanwhile) is expected, not an error
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously query the image's mirror mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, mode: int) -> None:
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               mode: int) -> None:
        """Completion for get_mirror_mode: require snapshot mirroring."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), mode))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously query the image's mirror info (primary flag)."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, info: Dict[str, Union[str, int]]) -> None:
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               info: Dict[str, Union[str, int]]) -> None:
        """Completion for get_mirror_info: only snapshot primary images."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), info))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously create the mirror snapshot (flags=0)."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion, snap_id: int) -> None:
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               snap_id: int) -> None:
        """Completion for create_snapshot: log failures, then close."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), snap_id))

        if comp.get_return_value() < 0 and \
           comp.get_return_value() != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.close_image(image_spec, image)

    def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously close the image (always ends in finish())."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion) -> None:
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self,
                           image_spec: ImageSpec,
                           comp: rados.Completion) -> None:
        """Completion for close_image: log failures and finish the request."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.finish(image_spec)

    def finish(self, image_spec: ImageSpec) -> None:
        """Terminal step for every request: release the ioctx, wake any
        wait_for_pending() caller, and start the next queued request."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            # BUG FIX: wake wait_for_pending() -- nothing else ever
            # notifies the condition, so shutdown would hang forever
            # while a request was pending
            self.condition.notify()
            if not self.queue:
                return
            image_spec = self.queue.pop(0)

        self.open_image(image_spec)

    def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
        """Return a shared ioctx for the image's pool/namespace, creating
        it on first use; users are refcounted via the image-spec set."""
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if not ioctx:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                images = set()
                self.ioctxs[nspec] = (ioctx, images)
            assert images is not None
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec: ImageSpec) -> None:
        """Release the shared ioctx; drop it when the last user is gone."""
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs[nspec]
            images.remove(image_spec)
            if not images:
                del self.ioctxs[nspec]
444
class MirrorSnapshotScheduleHandler:
    """mgr handler that periodically creates mirror snapshots of RBD
    images according to the configured schedules.

    A background thread pops due images off a time-keyed queue and hands
    them to CreateSnapshotRequests; the image map is refreshed from the
    cluster at most once a minute, using Watchers to skip unchanged
    pool/namespaces.
    """

    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"

    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module: Any) -> None:
        self.module = module
        self.log = module.log
        # epoch sentinel forces an immediate refresh on first call
        self.last_refresh_images = datetime(1970, 1, 1)
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def _cleanup(self) -> None:
        """Tear down watches and drain in-flight snapshot requests."""
        self.watchers.unregister_all()
        self.create_snapshot_requests.wait_for_pending()

    def run(self) -> None:
        """Scheduler thread: pop due images, request snapshots, re-enqueue."""
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while True:
                self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        self.condition.wait(min(wait_time, 60))
                        continue
                pool_id, namespace, image_id = image_spec
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    # schedule the image's next run
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self) -> None:
        """Initialize the schedule queue, image map and watchers."""
        # schedule_time => image_spec
        self.queue: Dict[str, List[ImageSpec]] = {}
        # pool_id => {namespace => image_id}
        self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
        self.watchers = Watchers(self)
        self.refresh_images()
        self.log.debug("scheduler queue is initialized")

    def load_schedules(self) -> None:
        """Reload all mirror snapshot schedules from the cluster."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load(namespace_validator, image_validator)
        with self.lock:
            self.schedules = schedules

    def refresh_images(self) -> None:
        """Refresh the pool/namespace/image map, at most once a minute."""
        # BUG FIX: use total_seconds() -- timedelta.seconds is only the
        # seconds *component* (wraps every 24h), so with the 1970 sentinel
        # the very first refresh could be wrongly skipped and later
        # refreshes suppressed after long stalls
        if (datetime.now() - self.last_refresh_images).total_seconds() < 60:
            return

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        self.load_schedules()

        with self.lock:
            if not self.schedules:
                self.watchers.unregister_all()
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return

        # FIX: datetime.timestamp() is the portable, documented way to get
        # a Unix epoch; strftime('%s') is an undocumented platform
        # extension (absent e.g. on Windows)
        epoch = int(datetime.now().timestamp())
        images: Dict[str, Dict[str, Dict[str, str]]] = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, epoch, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        self.watchers.unregister_stale(epoch)
        self.last_refresh_images = datetime.now()

    def load_pool_images(self,
                         ioctx: rados.Ioctx,
                         epoch: int,
                         images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Scan one pool and collect its schedulable primary mirror images
        into *images*, reusing the previous scan for unchanged namespaces."""
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            # '' is the default namespace
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                # skip rescanning namespaces whose watch saw no notify
                updated = self.watchers.check(pool_id, namespace, epoch)
                if not updated:
                    self.log.debug("load_pool_images: {}/{} not updated".format(
                        pool_name, namespace))
                    with self.lock:
                        images[pool_id][namespace] = \
                            self.images[pool_id][namespace]
                    continue

                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                image_names = dict(
                    [(x['id'], x['name']) for x in filter(
                        lambda x: x['id'] in mirror_images,
                        rbd.RBD().list2(ioctx))])
                for image_id, info in mirror_images.items():
                    # snapshots can only be created on the primary
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except Exception as e:
            # best-effort scan: a broken pool must not kill the scheduler
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self) -> None:
        """Rebuild the whole queue after schedules changed."""
        with self.lock:
            now = datetime.now()

            # don't remove from queue "due" images
            now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

            for schedule_time in list(self.queue):
                if schedule_time > now_string:
                    del self.queue[schedule_time]

            if not self.schedules:
                return

            for pool_id in self.images:
                for namespace in self.images[pool_id]:
                    for image_id in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

            self.condition.notify()

    def refresh_queue(self,
                      current_images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Sync the queue to a fresh image map: drop vanished images,
        enqueue newly discovered ones.  Caller holds self.lock."""
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                       namespace not in current_images[pool_id] or \
                       image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                       namespace not in self.images[pool_id] or \
                       image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str, image_id: str) -> None:
        """Queue the image at its next scheduled run after *now*."""

        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug("schedule image {}/{}/{} at {}".format(
            pool_id, namespace, image_id, schedule_time))
        image_spec = ImageSpec(pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
        """Pop the next due image, or (None, seconds-until-next-due)."""
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        schedule_time = sorted(self.queue)[0]

        # schedule times are "%Y-%m-%d %H:%M:%S" strings, so string
        # comparison matches chronological order
        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Remove every queued occurrence of the image, pruning empty slots."""
        empty_slots = []
        image_spec = ImageSpec(pool_id, namespace, image_id)
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Command handler: add a schedule.  Returns (rc, out, err)."""
        self.log.debug(
            "add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.add(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Command handler: remove a schedule.  Returns (rc, out, err)."""
        self.log.debug(
            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Command handler: list schedules as JSON.  Returns (rc, out, err)."""
        self.log.debug("list: level_spec={}".format(level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Command handler: report scheduled images as JSON."""
        self.log.debug("status: level_spec={}".format(level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time' : schedule_time,
                        'image' : image_name
                    })
        return 0, json.dumps({'scheduled_images' : scheduled_images},
                             indent=4, sort_keys=True), ""