]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/schedule.py
buildsys: change download over to reef release
[ceph.git] / ceph / src / pybind / mgr / rbd_support / schedule.py
CommitLineData
20effc67 1import datetime
9f95a23c
TL
2import json
3import rados
4import rbd
5import re
6
9f95a23c 7from dateutil.parser import parse
20effc67 8from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
9f95a23c
TL
9
10from .common import get_rbd_pools
20effc67
TL
11if TYPE_CHECKING:
12 from .module import Module
9f95a23c
TL
13
# Keys of the per-item dictionaries produced by Schedule.to_list() and
# read back by Schedule.from_json().
SCHEDULE_INTERVAL = "interval"
SCHEDULE_START_TIME = "start_time"
16
17
class LevelSpec:
    """Identify the level a schedule is attached to.

    A level is either global, a pool, a namespace within a pool, or a single
    image.  ``name`` is the name-based form (e.g. ``rbd/ns/image``) and ``id``
    the id-based form (e.g. ``123/ns/image_id``).  ``pool_id``, ``namespace``
    and ``image_id`` are ``None`` for the parts the level does not pin down.

    Equality compares ``id`` only.  Because ``__eq__`` is defined without
    ``__hash__``, instances are not hashable.
    """

    def __init__(self,
                 name: str,
                 id: str,
                 pool_id: Optional[str],
                 namespace: Optional[str],
                 image_id: Optional[str] = None) -> None:
        """Store the already-resolved parts of a level specification."""
        self.name = name
        self.id = id
        self.pool_id = pool_id
        self.namespace = namespace
        self.image_id = image_id

    def __eq__(self, level_spec: Any) -> bool:
        """Return True when both objects carry the same ``id``.

        Objects without an ``id`` attribute (e.g. ``None``) compare unequal
        instead of raising ``AttributeError``.
        """
        try:
            return self.id == level_spec.id
        except AttributeError:
            return NotImplemented

    def is_child_of(self, level_spec: 'LevelSpec') -> bool:
        """Return True if this level is strictly nested below ``level_spec``."""
        if level_spec.is_global():
            # everything except the global level itself is below global
            return not self.is_global()
        if level_spec.pool_id != self.pool_id:
            return False
        if level_spec.namespace is None:
            # parent is a pool level: any namespace/image level is a child
            return self.namespace is not None
        if level_spec.namespace != self.namespace:
            return False
        if level_spec.image_id is None:
            # parent is a namespace level: only image levels are children
            return self.image_id is not None
        # an image level has no children
        return False

    def is_global(self) -> bool:
        """Return True for the global level (no pool id)."""
        return self.pool_id is None

    def get_pool_id(self) -> Optional[str]:
        """Return the pool id, or ``None`` for the global level."""
        return self.pool_id

    def matches(self,
                pool_id: str,
                namespace: str,
                image_id: Optional[str] = None) -> bool:
        """Return True if the given pool/namespace/image falls under this level.

        Parts of this level that are unset (``None`` or empty) act as
        wildcards.
        """
        if self.pool_id and self.pool_id != pool_id:
            return False
        if self.namespace and self.namespace != namespace:
            return False
        if self.image_id and self.image_id != image_id:
            return False
        return True

    def intersects(self, level_spec: 'LevelSpec') -> bool:
        """Return True if the two levels can cover a common image.

        Parts are compared from pool down to image; as soon as either side
        leaves a part unspecified (``None``) the levels overlap.
        """
        pairs = (
            (self.pool_id, level_spec.pool_id),
            (self.namespace, level_spec.namespace),
            (self.image_id, level_spec.image_id),
        )
        for mine, theirs in pairs:
            if mine is None or theirs is None:
                return True
            if mine != theirs:
                return False
        return True

    @classmethod
    def make_global(cls) -> 'LevelSpec':
        """Return the global level (empty name and id, no pool)."""
        return cls("", "", None, None, None)

    @classmethod
    def from_pool_spec(cls,
                       pool_id: int,
                       pool_name: str,
                       namespace: Optional[str] = None) -> 'LevelSpec':
        """Build a pool level, or a namespace level when ``namespace`` is given.

        No lookups are performed; the arguments are taken as-is.
        """
        if namespace is None:
            id = "{}".format(pool_id)
            name = "{}/".format(pool_name)
        else:
            id = "{}/{}".format(pool_id, namespace)
            name = "{}/{}/".format(pool_name, namespace)
        return cls(name, id, str(pool_id), namespace, None)

    @classmethod
    def from_name(cls,
                  module: 'Module',
                  name: str,
                  namespace_validator: Optional[Callable] = None,
                  image_validator: Optional[Callable] = None,
                  allow_image_level: bool = True) -> 'LevelSpec':
        """Resolve a name-based specification into a ``LevelSpec``.

        :param module: object providing ``rados`` (used for pool lookup and
            opening ioctxs); also passed to ``get_rbd_pools()``
        :param name: one of '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image',
            'rbd/ns/image'
        :param namespace_validator: optional callable invoked with the ioctx
            after a non-empty namespace has been selected on it
        :param image_validator: optional callable invoked with the opened
            image
        :param allow_image_level: reject names that contain an image part
            when False
        :raises ValueError: if the name cannot be parsed, the image level is
            not allowed, or the pool, namespace or image does not exist
        """
        # parse names like:
        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
                         name)
        if not match:
            raise ValueError("failed to parse {}".format(name))
        if match.group(3) and not allow_image_level:
            raise ValueError(
                "invalid name {}: image level is not allowed".format(name))

        id = ""
        pool_id = None
        namespace = None
        image_name = None
        image_id = None
        if match.group(1):
            pool_name = match.group(1)
            try:
                pool_id = module.rados.pool_lookup(pool_name)
                if pool_id is None:
                    raise ValueError("pool {} does not exist".format(pool_name))
                if pool_id not in get_rbd_pools(module):
                    raise ValueError("{} is not an RBD pool".format(pool_name))
                pool_id = str(pool_id)
                id += pool_id
                if match.group(2) is not None or match.group(3):
                    id += "/"
                    with module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            id += namespace
                            ioctx.set_namespace(namespace)
                            if namespace_validator:
                                namespace_validator(ioctx)
                        if match.group(3):
                            image_name = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_name,
                                               read_only=True) as image:
                                    image_id = image.id()
                                    id += "/" + image_id
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_name))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_name))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_name))

        # normalize possible input name like 'rbd//image'
        if not namespace and image_name:
            name = "{}/{}".format(pool_name, image_name)

        return cls(name, id, pool_id, namespace, image_id)

    @classmethod
    def from_id(cls,
                handler: Any,
                id: str,
                namespace_validator: Optional[Callable] = None,
                image_validator: Optional[Callable] = None) -> 'LevelSpec':
        """Resolve an id-based specification into a ``LevelSpec``.

        :param handler: object whose ``module.rados`` is used for the pool
            reverse lookup and for opening ioctxs
        :param id: one of '', '123', '123/', '123/ns', '123//image_id',
            '123/ns/image_id'
        :param namespace_validator: accepted for symmetry with
            ``from_name()``; it is not invoked here
        :param image_validator: optional callable invoked with the opened
            image
        :raises ValueError: if the id cannot be parsed or the pool, namespace
            or image does not exist
        """
        # parse ids like:
        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
        if not match:
            raise ValueError("failed to parse: {}".format(id))

        name = ""
        pool_id = None
        namespace = None
        image_id = None
        if match.group(1):
            pool_id = match.group(1)
            try:
                pool_name = handler.module.rados.pool_reverse_lookup(
                    int(pool_id))
                if pool_name is None:
                    # report the id that was looked up, pool_name is None here
                    raise ValueError("pool {} does not exist".format(pool_id))
                name += pool_name + "/"
                if match.group(2) is not None or match.group(3):
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            name += namespace + "/"
                            # Always select the namespace so that the image
                            # lookup below happens inside it, regardless of
                            # whether a namespace validator was supplied.
                            # NOTE(review): namespace_validator itself is
                            # never called here, unlike in from_name() --
                            # confirm whether that is intended.
                            ioctx.set_namespace(namespace)
                        elif not match.group(3):
                            # '123/' names the default namespace level
                            name += "/"
                        if match.group(3):
                            image_id = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_id=image_id,
                                               read_only=True) as image:
                                    image_name = image.get_name()
                                    name += image_name
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_id))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_id))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_id))

        return cls(name, id, pool_id, namespace, image_id)
229
230
class Interval:
    """A schedule period expressed in minutes.

    Instances compare and hash by their ``minutes`` value.
    """

    def __init__(self, minutes: int) -> None:
        self.minutes = minutes

    def __eq__(self, interval: Any) -> bool:
        """Compare by ``minutes``.

        Objects without a ``minutes`` attribute (e.g. ``None``) compare
        unequal instead of raising ``AttributeError``.
        """
        try:
            return self.minutes == interval.minutes
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Format using the largest unit (d, h, m) that divides evenly."""
        for unit_minutes, suffix in ((60 * 24, 'd'), (60, 'h')):
            if self.minutes % unit_minutes == 0:
                # floor division is exact here because the remainder is zero
                return "{}{}".format(int(self.minutes // unit_minutes),
                                     suffix)
        return "{}{}".format(int(self.minutes), 'm')

    @classmethod
    def from_string(cls, interval: str) -> 'Interval':
        """Parse ``<number>[d|h|m]``; a missing unit means minutes.

        Note that a zero interval (e.g. ``"0m"``) is accepted by the pattern.

        :raises ValueError: if ``interval`` does not match the pattern
        """
        match = re.match(r'^(\d+)(d|h|m)?$', interval)
        if not match:
            raise ValueError("Invalid interval ({})".format(interval))

        minutes = int(match.group(1))
        unit = match.group(2)
        if unit == 'd':
            minutes *= 60 * 24
        elif unit == 'h':
            minutes *= 60

        return cls(minutes)
268
269
class StartTime:
    """A time of day at which a schedule is anchored.

    ``time`` holds the ``datetime.time`` (optionally timezone-aware) and
    ``minutes`` the number of minutes used for comparison, hashing and
    schedule arithmetic.
    """

    def __init__(self,
                 hour: int,
                 minute: int,
                 tzinfo: Optional[datetime.tzinfo]) -> None:
        self.time = datetime.time(hour, minute, tzinfo=tzinfo)
        self.minutes = self.time.hour * 60 + self.time.minute
        # utcoffset() is None both for naive times and for tzinfo
        # implementations that cannot tell the offset without a date;
        # in either case no adjustment is applied.
        utcoffset = self.time.utcoffset()
        if utcoffset is not None:
            # NOTE(review): the offset is *added* to the local minutes
            # (via timedelta.seconds, i.e. modulo one day).  Converting a
            # local time to UTC would normally subtract it -- confirm the
            # intended semantics before changing, stored schedules depend
            # on the current value.
            self.minutes += utcoffset.seconds // 60

    def __eq__(self, start_time: Any) -> bool:
        """Compare by ``minutes``.

        Objects without a ``minutes`` attribute (e.g. ``None``) compare
        unequal instead of raising ``AttributeError``.
        """
        try:
            return self.minutes == start_time.minutes
        except AttributeError:
            return NotImplemented

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Return the ISO 8601 representation of the stored time."""
        return self.time.isoformat()

    @classmethod
    def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']:
        """Parse a start time string; return ``None`` for empty input.

        Only the hour, minute and timezone of the parsed value are kept.

        :raises ValueError: if the string cannot be parsed
        """
        if not start_time:
            return None

        try:
            t = parse(start_time).timetz()
        except (ValueError, OverflowError) as e:
            # dateutil reports out-of-range values with OverflowError
            raise ValueError(
                "Invalid start time {}: {}".format(start_time, e)) from e

        return cls(t.hour, t.minute, tzinfo=t.tzinfo)
302
303
class Schedule:
    """A named set of ``(Interval, Optional[StartTime])`` items."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.items: Set[Tuple[Interval, Optional[StartTime]]] = set()

    def __len__(self) -> int:
        """Return the number of items; an empty schedule is falsy."""
        return len(self.items)

    def add(self,
            interval: Interval,
            start_time: Optional[StartTime] = None) -> None:
        """Add an item; adding an equal item twice has no effect."""
        self.items.add((interval, start_time))

    def remove(self,
               interval: Interval,
               start_time: Optional[StartTime] = None) -> None:
        """Remove an item if present; missing items are ignored."""
        self.items.discard((interval, start_time))

    def next_run(self, now: datetime.datetime) -> str:
        """Return the earliest upcoming run time over all items.

        Every item is anchored at 1970-01-01 00:00 plus its start time
        minutes (if any) and repeats every ``interval.minutes``; the first
        repetition strictly after ``now`` is that item's next run.

        :param now: naive datetime (it is subtracted from a naive anchor)
        :returns: the time formatted as ``%Y-%m-%d %H:%M:00``
        :raises ValueError: if the schedule has no items
        :raises ZeroDivisionError: if an item has a zero-minute interval
        """
        epoch = datetime.datetime(1970, 1, 1)
        candidates = []
        for interval, opt_start in self.items:
            period = datetime.timedelta(minutes=interval.minutes)
            anchor = epoch
            if opt_start:
                anchor += datetime.timedelta(minutes=opt_start.minutes)
            # timedelta // timedelta yields an exact integer count of
            # elapsed periods, avoiding a float round trip
            elapsed_periods = (now - anchor) // period
            candidates.append(anchor + (elapsed_periods + 1) * period)
        if not candidates:
            raise ValueError('no items is added')
        return min(candidates).strftime("%Y-%m-%d %H:%M:00")

    def to_list(self) -> List[Dict[str, Optional[str]]]:
        """Return the items as dictionaries of interval and start time strings.

        The start time entry is ``None`` for items without a start time.
        """
        return [
            {
                SCHEDULE_INTERVAL: interval.to_string(),
                SCHEDULE_START_TIME:
                    start_time.to_string() if start_time else None,
            }
            for interval, start_time in self.items
        ]

    def to_json(self) -> str:
        """Serialize ``to_list()`` as indented JSON with sorted keys."""
        return json.dumps(self.to_list(), indent=4, sort_keys=True)

    @classmethod
    def from_json(cls, name: str, val: str) -> 'Schedule':
        """Build a schedule from the JSON produced by ``to_json()``.

        :raises ValueError: if ``val`` is not valid JSON, an item lacks a
            required key, the structure has the wrong type, or an interval
            or start time string is invalid
        """
        try:
            items = json.loads(val)
            schedule = cls(name)
            for item in items:
                interval = Interval.from_string(item[SCHEDULE_INTERVAL])
                # from_string() already maps empty/None input to None
                start_time = StartTime.from_string(item[SCHEDULE_START_TIME])
                schedule.add(interval, start_time)
            return schedule
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e))) from e
        except KeyError as e:
            raise ValueError(
                "Invalid schedule format (missing key {})".format(
                    str(e))) from e
        except TypeError as e:
            raise ValueError(
                "Invalid schedule format ({})".format(str(e))) from e
372
20effc67 373
9f95a23c
TL
class Schedules:
    """In-memory collection of schedules keyed by level spec id.

    The global schedule is persisted in a module option; pool, namespace and
    image level schedules are persisted as omap entries of the handler's
    ``SCHEDULE_OID`` object in the respective pool.

    ``handler`` must provide ``module``, ``log``, ``MODULE_OPTION_NAME`` and
    ``SCHEDULE_OID``.
    """

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.level_specs: Dict[str, LevelSpec] = {}
        self.schedules: Dict[str, Schedule] = {}

        # Previous versions incorrectly stored the global config in
        # the localized module option. Check the config is here and fix it.
        schedule_cfg = self.handler.module.get_module_option(
            self.handler.MODULE_OPTION_NAME, '')
        if not schedule_cfg:
            schedule_cfg = self.handler.module.get_localized_module_option(
                self.handler.MODULE_OPTION_NAME, '')
            if schedule_cfg:
                self.handler.module.set_module_option(
                    self.handler.MODULE_OPTION_NAME, schedule_cfg)
        # NOTE(review): nesting reconstructed from a whitespace-stripped
        # listing; the localized option is cleared unconditionally here --
        # confirm against the upstream file.
        self.handler.module.set_localized_module_option(
            self.handler.MODULE_OPTION_NAME, None)

    def __len__(self) -> int:
        """Return the number of levels that have a schedule."""
        return len(self.schedules)

    def load(self,
             namespace_validator: Optional[Callable] = None,
             image_validator: Optional[Callable] = None) -> None:
        """Discard the in-memory state and reload every schedule.

        The global schedule is read from the module option, then each pool
        returned by ``get_rbd_pools()`` is scanned.  Failures are logged and
        the affected schedule or pool is skipped.
        """
        self.level_specs = {}
        self.schedules = {}

        schedule_cfg = self.handler.module.get_module_option(
            self.handler.MODULE_OPTION_NAME, '')
        if schedule_cfg:
            level_spec = LevelSpec.make_global()
            try:
                schedule = Schedule.from_json(level_spec.name, schedule_cfg)
            except ValueError:
                self.handler.log.error(
                    "Failed to decode configured schedule {}".format(
                        schedule_cfg))
            else:
                # register the level only once its schedule decoded, so a
                # bad config does not leave a level spec without a schedule
                self.level_specs[level_spec.id] = level_spec
                self.schedules[level_spec.id] = schedule

        for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
            try:
                with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_from_pool(ioctx, namespace_validator,
                                        image_validator)
            except rados.Error as e:
                self.handler.log.error(
                    "Failed to load schedules for pool {}: {}".format(
                        pool_name, e))

    def load_from_pool(self,
                       ioctx: rados.Ioctx,
                       namespace_validator: Optional[Callable],
                       image_validator: Optional[Callable]) -> None:
        """Load the schedules stored in one pool.

        Omap entries of ``SCHEDULE_OID`` are read in batches of 128.  Keys
        that ``LevelSpec.from_id()`` rejects are treated as stale and removed
        from the object afterwards; values that fail to decode are logged
        and skipped.  A missing schedule object is not an error.
        """
        pool_name = ioctx.get_pool_name()
        stale_keys = []
        start_after = ''
        try:
            while True:
                with rados.ReadOpCtx() as read_op:
                    self.handler.log.info(
                        "load_schedules: {}, start_after={}".format(
                            pool_name, start_after))
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)

                    it = list(it)
                    for k, v in it:
                        # resume the next batch after the last key seen
                        start_after = k
                        v = v.decode()
                        self.handler.log.info(
                            "load_schedule: {} {}".format(k, v))
                        try:
                            level_spec = LevelSpec.from_id(
                                self.handler, k, namespace_validator,
                                image_validator)
                        except ValueError:
                            self.handler.log.debug(
                                "Stale schedule key %s in pool %s",
                                k, pool_name)
                            stale_keys.append(k)
                            continue

                        try:
                            schedule = Schedule.from_json(level_spec.name, v)
                        except ValueError:
                            self.handler.log.error(
                                "Failed to decode schedule: pool={}, {} {}".format(
                                    pool_name, k, v))
                            continue

                        # register the level only once its schedule decoded
                        self.level_specs[level_spec.id] = level_spec
                        self.schedules[level_spec.id] = schedule
                    if not it:
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            pass

        if stale_keys:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, stale_keys)
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def save(self, level_spec: LevelSpec, schedule: Optional[Schedule]) -> None:
        """Persist ``schedule`` for ``level_spec``.

        A ``None`` or empty schedule removes the stored entry (or resets the
        module option for the global level).
        """
        if level_spec.is_global():
            schedule_cfg = schedule.to_json() if schedule else None
            self.handler.module.set_module_option(
                self.handler.MODULE_OPTION_NAME, schedule_cfg)
            return

        pool_id = level_spec.get_pool_id()
        assert pool_id
        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            with rados.WriteOpCtx() as write_op:
                if schedule:
                    ioctx.set_omap(write_op, (level_spec.id, ),
                                   (schedule.to_json(), ))
                else:
                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def add(self,
            level_spec: LevelSpec,
            interval: str,
            start_time: Optional[str]) -> None:
        """Add an item to the schedule of ``level_spec`` and persist it.

        :raises ValueError: if ``interval`` or ``start_time`` is invalid; in
            that case nothing is stored
        """
        schedule = self.schedules.get(level_spec.id)
        if schedule is None:
            schedule = Schedule(level_spec.name)
        schedule.add(Interval.from_string(interval),
                     StartTime.from_string(start_time))
        self.schedules[level_spec.id] = schedule
        self.level_specs[level_spec.id] = level_spec
        self.save(level_spec, schedule)

    def remove(self,
               level_spec: LevelSpec,
               interval: Optional[str],
               start_time: Optional[str]) -> None:
        """Remove one item, or the whole schedule when ``interval`` is None.

        The level is dropped from memory once its schedule becomes empty,
        and the result is persisted.

        :raises ValueError: if ``interval`` or ``start_time`` is invalid; the
            in-memory schedule is left unchanged in that case
        """
        schedule = self.schedules.pop(level_spec.id, None)
        # test against None: a stored-but-empty schedule is falsy and must
        # still have its level spec cleaned up below
        if schedule is not None:
            if interval is None:
                schedule = None
            else:
                try:
                    schedule.remove(Interval.from_string(interval),
                                    StartTime.from_string(start_time))
                finally:
                    # put the schedule back unless it is now empty; this
                    # also restores it when parsing the arguments failed
                    if schedule:
                        self.schedules[level_spec.id] = schedule
            if not schedule:
                self.level_specs.pop(level_spec.id, None)
        self.save(level_spec, schedule)

    def find(self,
             pool_id: str,
             namespace: str,
             image_id: Optional[str] = None) -> Optional['Schedule']:
        """Return the most specific schedule covering the given location.

        Candidates are tried from the most specific level spec id down to
        the global one; ``None`` is returned if none has a schedule.
        """
        levels = [pool_id, namespace]
        if image_id:
            levels.append(image_id)
        for nr_levels in range(len(levels), -1, -1):
            # an empty spec id implies global schedule
            level_spec_id = "/".join(levels[:nr_levels])
            found = self.schedules.get(level_spec_id)
            if found is not None:
                return found
        return None

    def intersects(self, level_spec: LevelSpec) -> bool:
        """Return True if any known level intersects ``level_spec``."""
        return any(ls.intersects(level_spec)
                   for ls in self.level_specs.values())

    def to_list(self, level_spec: LevelSpec) -> Dict[str, dict]:
        """List the schedules relevant to ``level_spec``.

        The result maps level spec ids to ``{'name': ..., 'schedule': ...}``
        and contains the schedule of ``level_spec`` itself (or of its
        closest scheduled ancestor when it has none) plus the schedules of
        all levels nested below ``level_spec``.
        """
        if level_spec.id in self.schedules:
            parent: Optional[LevelSpec] = level_spec
        else:
            # try to find existing parent
            parent = None
            for level_spec_id in self.schedules:
                ls = self.level_specs[level_spec_id]
                if ls == level_spec:
                    parent = ls
                    break
                # prefer the deepest ancestor that has a schedule
                if level_spec.is_child_of(ls) and \
                   (parent is None or ls.is_child_of(parent)):
                    parent = ls
            if parent is None:
                # set to non-existing parent so we still could list its children
                parent = level_spec

        result = {}
        for level_spec_id, schedule in self.schedules.items():
            ls = self.level_specs[level_spec_id]
            if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
                result[level_spec_id] = {
                    'name': schedule.name,
                    'schedule': schedule.to_list(),
                }
        return result
578 return result