import json
import rados
import rbd
-import re
import traceback
from datetime import datetime
from threading import Condition, Lock, Thread
-from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
+from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
from .common import get_rbd_pools
-from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+from .schedule import LevelSpec, Schedules
+
def namespace_validator(ioctx: rados.Ioctx) -> None:
mode = rbd.RBD().mirror_mode_get(ioctx)
raise ValueError("namespace {} is not in mirror image mode".format(
ioctx.get_namespace()))
+
def image_validator(image: rbd.Image) -> None:
mode = image.mirror_image_get_mode()
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
self.wait_for_pending()
def wait_for_pending(self) -> None:
+ self.log.debug("CreateSnapshotRequests.wait_for_pending")
with self.lock:
while self.pending:
self.condition.wait()
pool_id, namespace, image_id, e))
self.close_image(image_spec, image)
-
def handle_create_snapshot(self,
image_spec: ImageSpec,
image: rbd.Image,
with self.lock:
self.pending.remove(image_spec)
+ self.condition.notify()
if not self.queue:
return
image_spec = self.queue.pop(0)
lock = Lock()
condition = Condition(lock)
- thread = None
def __init__(self, module: Any) -> None:
self.module = module
self.last_refresh_images = datetime(1970, 1, 1)
self.create_snapshot_requests = CreateSnapshotRequests(self)
- self.init_schedule_queue()
-
+ self.stop_thread = False
self.thread = Thread(target=self.run)
+
+ def setup(self) -> None:
+ self.init_schedule_queue()
self.thread.start()
- def _cleanup(self) -> None:
+ def shutdown(self) -> None:
+ self.log.info("MirrorSnapshotScheduleHandler: shutting down")
+ self.stop_thread = True
+ if self.thread.is_alive():
+ self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
+ self.thread.join()
self.create_snapshot_requests.wait_for_pending()
+ self.log.info("MirrorSnapshotScheduleHandler: shut down")
def run(self) -> None:
try:
self.log.info("MirrorSnapshotScheduleHandler: starting")
- while True:
+ while not self.stop_thread:
refresh_delay = self.refresh_images()
with self.lock:
(image_spec, wait_time) = self.dequeue()
with self.lock:
self.enqueue(datetime.now(), pool_id, namespace, image_id)
+ except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
+ self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
+ self.module.client_blocklisted.set()
except Exception as ex:
self.log.fatal("Fatal runtime error: {}\n{}".format(
ex, traceback.format_exc()))
self.log.debug(
"load_pool_images: adding image {}".format(name))
images[pool_id][namespace][image_id] = name
+ except rbd.ConnectionShutdown:
+ raise
except Exception as e:
self.log.error(
"load_pool_images: exception when scanning pool {}: {}".format(
continue
image_name = self.images[pool_id][namespace][image_id]
scheduled_images.append({
- 'schedule_time' : schedule_time,
- 'image' : image_name
+ 'schedule_time': schedule_time,
+ 'image': image_name
})
- return 0, json.dumps({'scheduled_images' : scheduled_images},
+ return 0, json.dumps({'scheduled_images': scheduled_images},
indent=4, sort_keys=True), ""