]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/schedule.py
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / pybind / mgr / rbd_support / schedule.py
CommitLineData
20effc67 1import datetime
9f95a23c
TL
2import json
3import rados
4import rbd
5import re
6
9f95a23c 7from dateutil.parser import parse
20effc67 8from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
9f95a23c
TL
9
10from .common import get_rbd_pools
20effc67
TL
11if TYPE_CHECKING:
12 from .module import Module
9f95a23c
TL
13
# Dictionary keys used for each schedule item when a Schedule is
# serialized to / parsed from its JSON list form.
SCHEDULE_INTERVAL = "interval"
SCHEDULE_START_TIME = "start_time"
16
17
class LevelSpec:
    """Identifies the level a schedule applies to.

    A level is one of: global (no pool), a pool, a namespace inside a pool,
    or an image inside a pool/namespace.  ``name`` is the human readable
    form (built from pool/namespace/image names) and ``id`` is the form
    built from the pool id, namespace and image id.
    """

    def __init__(self,
                 name: str,
                 id: str,
                 pool_id: Optional[str],
                 namespace: Optional[str],
                 image_id: Optional[str] = None) -> None:
        self.name = name
        self.id = id
        self.pool_id = pool_id
        self.namespace = namespace
        self.image_id = image_id

    def __eq__(self, level_spec: Any) -> bool:
        """Two level specs are equal when their ids are equal."""
        # Comparing against an unrelated object (e.g. None) must not blow
        # up with AttributeError; let Python fall back to its default.
        if not isinstance(level_spec, LevelSpec):
            return NotImplemented
        return self.id == level_spec.id

    def is_child_of(self, level_spec: 'LevelSpec') -> bool:
        """Return True if this spec is strictly below ``level_spec``."""
        if level_spec.is_global():
            return not self.is_global()
        if level_spec.pool_id != self.pool_id:
            return False
        if level_spec.namespace is None:
            return self.namespace is not None
        if level_spec.namespace != self.namespace:
            return False
        if level_spec.image_id is None:
            return self.image_id is not None
        return False

    def is_global(self) -> bool:
        """Return True if this spec has no pool, i.e. is the global level."""
        return self.pool_id is None

    def get_pool_id(self) -> Optional[str]:
        """Return the pool id, or None for the global level."""
        return self.pool_id

    def matches(self,
                pool_id: str,
                namespace: str,
                image_id: Optional[str] = None) -> bool:
        """Return True if the given pool/namespace/image falls under this spec.

        A component of this spec that is unset (or empty) matches anything.
        """
        if self.pool_id and self.pool_id != pool_id:
            return False
        if self.namespace and self.namespace != namespace:
            return False
        if self.image_id and self.image_id != image_id:
            return False
        return True

    def intersects(self, level_spec: 'LevelSpec') -> bool:
        """Return True if one spec equals or contains the other."""
        if self.pool_id is None or level_spec.pool_id is None:
            return True
        if self.pool_id != level_spec.pool_id:
            return False
        if self.namespace is None or level_spec.namespace is None:
            return True
        if self.namespace != level_spec.namespace:
            return False
        if self.image_id is None or level_spec.image_id is None:
            return True
        if self.image_id != level_spec.image_id:
            return False
        return True

    @classmethod
    def make_global(cls) -> 'LevelSpec':
        """Build the global level spec (empty name and id, no pool)."""
        return LevelSpec("", "", None, None, None)

    @classmethod
    def from_pool_spec(cls,
                       pool_id: int,
                       pool_name: str,
                       namespace: Optional[str] = None) -> 'LevelSpec':
        """Build a pool level (``namespace is None``) or namespace level spec."""
        if namespace is None:
            id = "{}".format(pool_id)
            name = "{}/".format(pool_name)
        else:
            id = "{}/{}".format(pool_id, namespace)
            name = "{}/{}/".format(pool_name, namespace)
        return LevelSpec(name, id, str(pool_id), namespace, None)

    @classmethod
    def from_name(cls,
                  module: 'Module',
                  name: str,
                  namespace_validator: Optional[Callable] = None,
                  image_validator: Optional[Callable] = None,
                  allow_image_level: bool = True) -> 'LevelSpec':
        """Build a level spec from a user supplied name.

        :param module: object whose ``rados`` attribute is used for lookups
        :param name: level name, see the accepted forms below
        :param namespace_validator: optional callable invoked with the ioctx
            once a non-default namespace has been set on it
        :param image_validator: optional callable invoked with the opened
            image
        :param allow_image_level: when False, names that include an image
            are rejected
        :raises ValueError: if the name cannot be parsed or refers to a
            pool, namespace or image that does not exist
        """
        # parse names like:
        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
                         name)
        if not match:
            raise ValueError("failed to parse {}".format(name))
        if match.group(3) and not allow_image_level:
            raise ValueError(
                "invalid name {}: image level is not allowed".format(name))

        id = ""
        pool_id = None
        namespace = None
        image_name = None
        image_id = None
        if match.group(1):
            pool_name = match.group(1)
            try:
                pool_id = module.rados.pool_lookup(pool_name)
                if pool_id is None:
                    raise ValueError("pool {} does not exist".format(pool_name))
                if pool_id not in get_rbd_pools(module):
                    raise ValueError("{} is not an RBD pool".format(pool_name))
                pool_id = str(pool_id)
                id += pool_id
                if match.group(2) is not None or match.group(3):
                    id += "/"
                    with module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            id += namespace
                            ioctx.set_namespace(namespace)
                            if namespace_validator:
                                namespace_validator(ioctx)
                        if match.group(3):
                            image_name = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_name,
                                               read_only=True) as image:
                                    image_id = image.id()
                                    id += "/" + image_id
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_name))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_name))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_name))

        # normalize possible input name like 'rbd//image'
        if not namespace and image_name:
            name = "{}/{}".format(pool_name, image_name)

        return LevelSpec(name, id, pool_id, namespace, image_id)

    @classmethod
    def from_id(cls,
                handler: Any,
                id: str,
                namespace_validator: Optional[Callable] = None,
                image_validator: Optional[Callable] = None) -> 'LevelSpec':
        """Build a level spec from a stored level id.

        :param handler: object whose ``module.rados`` is used for lookups
        :param id: level id, see the accepted forms below
        :param namespace_validator: optional callable; see the review note
            in the body, it is currently not invoked here
        :param image_validator: optional callable invoked with the opened
            image
        :raises ValueError: if the id cannot be parsed or refers to a pool,
            namespace or image that does not exist
        """
        # parse ids like:
        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
        if not match:
            raise ValueError("failed to parse: {}".format(id))

        name = ""
        pool_id = None
        namespace = None
        image_id = None
        if match.group(1):
            pool_id = match.group(1)
            try:
                pool_name = handler.module.rados.pool_reverse_lookup(
                    int(pool_id))
            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_id))
            if pool_name is None:
                # pool_name is None here, so report the id we looked up.
                raise ValueError("pool {} does not exist".format(pool_id))
            name += pool_name + "/"
            if match.group(2) is not None or match.group(3):
                try:
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            name += namespace + "/"
                            # Always switch to the namespace (as from_name
                            # does) so the image lookup below happens in the
                            # right place even without a validator.
                            # NOTE(review): namespace_validator is accepted
                            # but never called here, unlike in from_name --
                            # confirm whether that is intentional.
                            ioctx.set_namespace(namespace)
                        elif not match.group(3):
                            name += "/"
                        if match.group(3):
                            image_id = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_id=image_id,
                                               read_only=True) as image:
                                    image_name = image.get_name()
                                    name += image_name
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_id))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_id))
                except rados.ObjectNotFound:
                    raise ValueError("pool {} does not exist".format(pool_id))

        return LevelSpec(name, id, pool_id, namespace, image_id)
229
230
class Interval:
    """A schedule period, stored as a whole number of minutes."""

    def __init__(self, minutes: int) -> None:
        self.minutes = minutes

    def __eq__(self, interval: Any) -> bool:
        """Intervals are equal when they span the same number of minutes."""
        if not isinstance(interval, Interval):
            return NotImplemented
        return self.minutes == interval.minutes

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Format using the largest unit (d, h, m) that divides evenly."""
        # Integer arithmetic avoids going through float for large values.
        days, rest = divmod(self.minutes, 60 * 24)
        if rest == 0:
            return "{}{}".format(int(days), 'd')
        hours, rest = divmod(self.minutes, 60)
        if rest == 0:
            return "{}{}".format(int(hours), 'h')
        return "{}{}".format(int(self.minutes), 'm')

    @classmethod
    def from_string(cls, interval: str) -> 'Interval':
        """Parse ``<number>[d|h|m]``; a bare number means minutes.

        :raises ValueError: if the string is malformed or the interval is
            zero
        """
        match = re.match(r'^(\d+)(d|h|m)?$', interval)
        if not match:
            raise ValueError("Invalid interval ({})".format(interval))

        minutes = int(match.group(1))
        if match.group(2) == 'd':
            minutes *= 60 * 24
        elif match.group(2) == 'h':
            minutes *= 60

        # A zero period can never be scheduled: computing the next run
        # divides by the period.
        if minutes == 0:
            raise ValueError("Invalid interval ({})".format(interval))

        return Interval(minutes)
268
269
class StartTime:
    """A time of day (hour and minute, optionally with a timezone).

    ``time`` keeps the value as given; ``minutes`` is the derived number
    of minutes used for equality, hashing and schedule arithmetic.
    """

    def __init__(self,
                 hour: int,
                 minute: int,
                 tzinfo: Optional[datetime.tzinfo]) -> None:
        self.time = datetime.time(hour, minute, tzinfo=tzinfo)
        self.minutes = self.time.hour * 60 + self.time.minute
        if self.time.tzinfo:
            utcoffset = cast(datetime.timedelta, self.time.utcoffset())
            # NOTE(review): the UTC offset is *added* and only the
            # ``seconds`` field of the timedelta is used -- confirm this
            # is the intended conversion for callers of ``minutes``.
            self.minutes += utcoffset.seconds // 60

    def __eq__(self, start_time: Any) -> bool:
        """Start times are equal when their ``minutes`` values are equal."""
        # Schedule items pair an interval with either a StartTime or None,
        # so comparison against None must be safe.
        if not isinstance(start_time, StartTime):
            return NotImplemented
        return self.minutes == start_time.minutes

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Return the ISO 8601 form of the stored time of day."""
        return self.time.isoformat()

    @classmethod
    def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']:
        """Parse a start time string; empty or None yields None.

        :raises ValueError: if the string cannot be parsed
        """
        if not start_time:
            return None

        try:
            t = parse(start_time).timetz()
        except ValueError as e:
            # Keep the parser error as the cause for easier debugging.
            raise ValueError(
                "Invalid start time {}: {}".format(start_time, e)) from e

        return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
302
303
class Schedule:
    """A named set of (interval, optional start time) schedule items."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.items: Set[Tuple[Interval, Optional[StartTime]]] = set()

    def __len__(self) -> int:
        """Number of schedule items; an empty schedule is falsy."""
        return len(self.items)

    def add(self,
            interval: Interval,
            start_time: Optional[StartTime] = None) -> None:
        """Add an item; adding an existing item is a no-op."""
        self.items.add((interval, start_time))

    def remove(self,
               interval: Interval,
               start_time: Optional[StartTime] = None) -> None:
        """Remove an item; removing a missing item is a no-op."""
        self.items.discard((interval, start_time))

    def next_run(self, now: datetime.datetime) -> str:
        """Return the earliest upcoming run time over all items.

        Each item fires at ``anchor + k * interval`` where the anchor is
        1970-01-01 plus the item's start-time minutes (if any).

        :returns: the time formatted as ``%Y-%m-%d %H:%M:00``
        :raises ValueError: if the schedule has no items
        """
        candidates = []
        for interval, opt_start in self.items:
            period = datetime.timedelta(minutes=interval.minutes)
            anchor = datetime.datetime(1970, 1, 1)
            if opt_start:
                anchor += datetime.timedelta(
                    minutes=cast(StartTime, opt_start).minutes)
            elapsed_periods = int((now - anchor) / period)
            candidates.append(anchor + (elapsed_periods + 1) * period)
        if not candidates:
            raise ValueError('no items is added')
        return min(candidates).strftime("%Y-%m-%d %H:%M:00")

    def to_list(self) -> List[Dict[str, Optional[str]]]:
        """Return the items as a list of interval/start_time dicts."""
        entries: List[Dict[str, Optional[str]]] = []
        for interval, start_time in self.items:
            start_str: Optional[str] = None
            if start_time:
                start_str = start_time.to_string()
            entries.append({SCHEDULE_INTERVAL: interval.to_string(),
                            SCHEDULE_START_TIME: start_str})
        return entries

    def to_json(self) -> str:
        """Serialize the item list as indented, key-sorted JSON."""
        return json.dumps(self.to_list(), indent=4, sort_keys=True)

    @classmethod
    def from_json(cls, name: str, val: str) -> 'Schedule':
        """Build a schedule called ``name`` from its JSON item list.

        :raises ValueError: if the JSON is invalid, an item lacks a key,
            has a value of the wrong type, or holds an unparsable
            interval or start time
        """
        try:
            schedule = Schedule(name)
            for item in json.loads(val):
                interval = Interval.from_string(item[SCHEDULE_INTERVAL])
                raw_start = item[SCHEDULE_START_TIME]
                start_time = (StartTime.from_string(raw_start)
                              if raw_start else None)
                schedule.add(interval, start_time)
            return schedule
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError(
                "Invalid schedule format (missing key {})".format(str(e)))
        except TypeError as e:
            raise ValueError("Invalid schedule format ({})".format(str(e)))
372
20effc67 373
9f95a23c
TL
class Schedules:
    """All known schedules, keyed by level spec id.

    The global schedule is kept in a module option; every other schedule
    is kept in the omap of the handler's schedule object in its pool.
    """

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.level_specs: Dict[str, LevelSpec] = {}
        self.schedules: Dict[str, Schedule] = {}

        # Previous versions incorrectly stored the global config in
        # the localized module option. Check the config is here and fix it.
        module = self.handler.module
        option = self.handler.MODULE_OPTION_NAME
        if not module.get_module_option(option, ''):
            localized_cfg = module.get_localized_module_option(option, '')
            if localized_cfg:
                module.set_module_option(option, localized_cfg)
                module.set_localized_module_option(option, None)

    def __len__(self) -> int:
        """Number of levels that currently have a schedule."""
        return len(self.schedules)

    def load(self,
             namespace_validator: Optional[Callable] = None,
             image_validator: Optional[Callable] = None) -> None:
        """Reload the global schedule and the schedules of every RBD pool.

        Per-pool rados errors are logged and the pool is skipped;
        ``rados.ConnectionShutdown`` is re-raised.
        """
        self.level_specs = {}
        self.schedules = {}

        global_cfg = self.handler.module.get_module_option(
            self.handler.MODULE_OPTION_NAME, '')
        if global_cfg:
            try:
                global_spec = LevelSpec.make_global()
                self.level_specs[global_spec.id] = global_spec
                self.schedules[global_spec.id] = Schedule.from_json(
                    global_spec.name, global_cfg)
            except ValueError:
                self.handler.log.error(
                    "Failed to decode configured schedule {}".format(
                        global_cfg))

        rbd_pools = get_rbd_pools(self.handler.module)
        for pool_id, pool_name in rbd_pools.items():
            try:
                with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_from_pool(ioctx, namespace_validator,
                                        image_validator)
            except rados.ConnectionShutdown:
                raise
            except rados.Error as e:
                self.handler.log.error(
                    "Failed to load schedules for pool {}: {}".format(
                        pool_name, e))

    def load_from_pool(self,
                       ioctx: rados.Ioctx,
                       namespace_validator: Optional[Callable],
                       image_validator: Optional[Callable]) -> None:
        """Load every schedule stored in one pool's schedule object omap.

        Keys whose level spec can no longer be resolved are collected and
        removed from the omap once the scan is done.
        """
        log = self.handler.log
        pool_name = ioctx.get_pool_name()
        stale_keys = []
        start_after = ''
        try:
            # Page through the omap, 128 entries at a time.
            while True:
                with rados.ReadOpCtx() as read_op:
                    log.info(
                        "load_schedules: {}, start_after={}".format(
                            pool_name, start_after))
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)

                    entries = list(it)
                    for k, v in entries:
                        start_after = k
                        v = v.decode()
                        log.info("load_schedule: {} {}".format(k, v))
                        try:
                            level_spec = LevelSpec.from_id(
                                self.handler, k, namespace_validator,
                                image_validator)
                        except ValueError:
                            log.debug(
                                "Stale schedule key %s in pool %s",
                                k, pool_name)
                            stale_keys.append(k)
                            continue

                        # The level spec is recorded before the schedule is
                        # decoded, so it stays registered even when the
                        # stored JSON turns out to be invalid.
                        self.level_specs[level_spec.id] = level_spec
                        try:
                            schedule = Schedule.from_json(level_spec.name, v)
                        except ValueError:
                            log.error(
                                "Failed to decode schedule: pool={}, {} {}".format(
                                    pool_name, k, v))
                            continue
                        self.schedules[level_spec.id] = schedule
                    if not entries:
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            pass

        if stale_keys:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, stale_keys)
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def save(self, level_spec: LevelSpec, schedule: Optional[Schedule]) -> None:
        """Persist (or, when ``schedule`` is empty/None, delete) a schedule."""
        if level_spec.is_global():
            schedule_cfg = schedule.to_json() if schedule else None
            self.handler.module.set_module_option(
                self.handler.MODULE_OPTION_NAME, schedule_cfg)
            return

        pool_id = level_spec.get_pool_id()
        assert pool_id
        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            with rados.WriteOpCtx() as write_op:
                if schedule:
                    ioctx.set_omap(write_op, (level_spec.id, ),
                                   (schedule.to_json(), ))
                else:
                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def add(self,
            level_spec: LevelSpec,
            interval: str,
            start_time: Optional[str]) -> None:
        """Add an item to the level's schedule (creating it) and save it."""
        spec_id = level_spec.id
        schedule = self.schedules.get(spec_id, Schedule(level_spec.name))
        schedule.add(Interval.from_string(interval),
                     StartTime.from_string(start_time))
        self.schedules[spec_id] = schedule
        self.level_specs[spec_id] = level_spec
        self.save(level_spec, schedule)

    def remove(self,
               level_spec: LevelSpec,
               interval: Optional[str],
               start_time: Optional[str]) -> None:
        """Remove one item, or the whole schedule when ``interval`` is None."""
        spec_id = level_spec.id
        schedule = self.schedules.pop(spec_id, None)
        if schedule:
            if interval is None:
                schedule = None
            else:
                try:
                    schedule.remove(Interval.from_string(interval),
                                    StartTime.from_string(start_time))
                finally:
                    # Put the schedule back unless it became empty, also
                    # when parsing the arguments failed.
                    if schedule:
                        self.schedules[spec_id] = schedule
            if not schedule:
                del self.level_specs[spec_id]
        self.save(level_spec, schedule)

    def find(self,
             pool_id: str,
             namespace: str,
             image_id: Optional[str] = None) -> Optional['Schedule']:
        """Return the most specific schedule covering the given location.

        Candidates are tried from the deepest level upwards, ending with
        the global schedule.
        """
        levels = [pool_id, namespace]
        if image_id:
            levels.append(image_id)
        for depth in range(len(levels), -1, -1):
            # an empty spec id implies global schedule
            found = self.schedules.get("/".join(levels[:depth]))
            if found is not None:
                return found
        return None

    def intersects(self, level_spec: LevelSpec) -> bool:
        """Return True if any known level spec intersects ``level_spec``."""
        return any(ls.intersects(level_spec)
                   for ls in self.level_specs.values())

    def to_list(self, level_spec: LevelSpec) -> Dict[str, dict]:
        """List schedules at ``level_spec``, its closest parent and children.

        :returns: mapping of level spec id to a dict with the schedule's
            ``name`` and its items under ``schedule``
        """
        parent: Optional[LevelSpec] = None
        if level_spec.id in self.schedules:
            parent = level_spec
        else:
            # try to find existing parent
            for level_spec_id in self.schedules:
                candidate = self.level_specs[level_spec_id]
                if candidate == level_spec:
                    parent = candidate
                    break
                if not level_spec.is_child_of(candidate):
                    continue
                if parent is None or candidate.is_child_of(parent):
                    parent = candidate
            if parent is None:
                # set to non-existing parent so we still could list its children
                parent = level_spec

        result = {}
        for level_spec_id, schedule in self.schedules.items():
            ls = self.level_specs[level_spec_id]
            if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
                result[level_spec_id] = {
                    'name': schedule.name,
                    'schedule': schedule.to_list(),
                }
        return result