git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / rbd_support / mirror_snapshot_schedule.py
1 import errno
2 import json
3 import rados
4 import rbd
5 import traceback
6
7 from datetime import datetime
8 from threading import Condition, Lock, Thread
9 from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
10
11 from .common import get_rbd_pools
12 from .schedule import LevelSpec, Schedules
13
14
def namespace_validator(ioctx: rados.Ioctx) -> None:
    """Accept only namespaces that use per-image mirroring.

    :param ioctx: I/O context positioned on the namespace to check.
    :raises ValueError: if the namespace mirror mode is anything other
        than ``rbd.RBD_MIRROR_MODE_IMAGE``.
    """
    if rbd.RBD().mirror_mode_get(ioctx) == rbd.RBD_MIRROR_MODE_IMAGE:
        return
    raise ValueError("namespace {} is not in mirror image mode".format(
        ioctx.get_namespace()))
20
21
def image_validator(image: rbd.Image) -> None:
    """Accept only images mirrored in snapshot mode.

    :param image: an open RBD image.
    :raises rbd.InvalidArgument: if the image mirror mode is not
        ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    """
    if image.mirror_image_get_mode() == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        return
    raise rbd.InvalidArgument("Invalid mirror image mode")
26
27
class ImageSpec(NamedTuple):
    """Hashable identifier of one RBD image: (pool id, namespace, image id)."""

    pool_id: str    # pool id, kept as a string
    namespace: str  # namespace name; '' is the default namespace
    image_id: str   # image id (not the image name)
32
33
class CreateSnapshotRequests:
    """Throttled, asynchronous creation of mirror snapshots.

    Each request is a chain of librbd AIO callbacks::

        open_image -> get_mirror_mode -> get_mirror_info
            -> create_snapshot -> close_image -> finish

    A step that fails, or that finds the image is not a primary
    snapshot-mirrored image, short-circuits to ``close_image`` (or straight
    to ``finish`` when the image was never opened).  At most
    ``max_concurrent_snap_create`` requests run at once; extra requests wait
    in ``queue`` and are started by ``finish`` as slots free up.
    """

    def __init__(self, handler: Any) -> None:
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.handler = handler
        self.rados = handler.module.rados
        self.log = handler.log
        # every image whose request is either in flight or queued
        self.pending: Set[ImageSpec] = set()
        # images waiting for a free concurrency slot, oldest first
        self.queue: List[ImageSpec] = []
        # (pool_id, namespace) -> (shared ioctx, images currently using it)
        self.ioctxs: Dict[Tuple[str, str], Tuple[rados.Ioctx, Set[ImageSpec]]] = {}

    def wait_for_pending(self) -> None:
        """Block until every in-flight and queued request has finished."""
        with self.lock:
            while self.pending:
                self.log.debug(
                    "CreateSnapshotRequests.wait_for_pending: "
                    "{} images".format(len(self.pending)))
                self.condition.wait()
        self.log.debug("CreateSnapshotRequests.wait_for_pending: done")

    def add(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Request a mirror snapshot of one image.

        The call is ignored (with an info log) while a previous request for
        the same image is still pending.  When the concurrency limit is
        reached the request is queued rather than started.
        """
        image_spec = ImageSpec(pool_id, namespace, image_id)

        self.log.debug("CreateSnapshotRequests.add: {}/{}/{}".format(
            pool_id, namespace, image_id))

        max_concurrent = self.handler.module.get_localized_module_option(
            self.handler.MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE)

        with self.lock:
            if image_spec in self.pending:
                self.log.info(
                    "CreateSnapshotRequests.add: {}/{}/{}: {}".format(
                        pool_id, namespace, image_id,
                        "previous request is still in progress"))
                return
            self.pending.add(image_spec)

            # pending already counts this image, hence ">" rather than ">="
            if len(self.pending) > max_concurrent:
                self.queue.append(image_spec)
                return

        self.open_image(image_spec)

    def open_image(self, image_spec: ImageSpec) -> None:
        """Start the request chain for *image_spec*.

        If the asynchronous open cannot even be issued, the request is
        finished immediately.  ``finish`` may then start queued requests.
        """
        if not self._start_open_image(image_spec):
            self.finish(image_spec)

    def _start_open_image(self, image_spec: ImageSpec) -> bool:
        """Issue the async open for *image_spec* without finishing on error.

        :returns: True if the open was issued, in which case
            ``handle_open_image`` will continue the chain.  False (after
            logging the error) if it could not be issued; no callback will
            fire, and the caller must finish the request.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.open_image: {}/{}/{}".format(
            pool_id, namespace, image_id))

        try:
            ioctx = self.get_ioctx(image_spec)

            def cb(comp: rados.Completion, image: rbd.Image) -> None:
                self.handle_open_image(image_spec, comp, image)

            rbd.RBD().aio_open_image(cb, ioctx, image_id=image_id)
        except Exception as e:
            self.log.error(
                "exception when opening {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            return False
        return True

    def handle_open_image(self,
                          image_spec: ImageSpec,
                          comp: rados.Completion,
                          image: rbd.Image) -> None:
        """Open completed: query the mirror mode, or finish on error.

        ``-ENOENT`` (image gone) finishes quietly; other errors are logged.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_open_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when opening {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            # the image is not open, so there is nothing to close
            self.finish(image_spec)
            return

        self.get_mirror_mode(image_spec, image)

    def get_mirror_mode(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously fetch the image's mirror mode."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_mode: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, mode: Optional[int]) -> None:
            self.handle_get_mirror_mode(image_spec, image, comp, mode)

        try:
            image.aio_mirror_image_get_mode(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror mode for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_mode(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               mode: Optional[int]) -> None:
        """Continue only for snapshot-mode images; otherwise close."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_mode {}/{}/{}: r={} mode={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), mode))

        if mode is None:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror mode for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_mode: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "snapshot mirroring is not enabled"))
            self.close_image(image_spec, image)
            return

        self.get_mirror_info(image_spec, image)

    def get_mirror_info(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously fetch the image's mirror info."""
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.get_mirror_info: {}/{}/{}".format(
            pool_id, namespace, image_id))

        def cb(comp: rados.Completion, info: Optional[Dict[str, Union[str, int]]]) -> None:
            self.handle_get_mirror_info(image_spec, image, comp, info)

        try:
            image.aio_mirror_image_get_info(cb)
        except Exception as e:
            self.log.error(
                "exception when getting mirror info for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_get_mirror_info(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               info: Optional[Dict[str, Union[str, int]]]) -> None:
        """Snapshot only primary images; close the others."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_get_mirror_info {}/{}/{}: r={} info={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), info))

        if info is None:
            if comp.get_return_value() != -errno.ENOENT:
                self.log.error(
                    "error when getting mirror info for {}/{}/{}: {}".format(
                        pool_id, namespace, image_id, comp.get_return_value()))
            self.close_image(image_spec, image)
            return

        if not info['primary']:
            self.log.debug(
                "CreateSnapshotRequests.handle_get_mirror_info: {}/{}/{}: {}".format(
                    pool_id, namespace, image_id,
                    "is not primary"))
            self.close_image(image_spec, image)
            return

        self.create_snapshot(image_spec, image)

    def create_snapshot(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously create a mirror snapshot (flags argument 0)."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.create_snapshot for {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion, snap_id: Optional[int]) -> None:
            self.handle_create_snapshot(image_spec, image, comp, snap_id)

        try:
            image.aio_mirror_image_create_snapshot(0, cb)
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.close_image(image_spec, image)

    def handle_create_snapshot(self,
                               image_spec: ImageSpec,
                               image: rbd.Image,
                               comp: rados.Completion,
                               snap_id: Optional[int]) -> None:
        """Log the outcome of the snapshot creation, then close the image."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_create_snapshot for {}/{}/{}: r={}, snap_id={}".format(
                pool_id, namespace, image_id, comp.get_return_value(), snap_id))

        if snap_id is None and comp.get_return_value() != -errno.ENOENT:
            self.log.error(
                "error when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.close_image(image_spec, image)

    def close_image(self, image_spec: ImageSpec, image: rbd.Image) -> None:
        """Asynchronously close the image; finish directly if that fails."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.close_image {}/{}/{}".format(
                pool_id, namespace, image_id))

        def cb(comp: rados.Completion) -> None:
            self.handle_close_image(image_spec, comp)

        try:
            image.aio_close(cb)
        except Exception as e:
            self.log.error(
                "exception when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))
            self.finish(image_spec)

    def handle_close_image(self,
                           image_spec: ImageSpec,
                           comp: rados.Completion) -> None:
        """Close completed: log any error and finish the request."""
        pool_id, namespace, image_id = image_spec

        self.log.debug(
            "CreateSnapshotRequests.handle_close_image {}/{}/{}: r={}".format(
                pool_id, namespace, image_id, comp.get_return_value()))

        if comp.get_return_value() < 0:
            self.log.error(
                "error when closing {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, comp.get_return_value()))

        self.finish(image_spec)

    def finish(self, image_spec: ImageSpec) -> None:
        """Complete the request for *image_spec* and start queued requests.

        Queued requests whose open fails synchronously are completed in this
        loop rather than through ``open_image`` -> ``finish`` recursion.
        This means a long queue of failing images (e.g. a removed pool)
        cannot exhaust the stack.
        """
        spec: Optional[ImageSpec] = image_spec
        while spec is not None:
            spec = self._complete(spec)
            if spec is not None and self._start_open_image(spec):
                # the next request is now running asynchronously
                return

    def _complete(self, image_spec: ImageSpec) -> Optional[ImageSpec]:
        """Release *image_spec*'s resources and pop the next queued request.

        :returns: the next queued ImageSpec to start, or None if the queue
            is empty.
        """
        pool_id, namespace, image_id = image_spec

        self.log.debug("CreateSnapshotRequests.finish: {}/{}/{}".format(
            pool_id, namespace, image_id))

        self.put_ioctx(image_spec)

        with self.lock:
            self.pending.remove(image_spec)
            # wake wait_for_pending()
            self.condition.notify()
            if not self.queue:
                return None
            return self.queue.pop(0)

    def get_ioctx(self, image_spec: ImageSpec) -> rados.Ioctx:
        """Return the shared ioctx for the image's pool/namespace.

        The ioctx is opened on first use and *image_spec* is registered as
        one of its users.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            entry = self.ioctxs.get(nspec)
            if entry is None:
                ioctx = self.rados.open_ioctx2(int(pool_id))
                ioctx.set_namespace(namespace)
                entry = (ioctx, set())
                self.ioctxs[nspec] = entry
            ioctx, images = entry
            images.add(image_spec)

        return ioctx

    def put_ioctx(self, image_spec: ImageSpec) -> None:
        """Drop *image_spec* as a user of its shared ioctx.

        The cache entry is removed once no images use it.  This is a no-op
        when ``get_ioctx`` failed before registering the image; in that
        case there is no entry, or the image is not in it.
        """
        pool_id, namespace, image_id = image_spec
        nspec = (pool_id, namespace)

        with self.lock:
            entry = self.ioctxs.get(nspec)
            if entry is None:
                return
            _, images = entry
            images.discard(image_spec)
            if not images:
                del self.ioctxs[nspec]
322
323
class MirrorSnapshotScheduleHandler:
    """Periodically create mirror snapshots according to configured schedules.

    A worker thread (``run``) refreshes the set of primary, snapshot-mirrored
    images at most every ``REFRESH_DELAY_SECONDS``.  It keeps ``queue``
    (schedule time string -> images due then) and hands due images to
    ``CreateSnapshotRequests``.  ``lock`` guards ``queue``, ``images`` and
    ``schedules``.
    """
    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    MODULE_OPTION_NAME_MAX_CONCURRENT_SNAP_CREATE = "max_concurrent_snap_create"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"
    REFRESH_DELAY_SECONDS = 60.0

    def __init__(self, module: Any) -> None:
        self.lock = Lock()
        self.condition = Condition(self.lock)
        self.module = module
        self.log = module.log
        # the epoch start forces a full refresh on the first refresh_images()
        self.last_refresh_images = datetime(1970, 1, 1)
        self.create_snapshot_requests = CreateSnapshotRequests(self)

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Build the initial schedule queue and start the worker thread."""
        self.init_schedule_queue()
        self.thread.start()

    def shutdown(self) -> None:
        """Stop the worker thread and wait for in-flight snapshot requests.

        ``stop_thread`` is set and the condition is notified under ``lock``.
        This wakes a worker blocked in ``condition.wait()`` right away,
        instead of after its timeout (up to ``REFRESH_DELAY_SECONDS``).
        """
        self.log.info("MirrorSnapshotScheduleHandler: shutting down")
        with self.lock:
            self.stop_thread = True
            self.condition.notify()
        if self.thread.is_alive():
            self.log.debug("MirrorSnapshotScheduleHandler: joining thread")
            self.thread.join()
        self.create_snapshot_requests.wait_for_pending()
        self.log.info("MirrorSnapshotScheduleHandler: shut down")

    def run(self) -> None:
        """Worker loop: refresh images, dispatch due ones, reschedule them.

        Client blocklisting (ConnectionShutdown) is reported through
        ``module.client_blocklisted``; any other exception is logged as
        fatal and ends the thread.
        """
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while not self.stop_thread:
                refresh_delay = self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        # re-check under the lock: shutdown() sets the flag
                        # under the same lock, so its notify cannot be lost
                        if not self.stop_thread:
                            self.condition.wait(min(wait_time, refresh_delay))
                        continue
                pool_id, namespace, image_id = image_spec
                self.create_snapshot_requests.add(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("MirrorSnapshotScheduleHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def init_schedule_queue(self) -> None:
        """Create empty queue/image maps, load schedules and do a first refresh."""
        # schedule_time => image_spec
        self.queue: Dict[str, List[ImageSpec]] = {}
        # pool_id => {namespace => {image_id => image name}}
        self.images: Dict[str, Dict[str, Dict[str, str]]] = {}
        self.schedules = Schedules(self)
        self.refresh_images()
        self.log.debug("MirrorSnapshotScheduleHandler: queue is initialized")

    def load_schedules(self) -> None:
        """Reload schedules, validating namespaces and images."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")
        self.schedules.load(namespace_validator, image_validator)

    def refresh_images(self) -> float:
        """Rescan pools for schedulable images if the refresh delay elapsed.

        :returns: seconds until the next refresh is due.
        """
        elapsed = (datetime.now() - self.last_refresh_images).total_seconds()
        if elapsed < self.REFRESH_DELAY_SECONDS:
            return self.REFRESH_DELAY_SECONDS - elapsed

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        with self.lock:
            self.load_schedules()
            if not self.schedules:
                self.log.debug("MirrorSnapshotScheduleHandler: no schedules")
                self.images = {}
                self.queue = {}
                self.last_refresh_images = datetime.now()
                return self.REFRESH_DELAY_SECONDS

        images: Dict[str, Dict[str, Dict[str, str]]] = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            try:
                ioctx = self.module.rados.open_ioctx2(int(pool_id))
            except rados.ObjectNotFound:
                # the pool was removed after get_rbd_pools() listed it; skip
                # it instead of letting the error terminate the worker thread
                self.log.debug(
                    "MirrorSnapshotScheduleHandler: pool {} no longer exists".format(
                        pool_name))
                continue
            with ioctx:
                self.load_pool_images(ioctx, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        self.last_refresh_images = datetime.now()
        return self.REFRESH_DELAY_SECONDS

    def load_pool_images(self,
                         ioctx: rados.Ioctx,
                         images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Add the pool's primary snapshot-mirrored images to *images*.

        Only namespaces intersecting a schedule are scanned.  Errors other
        than ConnectionShutdown are logged and leave the pool partially
        filled.  Note: this changes *ioctx*'s namespace as it scans.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(int(pool_id), pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                image_names = dict(
                    [(x['id'], x['name']) for x in filter(
                        lambda x: x['id'] in mirror_images,
                        rbd.RBD().list2(ioctx))])
                for image_id, info in mirror_images.items():
                    if not info['primary']:
                        continue
                    image_name = image_names.get(image_id)
                    if not image_name:
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug(
                        "load_pool_images: adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except rbd.ConnectionShutdown:
            raise
        except Exception as e:
            self.log.error(
                "load_pool_images: exception when scanning pool {}: {}".format(
                    pool_name, e))

    def rebuild_queue(self) -> None:
        """Recompute future schedule slots for all images (caller holds lock).

        Slots that are already due (not later than the current minute) are
        kept so those images are not skipped.
        """
        now = datetime.now()

        # don't remove from queue "due" images
        now_string = datetime.strftime(now, "%Y-%m-%d %H:%M:00")

        for schedule_time in list(self.queue):
            if schedule_time > now_string:
                del self.queue[schedule_time]

        if not self.schedules:
            return

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def refresh_queue(self,
                      current_images: Dict[str, Dict[str, Dict[str, str]]]) -> None:
        """Sync the queue with a fresh image scan (caller holds lock).

        Images that disappeared are descheduled; newly found ones are
        enqueued.
        """
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                       namespace not in current_images[pool_id] or \
                       image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                       namespace not in self.images[pool_id] or \
                       image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now: datetime, pool_id: str, namespace: str, image_id: str) -> None:
        """Schedule an image at its next run time after *now* (caller holds lock)."""
        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            self.log.debug(
                "MirrorSnapshotScheduleHandler: no schedule for {}/{}/{}".format(
                    pool_id, namespace, image_id))
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug(
            "MirrorSnapshotScheduleHandler: scheduling {}/{}/{} at {}".format(
                pool_id, namespace, image_id, schedule_time))
        image_spec = ImageSpec(pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self) -> Tuple[Optional[ImageSpec], float]:
        """Pop the next due image (caller holds lock).

        :returns: ``(image_spec, 0.0)`` when one is due, otherwise
            ``(None, seconds until the earliest slot)``, or ``(None, 1000.0)``
            if the queue is empty.
        """
        if not self.queue:
            return None, 1000.0

        now = datetime.now()
        schedule_time = sorted(self.queue)[0]

        if datetime.strftime(now, "%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0.0

    def remove_from_queue(self, pool_id: str, namespace: str, image_id: str) -> None:
        """Remove an image from every slot, dropping emptied slots (caller holds lock)."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: descheduling {}/{}/{}".format(
                pool_id, namespace, image_id))

        empty_slots = []
        image_spec = ImageSpec(pool_id, namespace, image_id)
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self,
                     level_spec: LevelSpec,
                     interval: str,
                     start_time: Optional[str]) -> Tuple[int, str, str]:
        """Add a schedule and rebuild the queue; returns a (rc, out, err) tuple."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.add(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self,
                        level_spec: LevelSpec,
                        interval: Optional[str],
                        start_time: Optional[str]) -> Tuple[int, str, str]:
        """Remove a schedule and rebuild the queue; returns a (rc, out, err) tuple."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        # TODO: optimize to rebuild only affected part of the queue
        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)
            self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return the schedules for *level_spec* as indented JSON."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: list: level_spec={}".format(
                level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec: LevelSpec) -> Tuple[int, str, str]:
        """Return queued images matching *level_spec*, in time order, as JSON."""
        self.log.debug(
            "MirrorSnapshotScheduleHandler: status: level_spec={}".format(
                level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time': schedule_time,
                        'image': image_name
                    })
        return 0, json.dumps({'scheduled_images': scheduled_images},
                             indent=4, sort_keys=True), ""