]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/schedule.py
7 from dateutil
.parser
import parse
8 from typing
import cast
, Any
, Callable
, Dict
, List
, Optional
, Set
, Tuple
, TYPE_CHECKING
10 from .common
import get_rbd_pools
12 from .module
import Module
14 SCHEDULE_INTERVAL
= "interval"
15 SCHEDULE_START_TIME
= "start_time"
23 pool_id
: Optional
[str],
24 namespace
: Optional
[str],
25 image_id
: Optional
[str] = None) -> None:
28 self
.pool_id
= pool_id
29 self
.namespace
= namespace
30 self
.image_id
= image_id
32 def __eq__(self
, level_spec
: Any
) -> bool:
33 return self
.id == level_spec
.id
35 def is_child_of(self
, level_spec
: 'LevelSpec') -> bool:
36 if level_spec
.is_global():
37 return not self
.is_global()
38 if level_spec
.pool_id
!= self
.pool_id
:
40 if level_spec
.namespace
is None:
41 return self
.namespace
is not None
42 if level_spec
.namespace
!= self
.namespace
:
44 if level_spec
.image_id
is None:
45 return self
.image_id
is not None
48 def is_global(self
) -> bool:
49 return self
.pool_id
is None
51 def get_pool_id(self
) -> Optional
[str]:
57 image_id
: Optional
[str] = None) -> bool:
58 if self
.pool_id
and self
.pool_id
!= pool_id
:
60 if self
.namespace
and self
.namespace
!= namespace
:
62 if self
.image_id
and self
.image_id
!= image_id
:
66 def intersects(self
, level_spec
: 'LevelSpec') -> bool:
67 if self
.pool_id
is None or level_spec
.pool_id
is None:
69 if self
.pool_id
!= level_spec
.pool_id
:
71 if self
.namespace
is None or level_spec
.namespace
is None:
73 if self
.namespace
!= level_spec
.namespace
:
75 if self
.image_id
is None or level_spec
.image_id
is None:
77 if self
.image_id
!= level_spec
.image_id
:
82 def make_global(cls
) -> 'LevelSpec':
83 return LevelSpec("", "", None, None, None)
86 def from_pool_spec(cls
,
89 namespace
: Optional
[str] = None) -> 'LevelSpec':
91 id = "{}".format(pool_id
)
92 name
= "{}/".format(pool_name
)
94 id = "{}/{}".format(pool_id
, namespace
)
95 name
= "{}/{}/".format(pool_name
, namespace
)
96 return LevelSpec(name
, id, str(pool_id
), namespace
, None)
102 namespace_validator
: Optional
[Callable
] = None,
103 image_validator
: Optional
[Callable
] = None,
104 allow_image_level
: bool = True) -> 'LevelSpec':
106 # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
107 match
= re
.match(r
'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
110 raise ValueError("failed to parse {}".format(name
))
111 if match
.group(3) and not allow_image_level
:
113 "invalid name {}: image level is not allowed".format(name
))
121 pool_name
= match
.group(1)
123 pool_id
= module
.rados
.pool_lookup(pool_name
)
125 raise ValueError("pool {} does not exist".format(pool_name
))
126 if pool_id
not in get_rbd_pools(module
):
127 raise ValueError("{} is not an RBD pool".format(pool_name
))
129 if match
.group(2) is not None or match
.group(3):
131 with module
.rados
.open_ioctx(pool_name
) as ioctx
:
132 namespace
= match
.group(2) or ""
134 namespaces
= rbd
.RBD().namespace_list(ioctx
)
135 if namespace
not in namespaces
:
137 "namespace {} does not exist".format(
140 ioctx
.set_namespace(namespace
)
141 if namespace_validator
:
142 namespace_validator(ioctx
)
144 image_name
= match
.group(3)
146 with rbd
.Image(ioctx
, image_name
,
147 read_only
=True) as image
:
148 image_id
= image
.id()
151 image_validator(image
)
152 except rbd
.ImageNotFound
:
153 raise ValueError("image {} does not exist".format(
155 except rbd
.InvalidArgument
:
157 "image {} is not in snapshot mirror mode".format(
160 except rados
.ObjectNotFound
:
161 raise ValueError("pool {} does not exist".format(pool_name
))
163 # normalize possible input name like 'rbd//image'
164 if not namespace
and image_name
:
165 name
= "{}/{}".format(pool_name
, image_name
)
167 return LevelSpec(name
, id, pool_id
, namespace
, image_id
)
173 namespace_validator
: Optional
[Callable
] = None,
174 image_validator
: Optional
[Callable
] = None) -> 'LevelSpec':
176 # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
177 match
= re
.match(r
'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
179 raise ValueError("failed to parse: {}".format(id))
186 pool_id
= match
.group(1)
188 pool_name
= handler
.module
.rados
.pool_reverse_lookup(
190 if pool_name
is None:
191 raise ValueError("pool {} does not exist".format(pool_name
))
192 name
+= pool_name
+ "/"
193 if match
.group(2) is not None or match
.group(3):
194 with handler
.module
.rados
.open_ioctx(pool_name
) as ioctx
:
195 namespace
= match
.group(2) or ""
197 namespaces
= rbd
.RBD().namespace_list(ioctx
)
198 if namespace
not in namespaces
:
200 "namespace {} does not exist".format(
202 name
+= namespace
+ "/"
203 if namespace_validator
:
204 ioctx
.set_namespace(namespace
)
205 elif not match
.group(3):
208 image_id
= match
.group(3)
210 with rbd
.Image(ioctx
, image_id
=image_id
,
211 read_only
=True) as image
:
212 image_name
= image
.get_name()
215 image_validator(image
)
216 except rbd
.ImageNotFound
:
217 raise ValueError("image {} does not exist".format(
219 except rbd
.InvalidArgument
:
221 "image {} is not in snapshot mirror mode".format(
224 except rados
.ObjectNotFound
:
225 raise ValueError("pool {} does not exist".format(pool_id
))
227 return LevelSpec(name
, id, pool_id
, namespace
, image_id
)
232 def __init__(self
, minutes
: int) -> None:
233 self
.minutes
= minutes
235 def __eq__(self
, interval
: Any
) -> bool:
236 return self
.minutes
== interval
.minutes
238 def __hash__(self
) -> int:
239 return hash(self
.minutes
)
241 def to_string(self
) -> str:
242 if self
.minutes
% (60 * 24) == 0:
243 interval
= int(self
.minutes
/ (60 * 24))
245 elif self
.minutes
% 60 == 0:
246 interval
= int(self
.minutes
/ 60)
249 interval
= int(self
.minutes
)
252 return "{}{}".format(interval
, units
)
255 def from_string(cls
, interval
: str) -> 'Interval':
256 match
= re
.match(r
'^(\d+)(d|h|m)?$', interval
)
258 raise ValueError("Invalid interval ({})".format(interval
))
260 minutes
= int(match
.group(1))
261 if match
.group(2) == 'd':
263 elif match
.group(2) == 'h':
266 return Interval(minutes
)
274 tzinfo
: Optional
[datetime
.tzinfo
]) -> None:
275 self
.time
= datetime
.time(hour
, minute
, tzinfo
=tzinfo
)
276 self
.minutes
= self
.time
.hour
* 60 + self
.time
.minute
278 utcoffset
= cast(datetime
.timedelta
, self
.time
.utcoffset())
279 self
.minutes
+= int(utcoffset
.seconds
/ 60)
281 def __eq__(self
, start_time
: Any
) -> bool:
282 return self
.minutes
== start_time
.minutes
284 def __hash__(self
) -> int:
285 return hash(self
.minutes
)
287 def to_string(self
) -> str:
288 return self
.time
.isoformat()
291 def from_string(cls
, start_time
: Optional
[str]) -> Optional
['StartTime']:
296 t
= parse(start_time
).timetz()
297 except ValueError as e
:
298 raise ValueError("Invalid start time {}: {}".format(start_time
, e
))
300 return StartTime(t
.hour
, t
.minute
, tzinfo
=t
.tzinfo
)
305 def __init__(self
, name
: str) -> None:
307 self
.items
: Set
[Tuple
[Interval
, Optional
[StartTime
]]] = set()
309 def __len__(self
) -> int:
310 return len(self
.items
)
314 start_time
: Optional
[StartTime
] = None) -> None:
315 self
.items
.add((interval
, start_time
))
319 start_time
: Optional
[StartTime
] = None) -> None:
320 self
.items
.discard((interval
, start_time
))
322 def next_run(self
, now
: datetime
.datetime
) -> str:
324 for interval
, opt_start
in self
.items
:
325 period
= datetime
.timedelta(minutes
=interval
.minutes
)
326 start_time
= datetime
.datetime(1970, 1, 1)
328 start
= cast(StartTime
, opt_start
)
329 start_time
+= datetime
.timedelta(minutes
=start
.minutes
)
330 time
= start_time
+ \
331 (int((now
- start_time
) / period
) + 1) * period
332 if schedule_time
is None or time
< schedule_time
:
334 if schedule_time
is None:
335 raise ValueError('no items is added')
336 return datetime
.datetime
.strftime(schedule_time
, "%Y-%m-%d %H:%M:00")
338 def to_list(self
) -> List
[Dict
[str, Optional
[str]]]:
339 def item_to_dict(interval
: Interval
,
340 start_time
: Optional
[StartTime
]) -> Dict
[str, Optional
[str]]:
342 schedule_start_time
: Optional
[str] = start_time
.to_string()
344 schedule_start_time
= None
345 return {SCHEDULE_INTERVAL
: interval
.to_string(),
346 SCHEDULE_START_TIME
: schedule_start_time
}
347 return [item_to_dict(interval
, start_time
)
348 for interval
, start_time
in self
.items
]
350 def to_json(self
) -> str:
351 return json
.dumps(self
.to_list(), indent
=4, sort_keys
=True)
354 def from_json(cls
, name
: str, val
: str) -> 'Schedule':
356 items
= json
.loads(val
)
357 schedule
= Schedule(name
)
359 interval
= Interval
.from_string(item
[SCHEDULE_INTERVAL
])
360 start_time
= item
[SCHEDULE_START_TIME
] and \
361 StartTime
.from_string(item
[SCHEDULE_START_TIME
]) or None
362 schedule
.add(interval
, start_time
)
364 except json
.JSONDecodeError
as e
:
365 raise ValueError("Invalid JSON ({})".format(str(e
)))
366 except KeyError as e
:
368 "Invalid schedule format (missing key {})".format(str(e
)))
369 except TypeError as e
:
370 raise ValueError("Invalid schedule format ({})".format(str(e
)))
375 def __init__(self
, handler
: Any
) -> None:
376 self
.handler
= handler
377 self
.level_specs
: Dict
[str, LevelSpec
] = {}
378 self
.schedules
: Dict
[str, Schedule
] = {}
380 def __len__(self
) -> int:
381 return len(self
.schedules
)
384 namespace_validator
: Optional
[Callable
] = None,
385 image_validator
: Optional
[Callable
] = None) -> None:
387 schedule_cfg
= self
.handler
.module
.get_module_option(
388 self
.handler
.MODULE_OPTION_NAME
, '')
390 # Previous versions incorrectly stored the global config in
391 # the localized module option. Check the config is here and fix it.
393 schedule_cfg
= self
.handler
.module
.get_localized_module_option(
394 self
.handler
.MODULE_OPTION_NAME
, '')
396 self
.handler
.module
.set_module_option(
397 self
.handler
.MODULE_OPTION_NAME
, schedule_cfg
)
398 self
.handler
.module
.set_localized_module_option(
399 self
.handler
.MODULE_OPTION_NAME
, None)
403 level_spec
= LevelSpec
.make_global()
404 self
.level_specs
[level_spec
.id] = level_spec
405 schedule
= Schedule
.from_json(level_spec
.name
, schedule_cfg
)
406 self
.schedules
[level_spec
.id] = schedule
408 self
.handler
.log
.error(
409 "Failed to decode configured schedule {}".format(
412 for pool_id
, pool_name
in get_rbd_pools(self
.handler
.module
).items():
414 with self
.handler
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
415 self
.load_from_pool(ioctx
, namespace_validator
,
417 except rados
.Error
as e
:
418 self
.handler
.log
.error(
419 "Failed to load schedules for pool {}: {}".format(
422 def load_from_pool(self
,
424 namespace_validator
: Optional
[Callable
],
425 image_validator
: Optional
[Callable
]) -> None:
426 pool_id
= ioctx
.get_pool_id()
427 pool_name
= ioctx
.get_pool_name()
432 with rados
.ReadOpCtx() as read_op
:
433 self
.handler
.log
.info(
434 "load_schedules: {}, start_after={}".format(
435 pool_name
, start_after
))
436 it
, ret
= ioctx
.get_omap_vals(read_op
, start_after
, "", 128)
437 ioctx
.operate_read_op(read_op
, self
.handler
.SCHEDULE_OID
)
443 self
.handler
.log
.info(
444 "load_schedule: {} {}".format(k
, v
))
447 level_spec
= LevelSpec
.from_id(
448 self
.handler
, k
, namespace_validator
,
451 self
.handler
.log
.debug(
452 "Stale schedule key %s in pool %s",
457 self
.level_specs
[level_spec
.id] = level_spec
458 schedule
= Schedule
.from_json(level_spec
.name
, v
)
459 self
.schedules
[level_spec
.id] = schedule
461 self
.handler
.log
.error(
462 "Failed to decode schedule: pool={}, {} {}".format(
467 except StopIteration:
469 except rados
.ObjectNotFound
:
473 with rados
.WriteOpCtx() as write_op
:
474 ioctx
.remove_omap_keys(write_op
, stale_keys
)
475 ioctx
.operate_write_op(write_op
, self
.handler
.SCHEDULE_OID
)
477 def save(self
, level_spec
: LevelSpec
, schedule
: Optional
[Schedule
]) -> None:
478 if level_spec
.is_global():
479 schedule_cfg
= schedule
and schedule
.to_json() or None
480 self
.handler
.module
.set_module_option(
481 self
.handler
.MODULE_OPTION_NAME
, schedule_cfg
)
484 pool_id
= level_spec
.get_pool_id()
486 with self
.handler
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
487 with rados
.WriteOpCtx() as write_op
:
489 ioctx
.set_omap(write_op
, (level_spec
.id, ),
490 (schedule
.to_json(), ))
492 ioctx
.remove_omap_keys(write_op
, (level_spec
.id, ))
493 ioctx
.operate_write_op(write_op
, self
.handler
.SCHEDULE_OID
)
496 level_spec
: LevelSpec
,
498 start_time
: Optional
[str]) -> None:
499 schedule
= self
.schedules
.get(level_spec
.id, Schedule(level_spec
.name
))
500 schedule
.add(Interval
.from_string(interval
),
501 StartTime
.from_string(start_time
))
502 self
.schedules
[level_spec
.id] = schedule
503 self
.level_specs
[level_spec
.id] = level_spec
504 self
.save(level_spec
, schedule
)
507 level_spec
: LevelSpec
,
508 interval
: Optional
[str],
509 start_time
: Optional
[str]) -> None:
510 schedule
= self
.schedules
.pop(level_spec
.id, None)
516 schedule
.remove(Interval
.from_string(interval
),
517 StartTime
.from_string(start_time
))
520 self
.schedules
[level_spec
.id] = schedule
522 del self
.level_specs
[level_spec
.id]
523 self
.save(level_spec
, schedule
)
528 image_id
: Optional
[str] = None) -> Optional
['Schedule']:
529 levels
= [pool_id
, namespace
]
531 levels
.append(image_id
)
532 nr_levels
= len(levels
)
533 while nr_levels
>= 0:
534 # an empty spec id implies global schedule
535 level_spec_id
= "/".join(levels
[:nr_levels
])
536 found
= self
.schedules
.get(level_spec_id
)
537 if found
is not None:
542 def intersects(self
, level_spec
: LevelSpec
) -> bool:
543 for ls
in self
.level_specs
.values():
544 if ls
.intersects(level_spec
):
548 def to_list(self
, level_spec
: LevelSpec
) -> Dict
[str, dict]:
549 if level_spec
.id in self
.schedules
:
550 parent
: Optional
[LevelSpec
] = level_spec
552 # try to find existing parent
554 for level_spec_id
in self
.schedules
:
555 ls
= self
.level_specs
[level_spec_id
]
559 if level_spec
.is_child_of(ls
) and \
560 (not parent
or ls
.is_child_of(parent
)):
563 # set to non-existing parent so we still could list its children
567 for level_spec_id
, schedule
in self
.schedules
.items():
568 ls
= self
.level_specs
[level_spec_id
]
569 if ls
== parent
or ls
== level_spec
or ls
.is_child_of(level_spec
):
570 result
[level_spec_id
] = {
571 'name' : schedule
.name
,
572 'schedule' : schedule
.to_list(),