8 from datetime
import datetime
9 from threading
import Condition
, Lock
, Thread
10 from typing
import Any
, Dict
, List
, NamedTuple
, Optional
, Sequence
, Set
, Tuple
, Union
12 from .common
import get_rbd_pools
13 from .schedule
import LevelSpec
, Interval
, StartTime
, Schedule
, Schedules
def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Check that the namespace behind ``ioctx`` uses image mirror mode.

    Args:
        ioctx: I/O context passed to ``rbd.RBD().mirror_mode_get`` and
            asked for its namespace when building the error message.

    Raises:
        ValueError: if the reported mirror mode is not
            ``rbd.RBD_MIRROR_MODE_IMAGE``.
    """
    mode = rbd.RBD().mirror_mode_get(ioctx)
    if mode != rbd.RBD_MIRROR_MODE_IMAGE:
        raise ValueError("namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))
def image_validator(image: rbd.Image) -> None:
    """Check that ``image`` is mirrored in snapshot mode.

    Args:
        image: image whose ``mirror_image_get_mode()`` result is inspected.

    Raises:
        rbd.InvalidArgument: if the reported mode is not
            ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    """
    mode = image.mirror_image_get_mode()
    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        raise rbd.InvalidArgument("Invalid mirror image mode")
27 class ImageSpec(NamedTuple
):
33 class CreateSnapshotRequests
:
36 condition
= Condition(lock
)
def __init__(self, handler: Any) -> None:
    """Bind to ``handler`` and start with no requests tracked.

    Args:
        handler: owning object; this method reads ``handler.module.rados``
            and ``handler.log`` from it and keeps a reference to it.
    """
    self.handler = handler
    self.rados = handler.module.rados
    self.log = handler.log
    # Image specs with a request in flight or waiting; add() inserts here
    # and finish() removes.
    self.pending: Set[ImageSpec] = set()
    # Image specs deferred by add() once the number of pending requests
    # exceeds the configured maximum; finish() pops from the front.
    self.queue: List[ImageSpec] = []
    # (pool_id, namespace) => (ioctx, image specs currently using it).
    self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}
def __del__(self) -> None:
    """Invoke ``wait_for_pending()`` when the object is finalized."""
    self.wait_for_pending()
49 def wait_for_pending(self
) -> None:
54 def add(self
, pool_id
: str, namespace
: str, image_id
: str) -> None:
55 image_spec
= ImageSpec(pool_id
, namespace
, image_id
)
57 self
.log
.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
58 pool_id
, namespace
, image_id
))
60 max_concurrent
= self
.handler
.module
.get_localized_module_option(
61 self
.handler
.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE
)
64 if image_spec
in self
.pending
:
66 "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
67 pool_id
, namespace
, image_id
,
68 "previous request is still in progress"))
70 self
.pending
.add(image_spec
)
72 if len(self
.pending
) > max_concurrent
:
73 self
.queue
.append(image_spec
)
76 self
.open_image(image_spec
)
78 def open_image(self
, image_spec
: ImageSpec
) -> None:
79 pool_id
, namespace
, image_id
= image_spec
81 self
.log
.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
82 pool_id
, namespace
, image_id
))
85 ioctx
= self
.get_ioctx(image_spec
)
87 def cb(comp
: rados
.Completion
, image
: rbd
.Image
) -> None:
88 self
.handle_open_image(image_spec
, comp
, image
)
90 rbd
.RBD().aio_open_image(cb
, ioctx
, image_id
=image_id
)
91 except Exception as e
:
93 "exception when opening {}/{}/{}: {}".format(
94 pool_id
, namespace
, image_id
, e
))
95 self
.finish(image_spec
)
97 def handle_open_image(self
,
98 image_spec
: ImageSpec
,
99 comp
: rados
.Completion
,
100 image
: rbd
.Image
) -> None:
101 pool_id
, namespace
, image_id
= image_spec
104 "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
105 pool_id
, namespace
, image_id
, comp
.get_return_value()))
107 if comp
.get_return_value() < 0:
108 if comp
.get_return_value() != -errno
.ENOENT
:
110 "error when opening {}/{}/{}: {}".format(
111 pool_id
, namespace
, image_id
, comp
.get_return_value()))
112 self
.finish(image_spec
)
115 self
.get_mirror_mode(image_spec
, image
)
117 def get_mirror_mode(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
118 pool_id
, namespace
, image_id
= image_spec
120 self
.log
.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
121 pool_id
, namespace
, image_id
))
123 def cb(comp
: rados
.Completion
, mode
: int) -> None:
124 self
.handle_get_mirror_mode(image_spec
, image
, comp
, mode
)
127 image
.aio_mirror_image_get_mode(cb
)
128 except Exception as e
:
130 "exception when getting mirror mode for {}/{}/{}: {}".format(
131 pool_id
, namespace
, image_id
, e
))
132 self
.close_image(image_spec
, image
)
134 def handle_get_mirror_mode(self
,
135 image_spec
: ImageSpec
,
137 comp
: rados
.Completion
,
139 pool_id
, namespace
, image_id
= image_spec
142 "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
143 pool_id
, namespace
, image_id
, comp
.get_return_value(), mode
))
145 if comp
.get_return_value() < 0:
146 if comp
.get_return_value() != -errno
.ENOENT
:
148 "error when getting mirror mode for {}/{}/{}: {}".format(
149 pool_id
, namespace
, image_id
, comp
.get_return_value()))
150 self
.close_image(image_spec
, image
)
153 if mode
!= rbd
.RBD_MIRROR_IMAGE_MODE_SNAPSHOT
:
155 "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
156 pool_id
, namespace
, image_id
,
157 "snapshot mirroring is not enabled"))
158 self
.close_image(image_spec
, image
)
161 self
.get_mirror_info(image_spec
, image
)
163 def get_mirror_info(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
164 pool_id
, namespace
, image_id
= image_spec
166 self
.log
.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
167 pool_id
, namespace
, image_id
))
169 def cb(comp
: rados
.Completion
, info
: Dict
[str, Union
[str, int]]) -> None:
170 self
.handle_get_mirror_info(image_spec
, image
, comp
, info
)
173 image
.aio_mirror_image_get_info(cb
)
174 except Exception as e
:
176 "exception when getting mirror info for {}/{}/{}: {}".format(
177 pool_id
, namespace
, image_id
, e
))
178 self
.close_image(image_spec
, image
)
180 def handle_get_mirror_info(self
,
181 image_spec
: ImageSpec
,
183 comp
: rados
.Completion
,
184 info
: Dict
[str, Union
[str, int]]) -> None:
185 pool_id
, namespace
, image_id
= image_spec
188 "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
189 pool_id
, namespace
, image_id
, comp
.get_return_value(), info
))
191 if comp
.get_return_value() < 0:
192 if comp
.get_return_value() != -errno
.ENOENT
:
194 "error when getting mirror info for {}/{}/{}: {}".format(
195 pool_id
, namespace
, image_id
, comp
.get_return_value()))
196 self
.close_image(image_spec
, image
)
199 if not info
['primary']:
201 "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
202 pool_id
, namespace
, image_id
,
204 self
.close_image(image_spec
, image
)
207 self
.create_snapshot(image_spec
, image
)
209 def create_snapshot(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
210 pool_id
, namespace
, image_id
= image_spec
213 "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
214 pool_id
, namespace
, image_id
))
216 def cb(comp
: rados
.Completion
, snap_id
: int) -> None:
217 self
.handle_create_snapshot(image_spec
, image
, comp
, snap_id
)
220 image
.aio_mirror_image_create_snapshot(0, cb
)
221 except Exception as e
:
223 "exception when creating snapshot for {}/{}/{}: {}".format(
224 pool_id
, namespace
, image_id
, e
))
225 self
.close_image(image_spec
, image
)
228 def handle_create_snapshot(self
,
229 image_spec
: ImageSpec
,
231 comp
: rados
.Completion
,
232 snap_id
: int) -> None:
233 pool_id
, namespace
, image_id
= image_spec
236 "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
237 pool_id
, namespace
, image_id
, comp
.get_return_value(), snap_id
))
239 if comp
.get_return_value() < 0 and \
240 comp
.get_return_value() != -errno
.ENOENT
:
242 "error when creating snapshot for {}/{}/{}: {}".format(
243 pool_id
, namespace
, image_id
, comp
.get_return_value()))
245 self
.close_image(image_spec
, image
)
247 def close_image(self
, image_spec
: ImageSpec
, image
: rbd
.Image
) -> None:
248 pool_id
, namespace
, image_id
= image_spec
251 "CreateSnapshotRequests.close_image {}/{}/{}".format(
252 pool_id
, namespace
, image_id
))
254 def cb(comp
: rados
.Completion
) -> None:
255 self
.handle_close_image(image_spec
, comp
)
259 except Exception as e
:
261 "exception when closing {}/{}/{}: {}".format(
262 pool_id
, namespace
, image_id
, e
))
263 self
.finish(image_spec
)
265 def handle_close_image(self
,
266 image_spec
: ImageSpec
,
267 comp
: rados
.Completion
) -> None:
268 pool_id
, namespace
, image_id
= image_spec
271 "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
272 pool_id
, namespace
, image_id
, comp
.get_return_value()))
274 if comp
.get_return_value() < 0:
276 "error when closing {}/{}/{}: {}".format(
277 pool_id
, namespace
, image_id
, comp
.get_return_value()))
279 self
.finish(image_spec
)
281 def finish(self
, image_spec
: ImageSpec
) -> None:
282 pool_id
, namespace
, image_id
= image_spec
284 self
.log
.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
285 pool_id
, namespace
, image_id
))
287 self
.put_ioctx(image_spec
)
290 self
.pending
.remove(image_spec
)
293 image_spec
= self
.queue
.pop(0)
295 self
.open_image(image_spec
)
297 def get_ioctx(self
, image_spec
: ImageSpec
) -> rados
.Ioctx
:
298 pool_id
, namespace
, image_id
= image_spec
299 nspec
= (pool_id
, namespace
)
302 ioctx
, images
= self
.ioctxs
.get(nspec
, (None, None))
304 ioctx
= self
.rados
.open_ioctx2(int(pool_id
))
305 ioctx
.set_namespace(namespace
)
307 self
.ioctxs
[nspec
] = (ioctx
, images
)
308 assert images
is not None
309 images
.add(image_spec
)
313 def put_ioctx(self
, image_spec
: ImageSpec
) -> None:
314 pool_id
, namespace
, image_id
= image_spec
315 nspec
= (pool_id
, namespace
)
318 ioctx
, images
= self
.ioctxs
[nspec
]
319 images
.remove(image_spec
)
321 del self
.ioctxs
[nspec
]
324 class MirrorSnapshotScheduleHandler
:
325 MODULE_OPTION_NAME
= "mirror_snapshot_schedule"
326 MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE
= "max_concurrent_snap_create"
327 SCHEDULE_OID
= "rbd_mirror_snapshot_schedule"
328 REFRESH_DELAY_SECONDS
= 60.0
331 condition
= Condition(lock
)
334 def __init__(self
, module
: Any
) -> None:
336 self
.log
= module
.log
337 self
.last_refresh_images
= datetime(1970, 1, 1)
338 self
.create_snapshot_requests
= CreateSnapshotRequests(self
)
340 self
.init_schedule_queue()
342 self
.thread
= Thread(target
=self
.run
)
345 def _cleanup(self
) -> None:
346 self
.create_snapshot_requests
.wait_for_pending()
348 def run(self
) -> None:
350 self
.log
.info("MirrorSnapshotScheduleHandler: starting")
352 refresh_delay
= self
.refresh_images()
354 (image_spec
, wait_time
) = self
.dequeue()
356 self
.condition
.wait(min(wait_time
, refresh_delay
))
358 pool_id
, namespace
, image_id
= image_spec
359 self
.create_snapshot_requests
.add(pool_id
, namespace
, image_id
)
361 self
.enqueue(datetime
.now(), pool_id
, namespace
, image_id
)
363 except Exception as ex
:
364 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
365 ex
, traceback
.format_exc()))
def init_schedule_queue(self) -> None:
    """Reset the schedule queue and the known-image map, then refresh.

    Both containers are replaced with empty dicts before
    ``refresh_images()`` is called to repopulate them.
    """
    # schedule_time => list of image specs scheduled for that time
    self.queue: Dict[str, List[ImageSpec]] = {}
    # pool_id => {namespace => {image_id => image name}}
    self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
    self.refresh_images()
    self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")
def load_schedules(self) -> None:
    """Load schedules into a new ``Schedules`` object and store it on self.

    The module-level ``namespace_validator`` and ``image_validator``
    functions are handed to ``Schedules.load``.
    """
    self.log.info("MirrorSnapshotScheduleHandler: load_schedules")

    schedules = Schedules(self)
    schedules.load(namespace_validator, image_validator)
    # Assigned only after load() returns, so an exception raised while
    # loading leaves any previously stored self.schedules in place.
    self.schedules = schedules
382 def refresh_images(self
) -> float:
383 elapsed
= (datetime
.now() - self
.last_refresh_images
).total_seconds()
384 if elapsed
< self
.REFRESH_DELAY_SECONDS
:
385 return self
.REFRESH_DELAY_SECONDS
- elapsed
387 self
.log
.debug("MirrorSnapshotScheduleHandler: refresh_images")
390 self
.load_schedules()
391 if not self
.schedules
:
392 self
.log
.debug("MirrorSnapshotScheduleHandler: no schedules")
395 self
.last_refresh_images
= datetime
.now()
396 return self
.REFRESH_DELAY_SECONDS
398 images
: Dict
[str, Dict
[str, Dict
[str, str]]] = {}
400 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
401 if not self
.schedules
.intersects(
402 LevelSpec
.from_pool_spec(pool_id
, pool_name
)):
404 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
405 self
.load_pool_images(ioctx
, images
)
408 self
.refresh_queue(images
)
411 self
.last_refresh_images
= datetime
.now()
412 return self
.REFRESH_DELAY_SECONDS
414 def load_pool_images(self
,
416 images
: Dict
[str, Dict
[str, Dict
[str, str]]]) -> None:
417 pool_id
= str(ioctx
.get_pool_id())
418 pool_name
= ioctx
.get_pool_name()
421 self
.log
.debug("load_pool_images: pool={}".format(pool_name
))
424 namespaces
= [''] + rbd
.RBD().namespace_list(ioctx
)
425 for namespace
in namespaces
:
426 if not self
.schedules
.intersects(
427 LevelSpec
.from_pool_spec(int(pool_id
), pool_name
, namespace
)):
429 self
.log
.debug("load_pool_images: pool={}, namespace={}".format(
430 pool_name
, namespace
))
431 images
[pool_id
][namespace
] = {}
432 ioctx
.set_namespace(namespace
)
433 mirror_images
= dict(rbd
.RBD().mirror_image_info_list(
434 ioctx
, rbd
.RBD_MIRROR_IMAGE_MODE_SNAPSHOT
))
435 if not mirror_images
:
438 [(x
['id'], x
['name']) for x
in filter(
439 lambda x
: x
['id'] in mirror_images
,
440 rbd
.RBD().list2(ioctx
))])
441 for image_id
, info
in mirror_images
.items():
442 if not info
['primary']:
444 image_name
= image_names
.get(image_id
)
448 name
= "{}/{}/{}".format(pool_name
, namespace
,
451 name
= "{}/{}".format(pool_name
, image_name
)
453 "load_pool_images: adding image {}".format(name
))
454 images
[pool_id
][namespace
][image_id
] = name
455 except Exception as e
:
457 "load_pool_images: exception when scanning pool {}: {}".format(
460 def rebuild_queue(self
) -> None:
463 # don't remove from queue "due" images
464 now_string
= datetime
.strftime(now
, "%Y-%m-%d %H:%M:00")
466 for schedule_time
in list(self
.queue
):
467 if schedule_time
> now_string
:
468 del self
.queue
[schedule_time
]
470 if not self
.schedules
:
473 for pool_id
in self
.images
:
474 for namespace
in self
.images
[pool_id
]:
475 for image_id
in self
.images
[pool_id
][namespace
]:
476 self
.enqueue(now
, pool_id
, namespace
, image_id
)
478 self
.condition
.notify()
480 def refresh_queue(self
,
481 current_images
: Dict
[str, Dict
[str, Dict
[str, str]]]) -> None:
484 for pool_id
in self
.images
:
485 for namespace
in self
.images
[pool_id
]:
486 for image_id
in self
.images
[pool_id
][namespace
]:
487 if pool_id
not in current_images
or \
488 namespace
not in current_images
[pool_id
] or \
489 image_id
not in current_images
[pool_id
][namespace
]:
490 self
.remove_from_queue(pool_id
, namespace
, image_id
)
492 for pool_id
in current_images
:
493 for namespace
in current_images
[pool_id
]:
494 for image_id
in current_images
[pool_id
][namespace
]:
495 if pool_id
not in self
.images
or \
496 namespace
not in self
.images
[pool_id
] or \
497 image_id
not in self
.images
[pool_id
][namespace
]:
498 self
.enqueue(now
, pool_id
, namespace
, image_id
)
500 self
.condition
.notify()
502 def enqueue(self
, now
: datetime
, pool_id
: str, namespace
: str, image_id
: str) -> None:
503 schedule
= self
.schedules
.find(pool_id
, namespace
, image_id
)
506 "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
507 pool_id
, namespace
, image_id
))
510 schedule_time
= schedule
.next_run(now
)
511 if schedule_time
not in self
.queue
:
512 self
.queue
[schedule_time
] = []
514 "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
515 pool_id
, namespace
, image_id
, schedule_time
))
516 image_spec
= ImageSpec(pool_id
, namespace
, image_id
)
517 if image_spec
not in self
.queue
[schedule_time
]:
518 self
.queue
[schedule_time
].append(image_spec
)
520 def dequeue(self
) -> Tuple
[Optional
[ImageSpec
], float]:
525 schedule_time
= sorted(self
.queue
)[0]
527 if datetime
.strftime(now
, "%Y-%m-%d %H:%M:%S") < schedule_time
:
528 wait_time
= (datetime
.strptime(schedule_time
,
529 "%Y-%m-%d %H:%M:%S") - now
)
530 return None, wait_time
.total_seconds()
532 images
= self
.queue
[schedule_time
]
533 image
= images
.pop(0)
535 del self
.queue
[schedule_time
]
538 def remove_from_queue(self
, pool_id
: str, namespace
: str, image_id
: str) -> None:
540 "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
541 pool_id
, namespace
, image_id
))
544 image_spec
= ImageSpec(pool_id
, namespace
, image_id
)
545 for schedule_time
, images
in self
.queue
.items():
546 if image_spec
in images
:
547 images
.remove(image_spec
)
549 empty_slots
.append(schedule_time
)
550 for schedule_time
in empty_slots
:
551 del self
.queue
[schedule_time
]
553 def add_schedule(self
,
554 level_spec
: LevelSpec
,
556 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
558 "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
559 level_spec
.name
, interval
, start_time
))
561 # TODO: optimize to rebuild only affected part of the queue
563 self
.schedules
.add(level_spec
, interval
, start_time
)
567 def remove_schedule(self
,
568 level_spec
: LevelSpec
,
569 interval
: Optional
[str],
570 start_time
: Optional
[str]) -> Tuple
[int, str, str]:
572 "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
573 level_spec
.name
, interval
, start_time
))
575 # TODO: optimize to rebuild only affected part of the queue
577 self
.schedules
.remove(level_spec
, interval
, start_time
)
581 def list(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
583 "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
587 result
= self
.schedules
.to_list(level_spec
)
589 return 0, json
.dumps(result
, indent
=4, sort_keys
=True), ""
591 def status(self
, level_spec
: LevelSpec
) -> Tuple
[int, str, str]:
593 "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
596 scheduled_images
= []
598 for schedule_time
in sorted(self
.queue
):
599 for pool_id
, namespace
, image_id
in self
.queue
[schedule_time
]:
600 if not level_spec
.matches(pool_id
, namespace
, image_id
):
602 image_name
= self
.images
[pool_id
][namespace
][image_id
]
603 scheduled_images
.append({
604 'schedule_time' : schedule_time
,
607 return 0, json
.dumps({'scheduled_images' : scheduled_images
},
608 indent
=4, sort_keys
=True), ""