7 from datetime
import datetime
8 from threading
import Condition
, Lock
, Thread
9 from typing
import Any
, Dict
, List
, NamedTuple
, Optional
, Set
, Tuple
, Union
11 from .common
import get_rbd_pools
12 from .schedule
import LevelSpec
, Schedules
def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Check that the namespace behind *ioctx* uses image mirror mode.

    Passed to ``Schedules.load`` as the namespace validator.

    :param ioctx: I/O context whose mirror mode is queried; its current
        namespace is reported in the error message.
    :raises ValueError: if ``rbd.RBD().mirror_mode_get(ioctx)`` is not
        ``rbd.RBD_MIRROR_MODE_IMAGE``.
    """
    mode = rbd.RBD().mirror_mode_get(ioctx)
    if mode != rbd.RBD_MIRROR_MODE_IMAGE:
        raise ValueError("namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))
def image_validator(image: rbd.Image) -> None:
    """Check that *image* is mirrored in snapshot mode.

    Passed to ``Schedules.load`` as the image validator.

    :param image: RBD image handle to check.
    :raises rbd.InvalidArgument: if ``image.mirror_image_get_mode()`` is not
        ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    """
    mode = image.mirror_image_get_mode()
    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        raise rbd.InvalidArgument("Invalid mirror image mode")
28 class ImageSpec(NamedTuple
):
34 class CreateSnapshotRequests
:
37 condition
= Condition(lock
)
39 def __init__(self
, handler
: Any
) -> None:
40 self
.handler
= handler
41 self
.rados
= handler
.module
.rados
42 self
.log
= handler
.log
43 self
.pending
: Set
[ImageSpec
] = set()
44 self
.queue
: List
[ImageSpec
] = []
45 self
.ioctxs
: Dict
[Tuple
[str, str], Tuple
[rados
.Ioctx
, Set
[ImageSpec
]]] = {}
47 def __del__(self
) -> None:
48 self
.wait_for_pending()
50 def wait_for_pending(self
) -> None:
54 "CreateSnapshotRequests.wait_for_pending: "
55 "{} images".format(len(self
.pending
)))
57 self
.log
.debug("CreateSnapshotRequests.wait_for_pending: done")
59 def add(self
, pool_id
: str, namespace
: str, image_id
: str) -> None:
60 image_spec
= ImageSpec(pool_id
, namespace
, image_id
)
62 self
.log
.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
63 pool_id
, namespace
, image_id
))
65 max_concurrent
= self
.handler
.module
.get_localized_module_option(
66 self
.handler
.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE
)
69 if image_spec
in self
.pending
:
71 "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
72 pool_id
, namespace
, image_id
,
73 "previous request is still in progress"))
75 self
.pending
.add(image_spec
)
77 if len(self
.pending
) > max_concurrent
:
78 self
.queue
.append(image_spec
)
81 self
.open_image(image_spec
)
83 def open_image(self
, image_spec
: ImageSpec
) -> None:
84 pool_id
, namespace
, image_id
= image_spec
86 self
.log
.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
87 pool_id
, namespace
, image_id
))
90 ioctx
= self
.get_ioctx(image_spec
)
92 def cb(comp
: rados
.Completion
, image
: rbd
.Image
) -> None:
93 self
.handle_open_image(image_spec
, comp
, image
)
95 rbd
.RBD().aio_open_image(cb
, ioctx
, image_id
=image_id
)
96 except Exception as e
:
98 "exception when opening {}/{}/{}: {}".format(
99 pool_id
, namespace
, image_id
, e
))
100 self
.finish(image_spec
)
102 def handle_open_image(self
,
103 image_spec
: ImageSpec
,
104 comp
: rados
.Completion
,
105 image
: rbd
.Image
) -> None:
106 pool_id
, namespace
, image_id
= image_spec
109 "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
110 pool_id
, namespace
, image_id
, comp
.get_return_value()))
112 if comp
.get_return_value() < 0:
113 if comp
.get_return_value() != -errno
.ENOENT
:
115 "error when opening {}/{}/{}: {}".format(
116 pool_id
, namespace
, image_id
, comp
.get_return_value()))
117 self
.finish(image_spec
)
120 self
.get_mirror_mode(image_spec
, image
)
122 def get_mirror_mode(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
123 pool_id
, namespace
, image_id
= image_spec
125 self
.log
.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
126 pool_id
, namespace
, image_id
))
128 def cb(comp
: rados
.Completion
, mode
: int) -> None:
129 self
.handle_get_mirror_mode(image_spec
, image
, comp
, mode
)
132 image
.aio_mirror_image_get_mode(cb
)
133 except Exception as e
:
135 "exception when getting mirror mode for {}/{}/{}: {}".format(
136 pool_id
, namespace
, image_id
, e
))
137 self
.close_image(image_spec
, image
)
139 def handle_get_mirror_mode(self
,
140 image_spec
: ImageSpec
,
142 comp
: rados
.Completion
,
144 pool_id
, namespace
, image_id
= image_spec
147 "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
148 pool_id
, namespace
, image_id
, comp
.get_return_value(), mode
))
150 if comp
.get_return_value() < 0:
151 if comp
.get_return_value() != -errno
.ENOENT
:
153 "error when getting mirror mode for {}/{}/{}: {}".format(
154 pool_id
, namespace
, image_id
, comp
.get_return_value()))
155 self
.close_image(image_spec
, image
)
158 if mode
!= rbd
.RBD_MIRROR_IMAGE_MODE_SNAPSHOT
:
160 "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
161 pool_id
, namespace
, image_id
,
162 "snapshot mirroring is not enabled"))
163 self
.close_image(image_spec
, image
)
166 self
.get_mirror_info(image_spec
, image
)
168 def get_mirror_info(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
169 pool_id
, namespace
, image_id
= image_spec
171 self
.log
.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
172 pool_id
, namespace
, image_id
))
174 def cb(comp
: rados
.Completion
, info
: Dict
[str, Union
[str, int]]) -> None:
175 self
.handle_get_mirror_info(image_spec
, image
, comp
, info
)
178 image
.aio_mirror_image_get_info(cb
)
179 except Exception as e
:
181 "exception when getting mirror info for {}/{}/{}: {}".format(
182 pool_id
, namespace
, image_id
, e
))
183 self
.close_image(image_spec
, image
)
185 def handle_get_mirror_info(self
,
186 image_spec
: ImageSpec
,
188 comp
: rados
.Completion
,
189 info
: Dict
[str, Union
[str, int]]) -> None:
190 pool_id
, namespace
, image_id
= image_spec
193 "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
194 pool_id
, namespace
, image_id
, comp
.get_return_value(), info
))
196 if comp
.get_return_value() < 0:
197 if comp
.get_return_value() != -errno
.ENOENT
:
199 "error when getting mirror info for {}/{}/{}: {}".format(
200 pool_id
, namespace
, image_id
, comp
.get_return_value()))
201 self
.close_image(image_spec
, image
)
204 if not info
['primary']:
206 "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
207 pool_id
, namespace
, image_id
,
209 self
.close_image(image_spec
, image
)
212 self
.create_snapshot(image_spec
, image
)
214 def create_snapshot(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
215 pool_id
, namespace
, image_id
= image_spec
218 "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
219 pool_id
, namespace
, image_id
))
221 def cb(comp
: rados
.Completion
, snap_id
: int) -> None:
222 self
.handle_create_snapshot(image_spec
, image
, comp
, snap_id
)
225 image
.aio_mirror_image_create_snapshot(0, cb
)
226 except Exception as e
:
228 "exception when creating snapshot for {}/{}/{}: {}".format(
229 pool_id
, namespace
, image_id
, e
))
230 self
.close_image(image_spec
, image
)
232 def handle_create_snapshot(self
,
233 image_spec
: ImageSpec
,
235 comp
: rados
.Completion
,
236 snap_id
: int) -> None:
237 pool_id
, namespace
, image_id
= image_spec
240 "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
241 pool_id
, namespace
, image_id
, comp
.get_return_value(), snap_id
))
243 if comp
.get_return_value() < 0 and \
244 comp
.get_return_value() != -errno
.ENOENT
:
246 "error when creating snapshot for {}/{}/{}: {}".format(
247 pool_id
, namespace
, image_id
, comp
.get_return_value()))
249 self
.close_image(image_spec
, image
)
251 def close_image(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
252 pool_id
, namespace
, image_id
= image_spec
255 "CreateSnapshotRequests.close_image {}/{}/{}".format(
256 pool_id
, namespace
, image_id
))
258 def cb(comp
: rados
.Completion
) -> None:
259 self
.handle_close_image(image_spec
, comp
)
263 except Exception as e
:
265 "exception when closing {}/{}/{}: {}".format(
266 pool_id
, namespace
, image_id
, e
))
267 self
.finish(image_spec
)
269 def handle_close_image(self
,
270 image_spec
: ImageSpec
,
271 comp
: rados
.Completion
) -> None:
272 pool_id
, namespace
, image_id
= image_spec
275 "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
276 pool_id
, namespace
, image_id
, comp
.get_return_value()))
278 if comp
.get_return_value() < 0:
280 "error when closing {}/{}/{}: {}".format(
281 pool_id
, namespace
, image_id
, comp
.get_return_value()))
283 self
.finish(image_spec
)
285 def finish(self
, image_spec
: ImageSpec
) -> None:
286 pool_id
, namespace
, image_id
= image_spec
288 self
.log
.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
289 pool_id
, namespace
, image_id
))
291 self
.put_ioctx(image_spec
)
294 self
.pending
.remove(image_spec
)
295 self
.condition
.notify()
298 image_spec
= self
.queue
.pop(0)
300 self
.open_image(image_spec
)
302 def get_ioctx(self
, image_spec
: ImageSpec
) -> rados
.Ioctx
:
303 pool_id
, namespace
, image_id
= image_spec
304 nspec
= (pool_id
, namespace
)
307 ioctx
, images
= self
.ioctxs
.get(nspec
, (None, None))
309 ioctx
= self
.rados
.open_ioctx2(int(pool_id
))
310 ioctx
.set_namespace(namespace
)
312 self
.ioctxs
[nspec
] = (ioctx
, images
)
313 assert images
is not None
314 images
.add(image_spec
)
318 def put_ioctx(self
, image_spec
: ImageSpec
) -> None:
319 pool_id
, namespace
, image_id
= image_spec
320 nspec
= (pool_id
, namespace
)
323 ioctx
, images
= self
.ioctxs
[nspec
]
324 images
.remove(image_spec
)
326 del self
.ioctxs
[nspec
]
329 class MirrorSnapshotScheduleHandler
:
330 MODULE_OPTION_NAME
= "mirror_snapshot_schedule"
331 MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE
= "max_concurrent_snap_create"
332 SCHEDULE_OID
= "rbd_mirror_snapshot_schedule"
333 REFRESH_DELAY_SECONDS
= 60.0
336 condition
= Condition(lock
)
338 def __init__(self
, module
: Any
) -> None:
340 self
.log
= module
.log
341 self
.last_refresh_images
= datetime(1970, 1, 1)
342 self
.create_snapshot_requests
= CreateSnapshotRequests(self
)
344 self
.stop_thread
= False
345 self
.thread
= Thread(target
=self
.run
)
347 def setup(self
) -> None:
348 self
.init_schedule_queue()
351 def shutdown(self
) -> None:
352 self
.log
.info("MirrorSnapshotScheduleHandler: shutting down")
353 self
.stop_thread
= True
354 if self
.thread
.is_alive():
355 self
.log
.debug("MirrorSnapshotScheduleHandler: joining thread")
357 self
.create_snapshot_requests
.wait_for_pending()
358 self
.log
.info("MirrorSnapshotScheduleHandler: shut down")
360 def run(self
) -> None:
362 self
.log
.info("MirrorSnapshotScheduleHandler: starting")
363 while not self
.stop_thread
:
364 refresh_delay
= self
.refresh_images()
366 (image_spec
, wait_time
) = self
.dequeue()
368 self
.condition
.wait(min(wait_time
, refresh_delay
))
370 pool_id
, namespace
, image_id
= image_spec
371 self
.create_snapshot_requests
.add(pool_id
, namespace
, image_id
)
373 self
.enqueue(datetime
.now(), pool_id
, namespace
, image_id
)
375 except (rados
.ConnectionShutdown
, rbd
.ConnectionShutdown
):
376 self
.log
.exception("MirrorSnapshotScheduleHandler: client blocklisted")
377 self
.module
.client_blocklisted
.set()
378 except Exception as ex
:
379 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
380 ex
, traceback
.format_exc()))
382 def init_schedule_queue(self
) -> None:
383 # schedule_time => image_spec
384 self
.queue
: Dict
[str, List
[ImageSpec
]] = {}
385 # pool_id => {namespace => image_id}
386 self
.images
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
387 self
.schedules
= Schedules(self
)
388 self
.refresh_images()
389 self
.log
.debug("MirrorSnapshotScheduleHandler: queue is initialized")
391 def load_schedules(self
) -> None:
392 self
.log
.info("MirrorSnapshotScheduleHandler: load_schedules")
393 self
.schedules
.load(namespace_validator
, image_validator
)
395 def refresh_images(self
) -> float:
396 elapsed
= (datetime
.now() - self
.last_refresh_images
).total_seconds()
397 if elapsed
< self
.REFRESH_DELAY_SECONDS
:
398 return self
.REFRESH_DELAY_SECONDS
- elapsed
400 self
.log
.debug("MirrorSnapshotScheduleHandler: refresh_images")
403 self
.load_schedules()
404 if not self
.schedules
:
405 self
.log
.debug("MirrorSnapshotScheduleHandler: no schedules")
408 self
.last_refresh_images
= datetime
.now()
409 return self
.REFRESH_DELAY_SECONDS
411 images
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
413 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
414 if not self
.schedules
.intersects(
415 LevelSpec
.from_pool_spec(pool_id
, pool_name
)):
417 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
418 self
.load_pool_images(ioctx
, images
)
421 self
.refresh_queue(images
)
424 self
.last_refresh_images
= datetime
.now()
425 return self
.REFRESH_DELAY_SECONDS
427 def load_pool_images(self
,
429 images
: Dict
[str, Dict
[str, Dict
[str, str]]]) -> None:
430 pool_id
= str(ioctx
.get_pool_id())
431 pool_name
= ioctx
.get_pool_name()
434 self
.log
.debug("load_pool_images: pool={}".format(pool_name
))
437 namespaces
= [''] + rbd
.RBD().namespace_list(ioctx
)
438 for namespace
in namespaces
:
439 if not self
.schedules
.intersects(
440 LevelSpec
.from_pool_spec(int(pool_id
), pool_name
, namespace
)):
442 self
.log
.debug("load_pool_images: pool={}, namespace={}".format(
443 pool_name
, namespace
))
444 images
[pool_id
][namespace
] = {}
445 ioctx
.set_namespace(namespace
)
446 mirror_images
= dict(rbd
.RBD().mirror_image_info_list(
447 ioctx
, rbd
.RBD_MIRROR_IMAGE_MODE_SNAPSHOT
))
448 if not mirror_images
:
451 [(x
['id'], x
['name']) for x
in filter(
452 lambda x
: x
['id'] in mirror_images
,
453 rbd
.RBD().list2(ioctx
))])
454 for image_id
, info
in mirror_images
.items():
455 if not info
['primary']:
457 image_name
= image_names
.get(image_id
)
461 name
= "{}/{}/{}".format(pool_name
, namespace
,
464 name
= "{}/{}".format(pool_name
, image_name
)
466 "load_pool_images: adding image {}".format(name
))
467 images
[pool_id
][namespace
][image_id
] = name
468 except rbd
.ConnectionShutdown
:
470 except Exception as e
:
472 "load_pool_images: exception when scanning pool {}: {}".format(
475 def rebuild_queue(self
) -> None:
478 # don't remove from queue "due" images
479 now_string
= datetime
.strftime(now
, "%Y-%m-%d %H:%M:00")
481 for schedule_time
in list(self
.queue
):
482 if schedule_time
> now_string
:
483 del self
.queue
[schedule_time
]
485 if not self
.schedules
:
488 for pool_id
in self
.images
:
489 for namespace
in self
.images
[pool_id
]:
490 for image_id
in self
.images
[pool_id
][namespace
]:
491 self
.enqueue(now
, pool_id
, namespace
, image_id
)
493 self
.condition
.notify()
495 def refresh_queue(self
,
496 current_images
: Dict
[str, Dict
[str, Dict
[str, str]]]) -> None:
499 for pool_id
in self
.images
:
500 for namespace
in self
.images
[pool_id
]:
501 for image_id
in self
.images
[pool_id
][namespace
]:
502 if pool_id
not in current_images
or \
503 namespace
not in current_images
[pool_id
] or \
504 image_id
not in current_images
[pool_id
][namespace
]:
505 self
.remove_from_queue(pool_id
, namespace
, image_id
)
507 for pool_id
in current_images
:
508 for namespace
in current_images
[pool_id
]:
509 for image_id
in current_images
[pool_id
][namespace
]:
510 if pool_id
not in self
.images
or \
511 namespace
not in self
.images
[pool_id
] or \
512 image_id
not in self
.images
[pool_id
][namespace
]:
513 self
.enqueue(now
, pool_id
, namespace
, image_id
)
515 self
.condition
.notify()
517 def enqueue(self
, now
: datetime
, pool_id
: str, namespace
: str, image_id
: str) -> None:
518 schedule
= self
.schedules
.find(pool_id
, namespace
, image_id
)
521 "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
522 pool_id
, namespace
, image_id
))
525 schedule_time
= schedule
.next_run(now
)
526 if schedule_time
not in self
.queue
:
527 self
.queue
[schedule_time
] = []
529 "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
530 pool_id
, namespace
, image_id
, schedule_time
))
531 image_spec
= ImageSpec(pool_id
, namespace
, image_id
)
532 if image_spec
not in self
.queue
[schedule_time
]:
533 self
.queue
[schedule_time
].append(image_spec
)
535 def dequeue(self
) -> Tuple
[Optional
[ImageSpec
], float]:
540 schedule_time
= sorted(self
.queue
)[0]
542 if datetime
.strftime(now
, "%Y-%m-%d %H:%M:%S") < schedule_time
:
543 wait_time
= (datetime
.strptime(schedule_time
,
544 "%Y-%m-%d %H:%M:%S") - now
)
545 return None, wait_time
.total_seconds()
547 images
= self
.queue
[schedule_time
]
548 image
= images
.pop(0)
550 del self
.queue
[schedule_time
]
553 def remove_from_queue(self
, pool_id
: str, namespace
: str, image_id
: str) -> None:
555 "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
556 pool_id
, namespace
, image_id
))
559 image_spec
= ImageSpec(pool_id
, namespace
, image_id
)
560 for schedule_time
, images
in self
.queue
.items():
561 if image_spec
in images
:
562 images
.remove(image_spec
)
564 empty_slots
.append(schedule_time
)
565 for schedule_time
in empty_slots
:
566 del self
.queue
[schedule_time
]
568 def add_schedule(self
,
569 level_spec
: LevelSpec
,
571 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
573 "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
574 level_spec
.name
, interval
, start_time
))
576 # TODO: optimize to rebuild only affected part of the queue
578 self
.schedules
.add(level_spec
, interval
, start_time
)
582 def remove_schedule(self
,
583 level_spec
: LevelSpec
,
584 interval
: Optional
[str],
585 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
587 "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
588 level_spec
.name
, interval
, start_time
))
590 # TODO: optimize to rebuild only affected part of the queue
592 self
.schedules
.remove(level_spec
, interval
, start_time
)
596 def list(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
598 "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
602 result
= self
.schedules
.to_list(level_spec
)
604 return 0, json
.dumps(result
, indent
=4, sort_keys
=True), ""
606 def status(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
608 "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
611 scheduled_images
= []
613 for schedule_time
in sorted(self
.queue
):
614 for pool_id
, namespace
, image_id
in self
.queue
[schedule_time
]:
615 if not level_spec
.matches(pool_id
, namespace
, image_id
):
617 image_name
= self
.images
[pool_id
][namespace
][image_id
]
618 scheduled_images
.append({
619 'schedule_time': schedule_time
,
622 return 0, json
.dumps({'scheduled_images': scheduled_images
},
623 indent
=4, sort_keys
=True), ""