]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / mirror_snapshot_schedule.py
1 import errno
2 import json
3 import rados
4 import rbd
5 import traceback
6
7 from datetime import datetime
8 from threading import Condition, Lock, Thread
9 from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
10
11 from .common import get_rbd_pools
12 from .schedule import LevelSpec, Schedules
13
14
def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Check that the namespace bound to *ioctx* uses image mirror mode.

    Raises:
        ValueError: if ``rbd.RBD().mirror_mode_get(ioctx)`` reports any mode
            other than ``rbd.RBD_MIRROR_MODE_IMAGE``.
    """
    if rbd.RBD().mirror_mode_get(ioctx) == rbd.RBD_MIRROR_MODE_IMAGE:
        return
    current_ns = ioctx.get_namespace()
    raise ValueError(
        "namespace {} is not in mirror image mode".format(current_ns))
20
21
def image_validator(image: rbd.Image) -> None:
    """Check that *image* is mirrored in snapshot mode.

    Raises:
        rbd.InvalidArgument: if the image's mirror mode is not
            ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    """
    if image.mirror_image_get_mode() == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        return
    raise rbd.InvalidArgument("Invalid mirror image mode")
26
27
# Identifies a single RBD image: pool id, namespace ('' when the image is not
# in a namespace) and image id.  Instances are used as set members and dict
# keys, and are unpacked positionally as (pool_id, namespace, image_id).
ImageSpec = NamedTuple(
    "ImageSpec",
    [
        ("pool_id", str),
        ("namespace", str),
        ("image_id", str),
    ],
)
32
33
class CreateSnapshotRequests:
    """Throttled, asynchronous creation of mirror snapshots.

    Each request runs a chain of librbd async calls: open the image, read
    its mirror mode, read its mirror info, create a mirror snapshot (only
    for primary images in snapshot mirror mode), then close the image.  At
    most ``max_concurrent_snap_create`` (a module option read in
    :meth:`add`) requests run at once; the rest wait in ``self.queue`` and
    are started from :meth:`finish` as earlier ones complete.
    """

    def __init__(self, handler: Any) -> None:
        # Per-instance lock/condition.  These used to be class attributes, so
        # every instance shared one non-reentrant Lock.  An old instance that
        # is reclaimed by the cyclic GC runs __del__ -> wait_for_pending ->
        # "with self.lock" on whatever thread triggered collection.  If that
        # thread already held the shared lock through another instance, it
        # deadlocked on itself.
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log
        # Requests that are queued or in flight.
        self.pending: Set[ImageSpec] = set()
        # Requests waiting for a free concurrency slot, in FIFO order.
        self.queue: List[ImageSpec] = []
        # (pool_id, namespace) -> (shared ioctx, specs currently using it)
        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}

    def __del__(self) -> None:
        self.wait_for_pending()

    def wait_for_pending(self) -> None:
        """Block until every queued or in-flight request has finished."""
        with self.lock:
            while self.pending:
                self.log.debug(
                    "CreateSnapshotRequests.wait_for_pending: "
                    "{} images".format(len(self.pending)))
                self.condition.wait()
            self.log.debug("CreateSnapshotRequests.wait_for_pending: done")

    def add(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Request a mirror snapshot for one image.

        Ignored (with an info log) while a previous request for the same
        image is still queued or in flight.  Otherwise the request is queued
        if the concurrency limit is reached, or started immediately.
        """
        image_spec = ImageSpec(pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            # self.pending already contains the new request, so it is queued
            # only when at least max_concurrent others are pending.
            if len(self.pending) > max_concurrent:
                self.queue.append(image_spec)
                return

        self.open_image(image_spec)

    def open_image(self, image_spec: ImageSpec) -> None:
        """Start a request by asynchronously opening the image.

        Any exception raised while starting, including one from
        :meth:`get_ioctx`, is logged and the request is finished.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp: rados.Completion, image: rbd.Image) -> None:
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_open_image(self,
                          image_spec: ImageSpec,
                          comp: rados.Completion,
                          image: rbd.Image) -> None:
        """Completion of :meth:`open_image`; on success, read the mirror mode.

        A negative return value finishes the request.  -ENOENT is not logged
        as an error.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously fetch the image's mirror mode.

        If starting the call raises, the exception is logged and the image
        is closed.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, mode: int) -> None:
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               mode: int) -> None:
        """Completion of :meth:`get_mirror_mode`.

        Continues only for images in snapshot mirror mode; otherwise, or on
        error, the image is closed.  -ENOENT is not logged as an error.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), mode))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously fetch the image's mirror info.

        If starting the call raises, the exception is logged and the image
        is closed.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, info: Dict[str, Union[str, int]]) -> None:
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               info: Dict[str, Union[str, int]]) -> None:
        """Completion of :meth:`get_mirror_info`.

        Creates a snapshot only when ``info['primary']`` is truthy;
        otherwise, or on error, the image is closed.  -ENOENT is not logged
        as an error.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), info))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously create a mirror snapshot (flags 0).

        If starting the call raises, the exception is logged and the image
        is closed.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion, snap_id: int) -> None:
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               snap_id: int) -> None:
        """Completion of :meth:`create_snapshot`.

        Logs errors other than -ENOENT, then always closes the image.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), snap_id))

        if comp.get_return_value() < 0 and \
                comp.get_return_value() != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.close_image(image_spec, image)

    def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously close the image.

        If starting the close raises, the exception is logged and the
        request is finished directly.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion) -> None:
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self,
                           image_spec: ImageSpec,
                           comp: rados.Completion) -> None:
        """Completion of :meth:`close_image`; logs errors and finishes."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.finish(image_spec)

    def finish(self, image_spec: ImageSpec) -> None:
        """Complete a request and start the next queued one, if any.

        Releases the request's ioctx reference, removes it from
        ``self.pending`` and wakes :meth:`wait_for_pending`.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            self.condition.notify()
            if not self.queue:
                return
            image_spec = self.queue.pop(0)

        # Started outside the lock: open_image() goes through get_ioctx(),
        # which acquires the same non-reentrant lock.
        self.open_image(image_spec)

    def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
        """Return the shared ioctx for the spec's pool and namespace.

        The ioctx is opened on first use, and *image_spec* is recorded as
        one of its users.  :meth:`put_ioctx` releases that registration.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            ioctx, images = self.ioctxs.get(nspec, (None, None))
            if not ioctx:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                images = set()
                self.ioctxs[nspec] = (ioctx, images)
            assert images is not None
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec: ImageSpec) -> None:
        """Drop *image_spec*'s use of its pool/namespace ioctx.

        When the last user goes away, the entry is removed from
        ``self.ioctxs``; no explicit ``close()`` is issued here.

        The spec may never have been registered: :meth:`open_image` calls
        :meth:`finish` (and thus this method) even when :meth:`get_ioctx`
        raised first, e.g. because the pool could not be opened.  That case
        is a no-op.  Raising ``KeyError`` instead would escape the caller's
        error handler and leave the spec in ``self.pending`` forever.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            entry = self.ioctxs.get(nspec)
            if entry is None:
                return
            _, images = entry
            images.discard(image_spec)
            if not images:
                del self.ioctxs[nspec]
327
328
class MirrorSnapshotScheduleHandler:
    """Create mirror snapshots for RBD images according to configured schedules.

    A worker thread (:meth:`run`) rescans pools for primary, snapshot-mirrored
    images at most every ``REFRESH_DELAY_SECONDS``.  It keeps a queue keyed
    by schedule-time string and hands due images to
    :class:`CreateSnapshotRequests`, re-enqueueing each one for its next run.
    """

    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    def __init__(self, module: Any) -> None:
        # Per-instance lock/condition (previously class attributes shared by
        # all instances).  The worker thread waits on self.condition, so a
        # shared condition let one instance's notify() wake a different
        # instance's worker instead of its own.
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.module = module
        self.log = module.log
        self.last_refresh_images = datetime(1970, 1, 1)
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Build the initial schedule queue and start the worker thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Stop the worker thread and wait for in-flight snapshot requests.

        The stop flag is set and the condition notified while holding the
        lock.  A worker blocked in ``condition.wait()`` therefore wakes at
        once instead of sleeping out its timeout, which can be up to
        ``REFRESH_DELAY_SECONDS``.
        """
        self.log.info("MirrorSnapshotScheduleHandler: shutting down")
        with self.lock:
            self.stop_thread = True
            self.condition.notify()
        if self.thread.is_alive():
            self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
            self.thread.join()
        self.create_snapshot_requests.wait_for_pending()
        self.log.info("MirrorSnapshotScheduleHandler: shut down")

    def run(self) -> None:
        """Worker loop: refresh, wait for a due image, snapshot, reschedule.

        A connection-shutdown error from rados/rbd sets
        ``module.client_blocklisted``.  Any other exception is logged as
        fatal.  Both end the loop.
        """
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        # Re-checked under the lock that shutdown() holds
                        # while setting the flag and notifying, so its
                        # wakeup cannot be missed.
                        if not self.stop_thread:
                            self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace, image_id = image_spec
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self) -> None:
        """Create empty queue/image maps, load schedules and scan pools."""
        # schedule time string => list of ImageSpecs due at that time
        self.queue: Dict[str, List[ImageSpec]] = {}
        # pool_id => {namespace => {image_id => display name}}
        self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
        self.schedules = Schedules(self)
        self.refresh_images()
        self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload schedules, validating namespaces and images."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
        self.schedules.load(namespace_validator, image_validator)

    def refresh_images(self) -> float:
        """Rescan pools for schedulable images once the refresh delay has passed.

        Reloads the schedules, then collects primary snapshot-mirrored images
        from every pool that some schedule may cover.  Finally it reconciles
        the queue with the new image set.  With no schedules at all, the
        image map and queue are cleared.

        :returns: seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        images: Dict[str, Dict[str, Dict[str, str]]] = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        self.last_refresh_images = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool_images(self,
                         ioctx: rados.Ioctx,
                         images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Add the primary, snapshot-mirrored images of *ioctx*'s pool to *images*.

        Scans the default namespace ('') plus every namespace returned by
        ``namespace_list``, skipping namespaces that no schedule covers.
        Each found image is stored as ``images[pool_id][namespace][image_id]``,
        with a display name of ``pool/image`` or ``pool/namespace/image``.

        Note that this calls ``ioctx.set_namespace`` on the caller's ioctx.
        ``rbd.ConnectionShutdown`` is re-raised.  Any other exception is
        logged and ends the scan of this pool, keeping what was collected.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                # image id -> image name, restricted to mirrored images
                image_names = {
                    x['id']: x['name']
                    for x in rbd.RBD().list2(ioctx)
                    if x['id'] in mirror_images
                }
                for image_id, info in mirror_images.items():
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except rbd.ConnectionShutdown:
            raise
        except Exception as e:
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self) -> None:
        """Rebuild future queue slots from the current schedules.

        Slots whose time string is later than the start of the current
        minute are dropped.  Earlier ("due") slots are kept, and every known
        image is then re-enqueued.  Callers in this class hold ``self.lock``.
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def refresh_queue(self,
                      current_images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Reconcile the queue with *current_images* (called with the lock held).

        Images that disappeared since the last scan are descheduled, and
        newly found images are enqueued.
        """
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                            namespace not in current_images[pool_id] or \
                            image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                            namespace not in self.images[pool_id] or \
                            image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str, image_id: str) -> None:
        """Queue an image at its schedule's next run after *now*.

        Does nothing if no schedule applies.  An image already present in
        that slot is not added twice.
        """
        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            self.log.debug(
                "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
                    pool_id, namespace, image_id))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
                pool_id, namespace, image_id, schedule_time))
        image_spec = ImageSpec(pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
        """Pop the first image of the earliest slot if that slot is due.

        :returns: ``(spec, 0.0)`` when an image is due;
            ``(None, seconds_until_earliest_slot)`` when nothing is due yet;
            ``(None, 1000.0)`` when the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        schedule_time = sorted(self.queue)[0]

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Remove an image from every queue slot, dropping slots left empty."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
                pool_id, namespace, image_id))

        empty_slots = []
        image_spec = ImageSpec(pool_id, namespace, image_id)
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule at *level_spec* and rebuild the queue."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove schedule(s) at *level_spec* and rebuild the queue."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules for *level_spec* as pretty-printed JSON."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return queued images matching *level_spec* as JSON, in time order."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time': schedule_time,
                        'image': image_name
                    })
        return 0, json.dumps({'scheduled_images': scheduled_images},
                             indent=4, sort_keys=True), ""