7 from datetime
import datetime
8 from threading
import Condition
, Lock
, Thread
9 from typing
import Any
, Dict
, List
, NamedTuple
, Optional
, Set
, Tuple
, Union
11 from .common
import get_rbd_pools
12 from .schedule
import LevelSpec
, Schedules
def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Check that the namespace of *ioctx* is in image mirror mode.

    Used as the namespace-level validator when schedules are loaded.

    :raises ValueError: if the mirror mode is not RBD_MIRROR_MODE_IMAGE.
    """
    mode = rbd.RBD().mirror_mode_get(ioctx)
    if mode != rbd.RBD_MIRROR_MODE_IMAGE:
        raise ValueError("namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))
def image_validator(image: rbd.Image) -> None:
    """Check that *image* is mirrored in snapshot mode.

    Used as the image-level validator when schedules are loaded.

    :raises rbd.InvalidArgument: if the image mirror mode is not
        RBD_MIRROR_IMAGE_MODE_SNAPSHOT.
    """
    mode = image.mirror_image_get_mode()
    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        raise rbd.InvalidArgument("Invalid mirror image mode")
class ImageSpec(NamedTuple):
    """Identifies one image as (pool id, namespace, image id).

    The pool id is kept as the string form of the numeric pool id; callers
    convert it with ``int()`` when opening an ioctx. An empty namespace
    string denotes the default namespace.
    """
    pool_id: str
    namespace: str
    image_id: str
class CreateSnapshotRequests:
    """Asynchronous pipeline that creates mirror snapshots for images.

    Each request walks a chain of librbd async calls:
    open image -> get mirror mode -> get mirror info -> create snapshot
    -> close image -> finish. At most ``max_concurrent_snap_create``
    (module option) requests run at once; the rest wait in ``self.queue``
    and are started from ``finish()`` as slots free up.

    ``self.lock`` guards ``pending``, ``queue`` and ``ioctxs``.
    ``self.condition`` is notified each time a request finishes so that
    ``wait_for_pending()`` can return once nothing is pending.
    """

    def __init__(self, handler: Any) -> None:
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log
        # every accepted request that has not finished yet (running or queued)
        self.pending: Set[ImageSpec] = set()
        # accepted requests waiting for a free concurrency slot, FIFO order
        self.queue: List[ImageSpec] = []
        # (pool_id, namespace) => (shared ioctx, images currently using it)
        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}

    def wait_for_pending(self) -> None:
        """Block until every accepted request has finished."""
        with self.lock:
            while self.pending:
                self.log.debug(
                    "CreateSnapshotRequests.wait_for_pending: "
                    "{} images".format(len(self.pending)))
                self.condition.wait()
        self.log.debug("CreateSnapshotRequests.wait_for_pending: done")

    def add(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Request a mirror snapshot for one image.

        The request is dropped if one for the same image is still pending,
        queued if the concurrency limit is reached, and started otherwise.
        """
        image_spec = ImageSpec(pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            if len(self.pending) > max_concurrent:
                # over the limit: finish() of an earlier request starts it
                self.queue.append(image_spec)
                return

        # must run without self.lock held: get_ioctx() acquires the
        # (non-reentrant) lock itself
        self.open_image(image_spec)

    def open_image(self, image_spec: ImageSpec) -> None:
        """Start a request by asynchronously opening the image."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp: rados.Completion, image: rbd.Image) -> None:
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_open_image(self,
                          image_spec: ImageSpec,
                          comp: rados.Completion,
                          image: rbd.Image) -> None:
        """Completion of open_image(); on success query the mirror mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            # ENOENT (image gone) is expected and not worth an error log
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously fetch the image's mirror mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, mode: Optional[int]) -> None:
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               mode: Optional[int]) -> None:
        """Completion of get_mirror_mode(); continue only for snapshot mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), mode))

        if mode is None:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously fetch the image's mirror info."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, info: Optional[Dict[str, Union[str, int]]]) -> None:
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               info: Optional[Dict[str, Union[str, int]]]) -> None:
        """Completion of get_mirror_info(); snapshot only primary images."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), info))

        if info is None:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously create the mirror snapshot."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion, snap_id: Optional[int]) -> None:
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               snap_id: Optional[int]) -> None:
        """Completion of create_snapshot(); log failures, then close."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), snap_id))

        if snap_id is None and comp.get_return_value() != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.close_image(image_spec, image)

    def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously close the image opened by open_image()."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion) -> None:
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self,
                           image_spec: ImageSpec,
                           comp: rados.Completion) -> None:
        """Completion of close_image(); always finishes the request."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.finish(image_spec)

    def finish(self, image_spec: ImageSpec) -> None:
        """Retire a request and start the next queued one, if any."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            # wake wait_for_pending()
            self.condition.notify()
            if not self.queue:
                return
            image_spec = self.queue.pop(0)

        self.open_image(image_spec)

    def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
        """Return the shared ioctx for the image's pool/namespace.

        The ioctx is opened and cached on first use and *image_spec* is
        registered as one of its users; release it with put_ioctx().
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if not ioctx:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                images = set()
                self.ioctxs[nspec] = (ioctx, images)
            assert images is not None
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec: ImageSpec) -> None:
        """Drop *image_spec*'s use of its pool/namespace ioctx.

        The cache entry is removed once no image uses it. A spec that was
        never registered is ignored: finish() also runs when get_ioctx()
        itself raised inside open_image(), and a KeyError here would escape
        that error path and leave the spec in ``pending`` forever.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            entry = self.ioctxs.get(nspec)
            if entry is None:
                return
            _, images = entry
            images.discard(image_spec)
            if not images:
                del self.ioctxs[nspec]
class MirrorSnapshotScheduleHandler:
    """Schedules and triggers mirror snapshot creation for RBD images.

    A background thread (``run``) periodically refreshes the set of
    scheduled images, maintains ``self.queue`` (schedule time string =>
    list of ImageSpec), and hands images whose time has come to
    ``CreateSnapshotRequests``. ``self.lock`` guards ``queue``, ``images``
    and ``schedules``. ``self.condition`` (bound to that lock) wakes the
    thread when the queue changes or on shutdown.
    """

    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    def __init__(self, module: Any) -> None:
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.module = module
        self.log = module.log
        # a time far in the past makes the first refresh_images() call scan
        self.last_refresh_images = datetime(1970, 1, 1)
        # CreateSnapshotRequests reads self.module and self.log, set above
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Build the initial queue and start the scheduler thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Stop the scheduler thread and wait for in-flight snapshot requests."""
        self.log.info("MirrorSnapshotScheduleHandler: shutting down")
        with self.lock:
            self.stop_thread = True
            # run() may be blocked in condition.wait() for up to
            # REFRESH_DELAY_SECONDS; wake it so join() returns promptly
            self.condition.notify()
        if self.thread.is_alive():
            self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
            self.thread.join()
        self.create_snapshot_requests.wait_for_pending()
        self.log.info("MirrorSnapshotScheduleHandler: shut down")

    def run(self) -> None:
        """Scheduler thread body.

        Loops until shutdown(). Each iteration refreshes images when due,
        waits for the next scheduled time, submits the due image for
        snapshot creation and re-enqueues it for its next run. A
        ConnectionShutdown is reported via ``module.client_blocklisted``;
        any other exception is logged as fatal and ends the thread.
        """
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_images()
                with self.lock:
                    # re-check under the lock: shutdown() sets the flag and
                    # notifies while holding it, so the wakeup is never lost
                    if self.stop_thread:
                        break
                    (image_spec, wait_time) = self.dequeue()
                    if image_spec is None:
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace, image_id = image_spec
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self) -> None:
        """Create empty queue/image maps, load schedules and scan images."""
        # schedule_time => image_spec
        self.queue: Dict[str, List[ImageSpec]] = {}
        # pool_id => {namespace => image_id}
        self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
        self.schedules = Schedules(self)
        self.refresh_images()
        self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload schedules, validating namespaces and images."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
        self.schedules.load(namespace_validator, image_validator)

    def refresh_images(self) -> float:
        """Reload schedules and rescan scheduled pools if a refresh is due.

        :returns: seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        images: Dict[str, Dict[str, Dict[str, str]]] = {}

        # the pool scan does RADOS I/O, so it runs without holding the lock
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        self.last_refresh_images = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool_images(self,
                         ioctx: rados.Ioctx,
                         images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Collect the primary snapshot-mirrored images of *ioctx*'s pool.

        Fills ``images[pool_id][namespace][image_id]`` with a display name
        (``pool/image`` or ``pool/namespace/image``) for every namespace
        that intersects a schedule. rbd.ConnectionShutdown is re-raised;
        any other error is logged and the rest of the pool is skipped.
        Note: switches *ioctx* to each scanned namespace.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                image_names = {x['id']: x['name']
                               for x in rbd.RBD().list2(ioctx)
                               if x['id'] in mirror_images}
                for image_id, info in mirror_images.items():
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except rbd.ConnectionShutdown:
            # let run() see the blocklisting instead of swallowing it here
            raise
        except Exception as e:
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self) -> None:
        """Recompute the queue after the schedules changed.

        Must be called with ``self.lock`` held (it notifies the condition).
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def refresh_queue(self,
                      current_images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Sync the queue with a freshly scanned image map.

        Images that disappeared are descheduled and new ones enqueued.
        Must be called with ``self.lock`` held (it notifies the condition).
        """
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if image_id not in current_images.get(pool_id, {}).get(namespace, {}):
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if image_id not in self.images.get(pool_id, {}).get(namespace, {}):
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str, image_id: str) -> None:
        """Queue an image at its schedule's next run after *now*.

        Does nothing if no schedule applies; never adds duplicates within
        one time slot.
        """
        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            self.log.debug(
                "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
                    pool_id, namespace, image_id))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
                pool_id, namespace, image_id, schedule_time))
        image_spec = ImageSpec(pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
        """Pop the earliest due image.

        :returns: ``(image_spec, 0.0)`` when an image is due; otherwise
            ``(None, seconds_to_wait)``, where the wait is the time until
            the earliest slot, or 1000.0 when the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Remove an image from every time slot, dropping emptied slots."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
                pool_id, namespace, image_id))

        empty_slots: List[str] = []
        image_spec = ImageSpec(pool_id, namespace, image_id)
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        # deleted after the loop: the dict must not change size while iterated
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule and rebuild the queue; returns (0, "", "")."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule and rebuild the queue; returns (0, "", "")."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules under *level_spec* as pretty-printed JSON."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return queued images matching *level_spec*, in time order, as JSON."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled_images: List[Dict[str, str]] = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time': schedule_time,
                        'image': image_name
                    })
        return 0, json.dumps({'scheduled_images': scheduled_images},
                             indent=4, sort_keys=True), ""