]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
c8cf4e4ee7ea303745bfcf9f3cd9f6f00b27e008
[ceph.git] / ceph / src / pybind / mgr / rbd_support / mirror_snapshot_schedule.py
1 import errno
2 import json
3 import rados
4 import rbd
5 import re
6 import traceback
7
8 from datetime import datetime
9 from threading import Condition, Lock, Thread
10 from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
11
12 from .common import get_rbd_pools
13 from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
14
def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Check that the namespace behind *ioctx* uses per-image mirroring.

    Raises:
        ValueError: if ``mirror_mode_get`` reports anything other than
            ``rbd.RBD_MIRROR_MODE_IMAGE``; the message names the namespace.
    """
    if rbd.RBD().mirror_mode_get(ioctx) == rbd.RBD_MIRROR_MODE_IMAGE:
        return
    raise ValueError(
        "namespace {} is not in mirror image mode".format(
            ioctx.get_namespace()))
20
def image_validator(image: rbd.Image) -> None:
    """Check that *image* is mirrored in snapshot mode.

    Raises:
        rbd.InvalidArgument: if the image's mirror mode is not
            ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    """
    if image.mirror_image_get_mode() == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        return
    raise rbd.InvalidArgument("Invalid mirror image mode")
25
26
class ImageSpec(NamedTuple):
    """Hashable (pool_id, namespace, image_id) triple identifying one image.

    Used in this file as a set member and as a queue entry, and unpacked
    positionally as ``pool_id, namespace, image_id = image_spec``.
    """
    # pool id kept as a string; callers convert with int() when opening
    pool_id: str
    # namespace name; '' is used for the default namespace
    namespace: str
    # image id (not the image name)
    image_id: str
31
32
class CreateSnapshotRequests:
    """Run asynchronous mirror-snapshot creation requests for images.

    Each request walks a chain of rbd AIO calls:

        open_image -> get_mirror_mode -> get_mirror_info
                   -> create_snapshot -> close_image -> finish

    Every step is issued by one method and completed by its ``handle_*``
    callback.  All paths, successful or not, end in ``finish()``, which
    releases the request's ioctx reference, drops it from ``pending`` and
    starts the next queued request, if any.

    The number of requests in flight is bounded by the handler's
    ``MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE`` module option;
    requests above that bound wait in ``queue``.
    """

    def __init__(self, handler: Any) -> None:
        # Per-instance primitives: a class-level Lock/Condition would be
        # shared by every instance and couple otherwise unrelated objects.
        self.lock = Lock()
        self.condition = Condition(self.lock)
        # every accepted request that has not finished yet (queued ones too)
        self.pending: Set[ImageSpec] = set()
        # accepted requests waiting for a free concurrency slot (FIFO)
        self.queue: List[ImageSpec] = []
        # (pool_id, namespace) => (shared ioctx, images currently using it)
        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log

    def __del__(self) -> None:
        self.wait_for_pending()

    def wait_for_pending(self) -> None:
        """Block until no request is pending; ``finish()`` wakes waiters."""
        with self.lock:
            while self.pending:
                self.condition.wait()

    def add(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Submit a snapshot request for one image.

        A request for an image that is already pending is dropped (logged at
        info level).  If the concurrency limit is exceeded the request is
        queued; otherwise it is started immediately.
        """
        image_spec = ImageSpec(pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            if len(self.pending) > max_concurrent:
                self.queue.append(image_spec)
                return

        # Must run outside the lock: open_image() -> get_ioctx() takes the
        # same non-reentrant lock.
        self.open_image(image_spec)

    def open_image(self, image_spec: ImageSpec) -> None:
        """Step 1: asynchronously open the image by id."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp: rados.Completion, image: rbd.Image) -> None:
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_open_image(self,
                          image_spec: ImageSpec,
                          comp: rados.Completion,
                          image: rbd.Image) -> None:
        """Completion of step 1; continues with the mirror-mode query."""
        pool_id, namespace, image_id = image_spec
        r = comp.get_return_value()

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, r))

        if r < 0:
            # ENOENT is not reported as an error (presumably the image was
            # removed since it was scheduled -- confirm).
            if r != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, r))
            # nothing was opened, so there is nothing to close
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Step 2: asynchronously query the image's mirror mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, mode: int) -> None:
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               mode: int) -> None:
        """Completion of step 2; only snapshot-mode images proceed."""
        pool_id, namespace, image_id = image_spec
        r = comp.get_return_value()

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, r, mode))

        if r < 0:
            if r != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, r))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Step 3: asynchronously fetch the image's mirror info."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, info: Dict[str, Union[str, int]]) -> None:
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               info: Dict[str, Union[str, int]]) -> None:
        """Completion of step 3; only primary images proceed."""
        pool_id, namespace, image_id = image_spec
        r = comp.get_return_value()

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, r, info))

        if r < 0:
            if r != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, r))
            self.close_image(image_spec, image)
            return

        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Step 4: asynchronously create the mirror snapshot."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion, snap_id: int) -> None:
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            # NOTE(review): the leading 0 is presumably the "flags" argument
            # (no flags) -- confirm against the rbd Python bindings.
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               snap_id: int) -> None:
        """Completion of step 4; the image is closed whatever the result."""
        pool_id, namespace, image_id = image_spec
        r = comp.get_return_value()

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, r, snap_id))

        if r < 0 and r != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, r))

        self.close_image(image_spec, image)

    def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Step 5: asynchronously close the image."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion) -> None:
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self,
                           image_spec: ImageSpec,
                           comp: rados.Completion) -> None:
        """Completion of step 5; logs a close failure and finishes."""
        pool_id, namespace, image_id = image_spec
        r = comp.get_return_value()

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, r))

        if r < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, r))

        self.finish(image_spec)

    def finish(self, image_spec: ImageSpec) -> None:
        """Retire a request and start the next queued one, if any."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            # Wake wait_for_pending(); without this a waiter would never
            # observe that ``pending`` has drained.
            self.condition.notify_all()
            if not self.queue:
                return
            image_spec = self.queue.pop(0)

        # outside the lock, for the same reason as in add()
        self.open_image(image_spec)

    def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
        """Return the shared ioctx for the image's (pool, namespace).

        The ioctx is opened on first use and *image_spec* is recorded as a
        user of it.  Exceptions from opening the ioctx propagate to the
        caller, in which case nothing is recorded.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if not ioctx:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                images = set()
                self.ioctxs[nspec] = (ioctx, images)
            assert images is not None
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec: ImageSpec) -> None:
        """Drop *image_spec*'s use of its shared ioctx.

        The cache entry is removed once no image uses it.  A missing entry is
        tolerated: finish() is also reached when get_ioctx() itself failed,
        and raising KeyError there would leave the image stuck in ``pending``.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            entry = self.ioctxs.get(nspec)
            if entry is None:
                return
            _, images = entry
            images.discard(image_spec)
            if not images:
                del self.ioctxs[nspec]
322
323
class MirrorSnapshotScheduleHandler:
    """Schedule periodic mirror snapshots for images.

    A background thread (``run``) repeatedly refreshes the set of candidate
    images, pops due images from a time-keyed queue, hands them to
    ``CreateSnapshotRequests`` and re-enqueues them for their next run.

    ``queue``, ``images`` and ``schedules`` are shared between that thread
    and the public methods (``add_schedule``, ``remove_schedule``, ``list``,
    ``status``); ``lock`` guards them and ``condition`` wakes the thread when
    the queue changes.
    """
    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    thread = None

    def __init__(self, module: Any) -> None:
        # Per-instance primitives: a class-level Lock/Condition would be
        # shared by every handler instance.  They are created first because
        # init_schedule_queue() below already takes the lock.
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.module = module
        self.log = module.log
        # epoch start forces the first refresh_images() call to do real work
        self.last_refresh_images = datetime(1970, 1, 1)
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def _cleanup(self) -> None:
        """Block until all in-flight snapshot requests have finished."""
        self.create_snapshot_requests.wait_for_pending()

    def run(self) -> None:
        """Scheduler thread body.

        Loops forever; any exception is logged with its traceback and ends
        the loop (and therefore the thread).
        """
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while True:
                refresh_delay = self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        # sleep until the next image is due, the next refresh
                        # is due, or the queue is changed (notify)
                        self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace, image_id = image_spec
                # issued outside the lock
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self) -> None:
        """Create empty queue/image maps and populate them with a refresh."""
        # schedule_time => image_spec
        self.queue: Dict[str, List[ImageSpec]] = {}
        # pool_id => {namespace => image_id}
        self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
        self.refresh_images()
        self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Load schedules via ``Schedules.load`` and store them on self."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load(namespace_validator, image_validator)
        self.schedules = schedules

    def refresh_images(self) -> float:
        """Reload schedules and rescan pools, at most once per refresh period.

        Returns:
            Seconds until the next refresh is due.  When called again within
            ``REFRESH_DELAY_SECONDS`` of the last refresh, nothing is
            reloaded and the remaining time is returned.
        """
        elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        images: Dict[str, Dict[str, Dict[str, str]]] = {}

        # The pool scan runs without the lock held.
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, images)

        with self.lock:
            # refresh_queue() diffs against the old self.images, so it must
            # run before the map is replaced
            self.refresh_queue(images)
            self.images = images

        self.last_refresh_images = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool_images(self,
                         ioctx: rados.Ioctx,
                         images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Collect the primary snapshot-mirrored images of one pool.

        Fills ``images[pool_id][namespace][image_id]`` with a display name
        (``pool/[namespace/]image``) for every namespace that intersects a
        schedule.  Any exception is logged and ends the scan of this pool,
        keeping whatever was collected so far.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            # '' stands for the default namespace
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                # image id => image name, restricted to mirrored images
                image_names = {
                    x['id']: x['name'] for x in rbd.RBD().list2(ioctx)
                    if x['id'] in mirror_images}
                for image_id, info in mirror_images.items():
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except Exception as e:
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self) -> None:
        """Recompute future queue slots for all known images.

        Must be called with ``self.lock`` held (it notifies the condition).
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        # iterate over a copy of the keys: entries are deleted in the loop
        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def refresh_queue(self,
                      current_images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Bring the queue in line with a freshly scanned image map.

        Images present in ``self.images`` but absent from *current_images*
        are descheduled; images new in *current_images* are enqueued.  Must
        be called with ``self.lock`` held (it notifies the condition).
        """
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                       namespace not in current_images[pool_id] or \
                       image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                       namespace not in self.images[pool_id] or \
                       image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str, image_id: str) -> None:
        """Queue one image at its schedule's next run time after *now*.

        Does nothing (beyond a debug log) when no schedule matches; an image
        already present in the target slot is not added twice.
        """
        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            self.log.debug(
                "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
                    pool_id, namespace, image_id))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
                pool_id, namespace, image_id, schedule_time))
        image_spec = ImageSpec(pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
        """Pop the next due image, if any.

        Returns:
            ``(image_spec, 0.0)`` when an image is due, otherwise
            ``(None, seconds_to_wait)``; with an empty queue the wait is
            1000.0 seconds.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        # Keys are "%Y-%m-%d %H:%M:%S" strings, so string order is time
        # order and the smallest key is the earliest slot.
        schedule_time = min(self.queue)

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Remove an image from every queue slot, dropping emptied slots."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
                pool_id, namespace, image_id))

        # slots are deleted after the loop: the dict is being iterated
        empty_slots = []
        image_spec = ImageSpec(pool_id, namespace, image_id)
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule and rebuild the queue.

        Returns:
            The tuple ``(0, "", "")``.
        """
        self.log.debug(
            "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule and rebuild the queue.

        Returns:
            The tuple ``(0, "", "")``.
        """
        self.log.debug(
            "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules matching *level_spec* as pretty-printed JSON.

        Returns:
            ``(0, json_text, "")``.
        """
        self.log.debug(
            "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return queued images matching *level_spec* as JSON.

        Returns:
            ``(0, json_text, "")`` where the JSON object has a
            ``scheduled_images`` list of ``{schedule_time, image}`` entries
            in ascending schedule-time order.
        """
        self.log.debug(
            "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time' : schedule_time,
                        'image' : image_name
                    })
        return 0, json.dumps({'scheduled_images' : scheduled_images},
                             indent=4, sort_keys=True), ""