]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/schedule.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / rbd_support / schedule.py
1 import datetime
2 import json
3 import rados
4 import rbd
5 import re
6
7 from dateutil.parser import parse
8 from typing import cast, Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING
9
10 from .common import get_rbd_pools
11 if TYPE_CHECKING:
12 from .module import Module
13
# Keys of the per-item dicts produced by Schedule.to_list() and read back by
# Schedule.from_json().
SCHEDULE_INTERVAL = "interval"
SCHEDULE_START_TIME = "start_time"
16
17
class LevelSpec:
    """Identify the level a schedule is attached to.

    A spec is global (``pool_id`` is ``None``), or refers to a pool, a
    namespace within a pool, or an image.  ``name`` is the form built from
    pool/namespace/image names and ``id`` is the form built from the pool
    id, the namespace name and the image id.  ``pool_id`` is always kept as
    a string so that specs created by the different constructors compare
    consistently.
    """

    def __init__(self,
                 name: str,
                 id: str,
                 pool_id: Optional[str],
                 namespace: Optional[str],
                 image_id: Optional[str] = None) -> None:
        self.name = name
        self.id = id
        self.pool_id = pool_id
        self.namespace = namespace
        self.image_id = image_id

    def __eq__(self, level_spec: Any) -> bool:
        # Two specs are the same level when their ids are equal.
        return self.id == level_spec.id

    def __hash__(self) -> int:
        # Keep hashing consistent with __eq__, which compares ids only.
        return hash(self.id)

    def __repr__(self) -> str:
        return "{}(name={!r}, id={!r})".format(
            type(self).__name__, self.name, self.id)

    def is_child_of(self, level_spec: 'LevelSpec') -> bool:
        """Return True if this spec is strictly below ``level_spec``."""
        if level_spec.is_global():
            # Everything except the global spec itself is below global.
            return not self.is_global()
        if level_spec.pool_id != self.pool_id:
            return False
        if level_spec.namespace is None:
            # Parent is pool level: any namespace/image level is a child.
            return self.namespace is not None
        if level_spec.namespace != self.namespace:
            return False
        if level_spec.image_id is None:
            # Parent is namespace level: only image level is a child.
            return self.image_id is not None
        # An image level spec has no children.
        return False

    def is_global(self) -> bool:
        """Return True if this spec is not bound to any pool."""
        return self.pool_id is None

    def get_pool_id(self) -> Optional[str]:
        """Return the pool id, or None for the global spec."""
        return self.pool_id

    def matches(self,
                pool_id: str,
                namespace: str,
                image_id: Optional[str] = None) -> bool:
        """Return True if the given location is covered by this spec.

        Components of the spec that are unset (or empty) act as wildcards.
        """
        if self.pool_id and self.pool_id != pool_id:
            return False
        if self.namespace and self.namespace != namespace:
            return False
        if self.image_id and self.image_id != image_id:
            return False
        return True

    def intersects(self, level_spec: 'LevelSpec') -> bool:
        """Return True if one spec equals or contains the other."""
        if self.pool_id is None or level_spec.pool_id is None:
            return True
        if self.pool_id != level_spec.pool_id:
            return False
        if self.namespace is None or level_spec.namespace is None:
            return True
        if self.namespace != level_spec.namespace:
            return False
        if self.image_id is None or level_spec.image_id is None:
            return True
        return self.image_id == level_spec.image_id

    @classmethod
    def make_global(cls) -> 'LevelSpec':
        """Return the global spec (empty name and id, no pool)."""
        return LevelSpec("", "", None, None, None)

    @classmethod
    def from_pool_spec(cls,
                       pool_id: int,
                       pool_name: str,
                       namespace: Optional[str] = None) -> 'LevelSpec':
        """Build a pool level (``namespace`` is None) or namespace level spec."""
        if namespace is None:
            spec_id = "{}".format(pool_id)
            spec_name = "{}/".format(pool_name)
        else:
            spec_id = "{}/{}".format(pool_id, namespace)
            spec_name = "{}/{}/".format(pool_name, namespace)
        return LevelSpec(spec_name, spec_id, str(pool_id), namespace, None)

    @classmethod
    def from_name(cls,
                  module: 'Module',
                  name: str,
                  namespace_validator: Optional[Callable] = None,
                  image_validator: Optional[Callable] = None,
                  allow_image_level: bool = True) -> 'LevelSpec':
        """Build a spec from a name, resolving pool and image ids.

        :param module: object whose ``rados`` attribute is used for lookups
        :param name: one of '', 'rbd/', 'rbd/ns/', 'rbd//image',
            'rbd/image', 'rbd/ns/image'
        :param namespace_validator: optional callable invoked with the ioctx
            after it has been switched to a non-default namespace
        :param image_validator: optional callable invoked with the opened
            image
        :param allow_image_level: reject image level names when False
        :raises ValueError: if the name cannot be parsed or refers to a
            pool, namespace or image that cannot be resolved
        """
        # parse names like:
        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
                         name)
        if not match:
            raise ValueError("failed to parse {}".format(name))
        if match.group(3) and not allow_image_level:
            raise ValueError(
                "invalid name {}: image level is not allowed".format(name))

        spec_id = ""
        pool_id = None
        namespace = None
        image_name = None
        image_id = None
        if match.group(1):
            pool_name = match.group(1)
            try:
                pool_num = module.rados.pool_lookup(pool_name)
                if pool_num is None:
                    raise ValueError("pool {} does not exist".format(pool_name))
                if pool_num not in get_rbd_pools(module):
                    raise ValueError("{} is not an RBD pool".format(pool_name))
                # Store the pool id as a string, as from_id() and
                # from_pool_spec() do; otherwise is_child_of()/intersects()
                # never match specs created by the other constructors.
                pool_id = str(pool_num)
                spec_id += pool_id
                if match.group(2) is not None or match.group(3):
                    spec_id += "/"
                    with module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            spec_id += namespace
                            ioctx.set_namespace(namespace)
                            if namespace_validator:
                                namespace_validator(ioctx)
                        if match.group(3):
                            image_name = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_name,
                                               read_only=True) as image:
                                    image_id = image.id()
                                    spec_id += "/" + image_id
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_name))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_name))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_name))

        # normalize possible input name like 'rbd//image'
        if not namespace and image_name:
            name = "{}/{}".format(pool_name, image_name)

        return LevelSpec(name, spec_id, pool_id, namespace, image_id)

    @classmethod
    def from_id(cls,
                handler: Any,
                id: str,
                namespace_validator: Optional[Callable] = None,
                image_validator: Optional[Callable] = None) -> 'LevelSpec':
        """Build a spec from an id, resolving pool and image names.

        :param handler: object whose ``module.rados`` is used for lookups
        :param id: one of '', '123', '123/', '123/ns', '123//image_id',
            '123/ns/image_id'
        :param namespace_validator: accepted for symmetry with from_name()
        :param image_validator: optional callable invoked with the opened
            image
        :raises ValueError: if the id cannot be parsed or refers to a pool,
            namespace or image that cannot be resolved
        """
        # parse ids like:
        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
        if not match:
            raise ValueError("failed to parse: {}".format(id))

        name = ""
        pool_id = None
        namespace = None
        image_id = None
        if match.group(1):
            pool_id = match.group(1)
            try:
                pool_name = handler.module.rados.pool_reverse_lookup(
                    int(pool_id))
                if pool_name is None:
                    # Report the id that was looked up; pool_name is None.
                    raise ValueError("pool {} does not exist".format(pool_id))
                name += pool_name + "/"
                if match.group(2) is not None or match.group(3):
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            name += namespace + "/"
                            # Always switch the ioctx so that the image
                            # lookup below happens in this namespace, not in
                            # the default one.
                            # NOTE(review): namespace_validator itself is
                            # not invoked here, unlike in from_name() --
                            # confirm whether that is intended.
                            ioctx.set_namespace(namespace)
                        elif not match.group(3):
                            # '123/' is the default namespace level.
                            name += "/"
                        if match.group(3):
                            image_id = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_id=image_id,
                                               read_only=True) as image:
                                    image_name = image.get_name()
                                    name += image_name
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError("image {} does not exist".format(
                                    image_id))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror mode".format(
                                        image_id))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_id))

        return LevelSpec(name, id, pool_id, namespace, image_id)
228
229
class Interval:
    """A schedule period expressed in whole minutes."""

    def __init__(self, minutes: int) -> None:
        self.minutes = minutes

    def __eq__(self, interval: Any) -> bool:
        return self.minutes == interval.minutes

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Format using the largest unit (d, h, m) that divides evenly."""
        minutes_per_hour = 60
        minutes_per_day = 60 * 24
        if self.minutes % minutes_per_day == 0:
            return "{}{}".format(self.minutes // minutes_per_day, 'd')
        if self.minutes % minutes_per_hour == 0:
            return "{}{}".format(self.minutes // minutes_per_hour, 'h')
        return "{}{}".format(int(self.minutes), 'm')

    @classmethod
    def from_string(cls, interval: str) -> 'Interval':
        """Parse '<number>[d|h|m]'; a bare number means minutes.

        :raises ValueError: if the string is malformed or the interval is
            zero
        """
        match = re.match(r'^(\d+)(d|h|m)?$', interval)
        if not match:
            raise ValueError("Invalid interval ({})".format(interval))

        minutes = int(match.group(1))
        unit = match.group(2)
        if unit == 'd':
            minutes *= 60 * 24
        elif unit == 'h':
            minutes *= 60

        # A zero period cannot be scheduled: computing the next run divides
        # by the period.
        if minutes == 0:
            raise ValueError("Invalid interval ({})".format(interval))

        return Interval(minutes)
267
268
class StartTime:
    """A time of day at which a schedule is anchored.

    ``time`` keeps the value as given (including its time zone, if any) and
    is what to_string() reports.  ``minutes`` is the same time of day
    converted to UTC, as minutes since midnight in the range [0, 1440);
    equality and hashing are based on it.
    """

    def __init__(self,
                 hour: int,
                 minute: int,
                 tzinfo: Optional[datetime.tzinfo]) -> None:
        self.time = datetime.time(hour, minute, tzinfo=tzinfo)
        self.minutes = self.time.hour * 60 + self.time.minute
        # utcoffset() is None for naive times (and for tzinfo objects that
        # cannot tell their offset without a date); treat those as UTC.
        utcoffset = self.time.utcoffset()
        if utcoffset is not None:
            # UTC = local time - offset.  total_seconds() keeps the sign of
            # negative offsets, which timedelta.seconds does not.
            self.minutes -= int(utcoffset.total_seconds() // 60)
            # Wrap around midnight so the value is a minute of the UTC day.
            self.minutes %= 24 * 60

    def __eq__(self, start_time: Any) -> bool:
        return self.minutes == start_time.minutes

    def __hash__(self) -> int:
        return hash(self.minutes)

    def to_string(self) -> str:
        """Return the time of day in ISO 8601 format, as it was given."""
        return self.time.isoformat()

    @classmethod
    def from_string(cls, start_time: Optional[str]) -> Optional['StartTime']:
        """Parse a time string; return None for None or an empty string.

        :raises ValueError: if the string cannot be parsed
        """
        if not start_time:
            return None

        try:
            t = parse(start_time).timetz()
        except ValueError as e:
            raise ValueError(
                "Invalid start time {}: {}".format(start_time, e)) from e

        return StartTime(t.hour, t.minute, tzinfo=t.tzinfo)
301
302
class Schedule:
    """A named set of (interval, optional start time) items."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.items: Set[Tuple[Interval, Optional[StartTime]]] = set()

    def __len__(self) -> int:
        # Number of items; an empty schedule is therefore falsy.
        return len(self.items)

    def add(self,
            interval: Interval,
            start_time: Optional[StartTime] = None) -> None:
        """Add an item; adding an item that is already present is a no-op."""
        entry = (interval, start_time)
        self.items.add(entry)

    def remove(self,
               interval: Interval,
               start_time: Optional[StartTime] = None) -> None:
        """Remove an item; removing a missing item is a no-op."""
        entry = (interval, start_time)
        self.items.discard(entry)

    def next_run(self, now: datetime.datetime) -> str:
        """Return the earliest upcoming run after ``now`` over all items.

        Each item fires every ``interval`` minutes, counted from
        1970-01-01 00:00 shifted by the item's start time minutes (if any).
        The result is formatted as "%Y-%m-%d %H:%M:00".

        :raises ValueError: if the schedule has no items
        """
        epoch = datetime.datetime(1970, 1, 1)
        candidates = []
        for interval, opt_start in self.items:
            step = datetime.timedelta(minutes=interval.minutes)
            anchor = epoch
            if opt_start:
                anchor = anchor + datetime.timedelta(minutes=opt_start.minutes)
            # Number of whole periods elapsed since the anchor, plus one.
            elapsed_periods = int((now - anchor) / step)
            candidates.append(anchor + (elapsed_periods + 1) * step)
        if not candidates:
            raise ValueError('no items is added')
        return min(candidates).strftime("%Y-%m-%d %H:%M:00")

    def to_list(self) -> List[Dict[str, Optional[str]]]:
        """Return the items as dicts of interval and start time strings."""
        def start_to_string(start_time: Optional[StartTime]) -> Optional[str]:
            return start_time.to_string() if start_time else None

        return [
            {SCHEDULE_INTERVAL: interval.to_string(),
             SCHEDULE_START_TIME: start_to_string(start_time)}
            for interval, start_time in self.items
        ]

    def to_json(self) -> str:
        """Serialize the items as produced by to_list() to JSON."""
        return json.dumps(self.to_list(), indent=4, sort_keys=True)

    @classmethod
    def from_json(cls, name: str, val: str) -> 'Schedule':
        """Build a schedule called ``name`` from a to_json() style string.

        :raises ValueError: if the JSON is invalid or an item is malformed
        """
        try:
            decoded = json.loads(val)
            result = Schedule(name)
            for entry in decoded:
                interval = Interval.from_string(entry[SCHEDULE_INTERVAL])
                raw_start = entry[SCHEDULE_START_TIME]
                if raw_start:
                    start_time = StartTime.from_string(raw_start) or None
                else:
                    start_time = None
                result.add(interval, start_time)
            return result
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError(
                "Invalid schedule format (missing key {})".format(str(e)))
        except TypeError as e:
            raise ValueError("Invalid schedule format ({})".format(str(e)))
371
372
class Schedules:
    """In-memory collection of schedules, keyed by level spec id.

    ``level_specs`` and ``schedules`` are kept in sync: a level spec is
    registered only together with its schedule.  The global schedule is
    stored as a module option of ``handler.module``; all other schedules
    are stored as omap values of the ``handler.SCHEDULE_OID`` object in
    their pool.
    """

    def __init__(self, handler: Any) -> None:
        self.handler = handler
        self.level_specs: Dict[str, LevelSpec] = {}
        self.schedules: Dict[str, Schedule] = {}

    def __len__(self) -> int:
        return len(self.schedules)

    def load(self,
             namespace_validator: Optional[Callable] = None,
             image_validator: Optional[Callable] = None) -> None:
        """Load the global schedule and the schedules of all RBD pools.

        Failures to decode a schedule or to read a pool are logged and do
        not abort loading.
        """
        module = self.handler.module
        option_name = self.handler.MODULE_OPTION_NAME

        schedule_cfg = module.get_module_option(option_name, '')

        # Previous versions incorrectly stored the global config in
        # the localized module option. Check the config is here and fix it.
        if not schedule_cfg:
            schedule_cfg = module.get_localized_module_option(option_name, '')
            if schedule_cfg:
                module.set_module_option(option_name, schedule_cfg)
        # NOTE(review): the localized option is cleared unconditionally
        # here -- confirm this matches the intended nesting.
        module.set_localized_module_option(option_name, None)

        if schedule_cfg:
            try:
                level_spec = LevelSpec.make_global()
                schedule = Schedule.from_json(level_spec.name, schedule_cfg)
            except ValueError:
                self.handler.log.error(
                    "Failed to decode configured schedule {}".format(
                        schedule_cfg))
            else:
                # Register both only after decoding succeeded so that the
                # two dicts never get out of sync.
                self.level_specs[level_spec.id] = level_spec
                self.schedules[level_spec.id] = schedule

        for pool_id, pool_name in get_rbd_pools(module).items():
            try:
                with module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_from_pool(ioctx, namespace_validator,
                                        image_validator)
            except rados.Error as e:
                self.handler.log.error(
                    "Failed to load schedules for pool {}: {}".format(
                        pool_name, e))

    def load_from_pool(self,
                       ioctx: rados.Ioctx,
                       namespace_validator: Optional[Callable],
                       image_validator: Optional[Callable]) -> None:
        """Load the schedules stored in the pool ``ioctx`` is opened on.

        Omap keys are level spec ids and values are schedule JSON.  Keys
        whose level spec can no longer be resolved are removed from the
        omap; values that fail to decode are logged and skipped.
        """
        pool_name = ioctx.get_pool_name()
        stale_keys = []
        start_after = ''
        try:
            while True:
                with rados.ReadOpCtx() as read_op:
                    self.handler.log.info(
                        "load_schedules: {}, start_after={}".format(
                            pool_name, start_after))
                    # Read the omap in pages of up to 128 entries.
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)

                    it = list(it)
                    for k, v in it:
                        start_after = k
                        v = v.decode()
                        self.handler.log.info(
                            "load_schedule: {} {}".format(k, v))
                        try:
                            level_spec = LevelSpec.from_id(
                                self.handler, k, namespace_validator,
                                image_validator)
                        except ValueError:
                            self.handler.log.debug(
                                "Stale schedule key %s in pool %s",
                                k, pool_name)
                            stale_keys.append(k)
                            continue

                        try:
                            schedule = Schedule.from_json(level_spec.name, v)
                        except ValueError:
                            self.handler.log.error(
                                "Failed to decode schedule: pool={}, {} {}".format(
                                    pool_name, k, v))
                            continue

                        # Register both only after decoding succeeded.
                        self.level_specs[level_spec.id] = level_spec
                        self.schedules[level_spec.id] = schedule
                    if not it:
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            # No schedule object in this pool: nothing to load.
            pass

        if stale_keys:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, stale_keys)
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def save(self, level_spec: LevelSpec, schedule: Optional[Schedule]) -> None:
        """Persist ``schedule`` for ``level_spec``.

        A missing or empty schedule removes the stored value.
        """
        if level_spec.is_global():
            schedule_cfg = schedule.to_json() if schedule else None
            self.handler.module.set_module_option(
                self.handler.MODULE_OPTION_NAME, schedule_cfg)
            return

        pool_id = level_spec.get_pool_id()
        assert pool_id
        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            with rados.WriteOpCtx() as write_op:
                if schedule:
                    ioctx.set_omap(write_op, (level_spec.id, ),
                                   (schedule.to_json(), ))
                else:
                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def add(self,
            level_spec: LevelSpec,
            interval: str,
            start_time: Optional[str]) -> None:
        """Add an item to the schedule of ``level_spec`` and save it.

        :raises ValueError: if ``interval`` or ``start_time`` is invalid;
            nothing is modified in that case
        """
        # Parse first so that invalid input leaves the state untouched.
        parsed_interval = Interval.from_string(interval)
        parsed_start = StartTime.from_string(start_time)

        schedule = self.schedules.get(level_spec.id)
        if schedule is None:
            schedule = Schedule(level_spec.name)
        schedule.add(parsed_interval, parsed_start)
        self.schedules[level_spec.id] = schedule
        self.level_specs[level_spec.id] = level_spec
        self.save(level_spec, schedule)

    def remove(self,
               level_spec: LevelSpec,
               interval: Optional[str],
               start_time: Optional[str]) -> None:
        """Remove one item, or the whole schedule when ``interval`` is None.

        :raises ValueError: if ``interval`` or ``start_time`` is invalid;
            the existing schedule is kept in that case
        """
        schedule = self.schedules.pop(level_spec.id, None)
        if schedule:
            if interval is None:
                schedule = None
            else:
                try:
                    schedule.remove(Interval.from_string(interval),
                                    StartTime.from_string(start_time))
                finally:
                    # Put the schedule back unless it became empty; this
                    # also restores it when parsing raised.
                    if schedule:
                        self.schedules[level_spec.id] = schedule
        if not schedule:
            # No schedule left at this level: forget the spec too.  pop()
            # tolerates the spec not having been registered.
            self.level_specs.pop(level_spec.id, None)
        self.save(level_spec, schedule)

    def find(self,
             pool_id: str,
             namespace: str,
             image_id: Optional[str] = None) -> Optional['Schedule']:
        """Return the most specific schedule covering the given location.

        Looks up the image level (if ``image_id`` is given), then the
        namespace, pool and global levels, and returns the first hit.
        """
        levels = [pool_id, namespace]
        if image_id:
            levels.append(image_id)
        for nr_levels in range(len(levels), -1, -1):
            # an empty spec id implies global schedule
            level_spec_id = "/".join(levels[:nr_levels])
            found = self.schedules.get(level_spec_id)
            if found is not None:
                return found
        return None

    def intersects(self, level_spec: LevelSpec) -> bool:
        """Return True if any registered level spec intersects the given one."""
        return any(ls.intersects(level_spec)
                   for ls in self.level_specs.values())

    def to_list(self, level_spec: LevelSpec) -> Dict[str, dict]:
        """List the schedules relevant to ``level_spec``.

        The result maps level spec ids to ``{'name': ..., 'schedule': ...}``
        and contains the schedule of ``level_spec`` itself or, failing
        that, of its closest ancestor, plus the schedules of all its
        children.
        """
        if level_spec.id in self.schedules:
            parent: Optional[LevelSpec] = level_spec
        else:
            # try to find existing parent
            parent = None
            for level_spec_id in self.schedules:
                ls = self.level_specs[level_spec_id]
                if ls == level_spec:
                    parent = ls
                    break
                # Prefer the ancestor closest to level_spec.
                if level_spec.is_child_of(ls) and \
                   (not parent or ls.is_child_of(parent)):
                    parent = ls
            if not parent:
                # set to non-existing parent so we still could list its children
                parent = level_spec

        result = {}
        for level_spec_id, schedule in self.schedules.items():
            ls = self.level_specs[level_spec_id]
            if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
                result[level_spec_id] = {
                    'name': schedule.name,
                    'schedule': schedule.to_list(),
                }
        return result