]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/schedule.py
import quincy 17.2.0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / schedule.py
CommitLineData
20effc67 1import datetime
9f95a23c
TL
2import json
3import rados
4import rbd
5import re
6
9f95a23c 7from dateutil.parser import parse
20effc67 8from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
9f95a23c
TL
9
10from .common import get_rbd_pools
20effc67
TL
11if TYPE_CHECKING:
12 from .module import Module
9f95a23c
TL
13
# Dictionary keys used for each schedule item in its list/JSON form
# (see Schedule.to_list() and Schedule.from_json()).
SCHEDULE_INTERVAL = "interval"
SCHEDULE_START_TIME = "start_time"
16
17
class LevelSpec:
    """Identify the level (global, pool, namespace or image) of a schedule.

    A spec has a human readable ``name`` built from pool, namespace and
    image names, and an ``id`` built from the pool id, the namespace name
    and the image id.  The global level has an empty name and id and a
    ``pool_id`` of None.
    """

    def __init__(self,
                 name: str,
                 id: str,
                 pool_id: Optional[str],
                 namespace: Optional[str],
                 image_id: Optional[str] = None) -> None:
        self.name = name
        self.id = id
        self.pool_id = pool_id
        self.namespace = namespace
        self.image_id = image_id

    def __eq__(self, level_spec: Any) -> bool:
        """Two specs are equal when their ids are equal."""
        if not isinstance(level_spec, LevelSpec):
            # Let Python fall back to its default handling instead of
            # failing with AttributeError on unrelated objects.
            return NotImplemented
        return self.id == level_spec.id

    def is_child_of(self, level_spec: 'LevelSpec') -> bool:
        """Return True if this spec is strictly nested below ``level_spec``."""
        if level_spec.is_global():
            return not self.is_global()
        if level_spec.pool_id != self.pool_id:
            return False
        if level_spec.namespace is None:
            # level_spec is a pool level: any namespace/image is a child
            return self.namespace is not None
        if level_spec.namespace != self.namespace:
            return False
        if level_spec.image_id is None:
            # level_spec is a namespace level: only images are children
            return self.image_id is not None
        # level_spec is an image level and cannot have children
        return False

    def is_global(self) -> bool:
        """Return True if this is the global (pool-less) level."""
        return self.pool_id is None

    def get_pool_id(self) -> Optional[str]:
        """Return the pool id, or None for the global level."""
        return self.pool_id

    def matches(self,
                pool_id: str,
                namespace: str,
                image_id: Optional[str] = None) -> bool:
        """Return True if the given pool/namespace/image is covered by this spec.

        A component of this spec that is unset (None or empty) matches
        any value.
        """
        if self.pool_id and self.pool_id != pool_id:
            return False
        if self.namespace and self.namespace != namespace:
            return False
        if self.image_id and self.image_id != image_id:
            return False
        return True

    def intersects(self, level_spec: 'LevelSpec') -> bool:
        """Return True if this spec and ``level_spec`` can cover a common image.

        Components are compared from pool down to image; a component
        that is None on either side matches everything below it.
        """
        if self.pool_id is None or level_spec.pool_id is None:
            return True
        if self.pool_id != level_spec.pool_id:
            return False
        if self.namespace is None or level_spec.namespace is None:
            return True
        if self.namespace != level_spec.namespace:
            return False
        if self.image_id is None or level_spec.image_id is None:
            return True
        return self.image_id == level_spec.image_id

    @classmethod
    def make_global(cls) -> 'LevelSpec':
        """Return the spec of the global level (empty name and id)."""
        return cls("", "", None, None, None)

    @classmethod
    def from_pool_spec(cls,
                       pool_id: int,
                       pool_name: str,
                       namespace: Optional[str] = None) -> 'LevelSpec':
        """Build a pool level (or, with ``namespace``, namespace level) spec."""
        if namespace is None:
            spec_id = "{}".format(pool_id)
            name = "{}/".format(pool_name)
        else:
            spec_id = "{}/{}".format(pool_id, namespace)
            name = "{}/{}/".format(pool_name, namespace)
        return cls(name, spec_id, str(pool_id), namespace, None)

    @classmethod
    def from_name(cls,
                  module: 'Module',
                  name: str,
                  namespace_validator: Optional[Callable] = None,
                  image_validator: Optional[Callable] = None,
                  allow_image_level: bool = True) -> 'LevelSpec':
        """Build a spec from a name, resolving names to ids via the cluster.

        :param module: object whose ``rados`` attribute is used to look
            up the pool and open an ioctx
        :param name: one of '', 'rbd/', 'rbd/ns/', 'rbd//image',
            'rbd/image', 'rbd/ns/image'
        :param namespace_validator: optional callable invoked with the
            ioctx after a non-default namespace has been selected
        :param image_validator: optional callable invoked with the open
            image
        :param allow_image_level: when False, image level names are
            rejected
        :raises ValueError: if the name cannot be parsed or refers to a
            pool, namespace or image that does not exist
        """
        # parse names like:
        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
                         name)
        if not match:
            raise ValueError("failed to parse {}".format(name))
        if match.group(3) and not allow_image_level:
            raise ValueError(
                "invalid name {}: image level is not allowed".format(name))

        spec_id = ""
        pool_id = None
        namespace = None
        image_name = None
        image_id = None
        if match.group(1):
            pool_name = match.group(1)
            try:
                pool_id = module.rados.pool_lookup(pool_name)
                if pool_id is None:
                    raise ValueError(
                        "pool {} does not exist".format(pool_name))
                if pool_id not in get_rbd_pools(module):
                    raise ValueError(
                        "{} is not an RBD pool".format(pool_name))
                pool_id = str(pool_id)
                spec_id += pool_id
                if match.group(2) is not None or match.group(3):
                    spec_id += "/"
                    with module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            spec_id += namespace
                            ioctx.set_namespace(namespace)
                            if namespace_validator:
                                namespace_validator(ioctx)
                        if match.group(3):
                            image_name = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_name,
                                               read_only=True) as image:
                                    image_id = image.id()
                                    spec_id += "/" + image_id
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound as e:
                                raise ValueError(
                                    "image {} does not exist".format(
                                        image_name)) from e
                            except rbd.InvalidArgument as e:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_name)) from e

            except rados.ObjectNotFound as e:
                raise ValueError(
                    "pool {} does not exist".format(pool_name)) from e

        # normalize possible input name like 'rbd//image'
        if not namespace and image_name:
            name = "{}/{}".format(pool_name, image_name)

        return cls(name, spec_id, pool_id, namespace, image_id)

    @classmethod
    def from_id(cls,
                handler: Any,
                id: str,
                namespace_validator: Optional[Callable] = None,
                image_validator: Optional[Callable] = None) -> 'LevelSpec':
        """Build a spec from an id, resolving ids to names via the cluster.

        :param handler: object whose ``module.rados`` attribute is used
            to look up the pool and open an ioctx
        :param id: one of '', '123', '123/', '123/ns', '123//image_id',
            '123/ns/image_id'
        :param namespace_validator: accepted for symmetry with
            from_name(); it is not invoked here
        :param image_validator: optional callable invoked with the open
            image
        :raises ValueError: if the id cannot be parsed or refers to a
            pool, namespace or image that does not exist
        """
        # parse ids like:
        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
        if not match:
            raise ValueError("failed to parse: {}".format(id))

        name = ""
        pool_id = None
        namespace = None
        image_id = None
        if match.group(1):
            pool_id = match.group(1)
            try:
                pool_name = handler.module.rados.pool_reverse_lookup(
                    int(pool_id))
            except rados.ObjectNotFound as e:
                raise ValueError(
                    "pool {} does not exist".format(pool_id)) from e
            if pool_name is None:
                # report the id that was looked up; the name is unknown
                raise ValueError("pool {} does not exist".format(pool_id))
            name += pool_name + "/"
            if match.group(2) is not None or match.group(3):
                try:
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            name += namespace + "/"
                            # always select the namespace so that the
                            # image below is looked up inside it
                            ioctx.set_namespace(namespace)
                        elif not match.group(3):
                            # default namespace level, e.g. 'rbd//'
                            name += "/"
                        if match.group(3):
                            image_id = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_id=image_id,
                                               read_only=True) as image:
                                    image_name = image.get_name()
                                    name += image_name
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound as e:
                                raise ValueError(
                                    "image {} does not exist".format(
                                        image_id)) from e
                            except rbd.InvalidArgument as e:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_id)) from e
                except rados.ObjectNotFound as e:
                    raise ValueError(
                        "pool {} does not exist".format(pool_id)) from e

        return cls(name, id, pool_id, namespace, image_id)
229
230
class Interval:
    """A schedule interval expressed as a whole number of minutes."""

    def __init__(self, minutes: int) -> None:
        self.minutes = minutes

    def __eq__(self, interval: Any) -> bool:
        if not isinstance(interval, Interval):
            # e.g. comparison with None must not raise AttributeError
            return NotImplemented
        return self.minutes == interval.minutes

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Format the interval using the largest unit that divides it evenly.

        Returns e.g. '2d', '3h' or '90m'.
        """
        for unit_minutes, suffix in ((60 * 24, 'd'), (60, 'h')):
            if self.minutes % unit_minutes == 0:
                # integer division keeps large values exact
                return "{}{}".format(int(self.minutes // unit_minutes),
                                     suffix)
        return "{}{}".format(int(self.minutes), 'm')

    @classmethod
    def from_string(cls, interval: str) -> 'Interval':
        """Parse '<number>[d|h|m]'; a missing unit means minutes.

        :raises ValueError: if the string is malformed or the interval
            is zero
        """
        match = re.match(r'^(\d+)(d|h|m)?$', interval)
        if not match:
            raise ValueError("Invalid interval ({})".format(interval))

        minutes = int(match.group(1))
        if match.group(2) == 'd':
            minutes *= 60 * 24
        elif match.group(2) == 'h':
            minutes *= 60

        if minutes == 0:
            # a zero-length period cannot be scheduled: computing the
            # next run divides by the period
            raise ValueError("Invalid interval ({})".format(interval))

        return cls(minutes)
268
269
class StartTime:
    """A schedule start time of day, with an optional time zone.

    ``time`` keeps the wall clock time (and tzinfo) as given;
    ``minutes`` is hour * 60 + minute, plus the ``seconds`` component of
    the UTC offset converted to minutes when an offset is available.
    """

    def __init__(self,
                 hour: int,
                 minute: int,
                 tzinfo: Optional[datetime.tzinfo]) -> None:
        self.time = datetime.time(hour, minute, tzinfo=tzinfo)
        self.minutes = self.time.hour * 60 + self.time.minute
        # time.utcoffset() is None for naive times and also when the
        # tzinfo cannot tell its offset without a date
        utcoffset = self.time.utcoffset()
        if utcoffset is not None:
            self.minutes += utcoffset.seconds // 60

    def __eq__(self, start_time: Any) -> bool:
        if not isinstance(start_time, StartTime):
            # e.g. comparison with None must not raise AttributeError
            return NotImplemented
        return self.minutes == start_time.minutes

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Return the time in ISO 8601 format, e.g. '14:30:00+02:00'."""
        return self.time.isoformat()

    @classmethod
    def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']:
        """Parse a time string; return None for an empty or missing value.

        :raises ValueError: if the string cannot be parsed
        """
        if not start_time:
            return None

        try:
            t = parse(start_time).timetz()
        except (ValueError, OverflowError) as e:
            raise ValueError(
                "Invalid start time {}: {}".format(start_time, e)) from e

        return cls(t.hour, t.minute, tzinfo=t.tzinfo)
302
303
class Schedule:
    """A named set of (interval, optional start time) schedule items."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.items: Set[Tuple[Interval, Optional[StartTime]]] = set()

    def __len__(self) -> int:
        # number of items; an empty schedule is therefore falsy
        return len(self.items)

    def add(self,
            interval: Interval,
            start_time: Optional[StartTime] = None) -> None:
        """Add an item; adding an already present item has no effect."""
        item = (interval, start_time)
        self.items.add(item)

    def remove(self,
               interval: Interval,
               start_time: Optional[StartTime] = None) -> None:
        """Remove an item if it is present."""
        item = (interval, start_time)
        self.items.discard(item)

    def next_run(self, now: datetime.datetime) -> str:
        """Return the earliest upcoming run time over all items.

        For each item the run times are spaced ``interval`` apart,
        counted from 1970-01-01 shifted by the item's start time
        minutes; the first one after ``now`` is that item's candidate.

        :returns: the earliest candidate as 'YYYY-mm-dd HH:MM:00'
        :raises ValueError: if the schedule has no items
        """
        epoch = datetime.datetime(1970, 1, 1)
        candidates = []
        for interval, opt_start in self.items:
            period = datetime.timedelta(minutes=interval.minutes)
            origin = epoch
            if opt_start:
                start = cast(StartTime, opt_start)
                origin = epoch + datetime.timedelta(minutes=start.minutes)
            elapsed_periods = int((now - origin) / period)
            candidates.append(origin + (elapsed_periods + 1) * period)
        if not candidates:
            raise ValueError('no items is added')
        earliest = min(candidates)
        return datetime.datetime.strftime(earliest, "%Y-%m-%d %H:%M:00")

    def to_list(self) -> List[Dict[str, Optional[str]]]:
        """Return the items as dicts of interval and start time strings.

        The start time value is None for items without a start time.
        """
        return [
            {
                SCHEDULE_INTERVAL: interval.to_string(),
                SCHEDULE_START_TIME:
                    start_time.to_string() if start_time else None,
            }
            for interval, start_time in self.items
        ]

    def to_json(self) -> str:
        """Serialize to_list() as indented JSON with sorted keys."""
        return json.dumps(self.to_list(), indent=4, sort_keys=True)

    @classmethod
    def from_json(cls, name: str, val: str) -> 'Schedule':
        """Build a schedule named ``name`` from its JSON form.

        :raises ValueError: if ``val`` is not valid JSON, an item lacks
            a required key, or the decoded data has an unexpected type
        """
        try:
            decoded = json.loads(val)
            schedule = Schedule(name)
            for entry in decoded:
                interval = Interval.from_string(entry[SCHEDULE_INTERVAL])
                raw_start = entry[SCHEDULE_START_TIME]
                if raw_start:
                    start_time = StartTime.from_string(raw_start)
                else:
                    start_time = None
                schedule.add(interval, start_time)
            return schedule
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError(
                "Invalid schedule format (missing key {})".format(str(e)))
        except TypeError as e:
            raise ValueError("Invalid schedule format ({})".format(str(e)))
372
20effc67 373
9f95a23c
TL
class Schedules:
    """Schedules keyed by level spec id, together with their persistence.

    The global schedule is kept in a mgr module option; all other
    schedules are kept as omap values (keyed by level spec id) of the
    handler's ``SCHEDULE_OID`` object in the corresponding pool.
    """

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.level_specs: Dict[str, 'LevelSpec'] = {}
        self.schedules: Dict[str, 'Schedule'] = {}

    def __len__(self) -> int:
        return len(self.schedules)

    def load(self,
             namespace_validator: Optional[Callable] = None,
             image_validator: Optional[Callable] = None) -> None:
        """Load the global schedule and the schedules of all RBD pools.

        Decoding and per-pool rados errors are logged and skipped.
        """
        module = self.handler.module
        option_name = self.handler.MODULE_OPTION_NAME

        schedule_cfg = module.get_module_option(option_name, '')

        # Previous versions incorrectly stored the global config in
        # the localized module option. Check the config is here and fix it.
        if not schedule_cfg:
            schedule_cfg = module.get_localized_module_option(
                option_name, '')
            if schedule_cfg:
                module.set_module_option(option_name, schedule_cfg)
        module.set_localized_module_option(option_name, None)

        if schedule_cfg:
            try:
                level_spec = LevelSpec.make_global()
                self.level_specs[level_spec.id] = level_spec
                schedule = Schedule.from_json(level_spec.name, schedule_cfg)
                self.schedules[level_spec.id] = schedule
            except ValueError:
                self.handler.log.error(
                    "Failed to decode configured schedule {}".format(
                        schedule_cfg))

        for pool_id, pool_name in get_rbd_pools(module).items():
            try:
                with module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_from_pool(ioctx, namespace_validator,
                                        image_validator)
            except rados.Error as e:
                self.handler.log.error(
                    "Failed to load schedules for pool {}: {}".format(
                        pool_name, e))

    def load_from_pool(self,
                       ioctx: 'rados.Ioctx',
                       namespace_validator: Optional[Callable],
                       image_validator: Optional[Callable]) -> None:
        """Load all schedules stored in the pool behind ``ioctx``.

        The omap of the schedule object is read in batches of up to 128
        entries.  Keys that can no longer be resolved to a level spec
        are treated as stale and removed from the omap afterwards.
        """
        pool_name = ioctx.get_pool_name()
        stale_keys = []
        start_after = ''
        try:
            while True:
                with rados.ReadOpCtx() as read_op:
                    self.handler.log.info(
                        "load_schedules: {}, start_after={}".format(
                            pool_name, start_after))
                    it, _ = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)

                    entries = list(it)
                    for k, v in entries:
                        # resume after the last key seen on the next batch
                        start_after = k
                        v = v.decode()
                        self.handler.log.info(
                            "load_schedule: {} {}".format(k, v))
                        try:
                            level_spec = LevelSpec.from_id(
                                self.handler, k, namespace_validator,
                                image_validator)
                        except ValueError:
                            self.handler.log.debug(
                                "Stale schedule key %s in pool %s",
                                k, pool_name)
                            stale_keys.append(k)
                            continue

                        self.level_specs[level_spec.id] = level_spec
                        try:
                            schedule = Schedule.from_json(level_spec.name, v)
                        except ValueError:
                            self.handler.log.error(
                                "Failed to decode schedule: pool={}, {} {}".format(
                                    pool_name, k, v))
                            continue
                        self.schedules[level_spec.id] = schedule
                if not entries:
                    break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            # no schedule object in this pool: nothing to load
            pass

        if stale_keys:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, stale_keys)
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def save(self,
             level_spec: 'LevelSpec',
             schedule: Optional['Schedule']) -> None:
        """Persist ``schedule`` for ``level_spec``.

        A missing or empty schedule clears the stored value.
        """
        if level_spec.is_global():
            schedule_cfg = schedule.to_json() if schedule else None
            self.handler.module.set_module_option(
                self.handler.MODULE_OPTION_NAME, schedule_cfg)
            return

        pool_id = level_spec.get_pool_id()
        assert pool_id
        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            with rados.WriteOpCtx() as write_op:
                if schedule:
                    ioctx.set_omap(write_op, (level_spec.id, ),
                                   (schedule.to_json(), ))
                else:
                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def add(self,
            level_spec: 'LevelSpec',
            interval: str,
            start_time: Optional[str]) -> None:
        """Add an item to the schedule of ``level_spec`` and persist it.

        :raises ValueError: if ``interval`` or ``start_time`` is invalid
        """
        schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
        schedule.add(Interval.from_string(interval),
                     StartTime.from_string(start_time))
        self.schedules[level_spec.id] = schedule
        self.level_specs[level_spec.id] = level_spec
        self.save(level_spec, schedule)

    def remove(self,
               level_spec: 'LevelSpec',
               interval: Optional[str],
               start_time: Optional[str]) -> None:
        """Remove an item, or with ``interval`` None the whole schedule.

        The result is persisted; a schedule left without items is
        dropped together with its level spec.  Removing from a level
        that has no schedule is not an error.

        :raises ValueError: if ``interval`` or ``start_time`` is invalid
        """
        schedule = self.schedules.pop(level_spec.id, None)
        if schedule:
            if interval is None:
                schedule = None
            else:
                try:
                    schedule.remove(Interval.from_string(interval),
                                    StartTime.from_string(start_time))
                finally:
                    # keep the schedule registered if it still has items,
                    # also when parsing the arguments failed
                    if schedule:
                        self.schedules[level_spec.id] = schedule
        if not schedule:
            # the level may be unknown: do not fail with KeyError
            self.level_specs.pop(level_spec.id, None)
        self.save(level_spec, schedule)

    def find(self,
             pool_id: str,
             namespace: str,
             image_id: Optional[str] = None) -> Optional['Schedule']:
        """Return the most specific schedule covering the given location.

        Ids are tried from the most specific level up to the global
        level; None is returned when no level has a schedule.
        """
        levels = [pool_id, namespace]
        if image_id:
            levels.append(image_id)
        nr_levels = len(levels)
        while nr_levels >= 0:
            # an empty spec id implies global schedule
            level_spec_id = "/".join(levels[:nr_levels])
            found = self.schedules.get(level_spec_id)
            if found is not None:
                return found
            nr_levels -= 1
        return None

    def intersects(self, level_spec: 'LevelSpec') -> bool:
        """Return True if any known level spec intersects ``level_spec``."""
        return any(ls.intersects(level_spec)
                   for ls in self.level_specs.values())

    def to_list(self, level_spec: 'LevelSpec') -> Dict[str, dict]:
        """Return the schedules relevant to ``level_spec``.

        The result maps level spec ids to dicts with the schedule
        'name' and its items under 'schedule'.  It contains the schedule
        of ``level_spec`` itself or, if it has none, of its closest
        scheduled parent, plus the schedules of all its children.
        """
        if level_spec.id in self.schedules:
            parent: Optional['LevelSpec'] = level_spec
        else:
            # try to find existing parent
            parent = None
            for level_spec_id in self.schedules:
                ls = self.level_specs[level_spec_id]
                if ls == level_spec:
                    parent = ls
                    break
                if level_spec.is_child_of(ls) and \
                        (not parent or ls.is_child_of(parent)):
                    parent = ls
            if not parent:
                # set to non-existing parent so we still could list its children
                parent = level_spec

        result = {}
        for level_spec_id, schedule in self.schedules.items():
            ls = self.level_specs[level_spec_id]
            if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
                result[level_spec_id] = {
                    'name': schedule.name,
                    'schedule': schedule.to_list(),
                }
        return result
575 return result