]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/schedule.py
b8507a1b8fe5b8876ed05d44171ac17a7dc7cf29
7 from dateutil
.parser
import parse
8 from typing
import cast
, Any
, Callable
, Dict
, List
, Optional
, Set
, Tuple
, TYPE_CHECKING
10 from .common
import get_rbd_pools
12 from .module
import Module
14 SCHEDULE_INTERVAL
= "interval"
15 SCHEDULE_START_TIME
= "start_time"
23 pool_id
: Optional
[str],
24 namespace
: Optional
[str],
25 image_id
: Optional
[str] = None) -> None:
28 self
.pool_id
= pool_id
29 self
.namespace
= namespace
30 self
.image_id
= image_id
32 def __eq__(self
, level_spec
: Any
) -> bool:
33 return self
.id == level_spec
.id
35 def is_child_of(self
, level_spec
: 'LevelSpec') -> bool:
36 if level_spec
.is_global():
37 return not self
.is_global()
38 if level_spec
.pool_id
!= self
.pool_id
:
40 if level_spec
.namespace
is None:
41 return self
.namespace
is not None
42 if level_spec
.namespace
!= self
.namespace
:
44 if level_spec
.image_id
is None:
45 return self
.image_id
is not None
48 def is_global(self
) -> bool:
49 return self
.pool_id
is None
51 def get_pool_id(self
) -> Optional
[str]:
57 image_id
: Optional
[str] = None) -> bool:
58 if self
.pool_id
and self
.pool_id
!= pool_id
:
60 if self
.namespace
and self
.namespace
!= namespace
:
62 if self
.image_id
and self
.image_id
!= image_id
:
66 def intersects(self
, level_spec
: 'LevelSpec') -> bool:
67 if self
.pool_id
is None or level_spec
.pool_id
is None:
69 if self
.pool_id
!= level_spec
.pool_id
:
71 if self
.namespace
is None or level_spec
.namespace
is None:
73 if self
.namespace
!= level_spec
.namespace
:
75 if self
.image_id
is None or level_spec
.image_id
is None:
77 if self
.image_id
!= level_spec
.image_id
:
82 def make_global(cls
) -> 'LevelSpec':
83 return LevelSpec("", "", None, None, None)
86 def from_pool_spec(cls
,
89 namespace
: Optional
[str] = None) -> 'LevelSpec':
91 id = "{}".format(pool_id
)
92 name
= "{}/".format(pool_name
)
94 id = "{}/{}".format(pool_id
, namespace
)
95 name
= "{}/{}/".format(pool_name
, namespace
)
96 return LevelSpec(name
, id, str(pool_id
), namespace
, None)
102 namespace_validator
: Optional
[Callable
] = None,
103 image_validator
: Optional
[Callable
] = None,
104 allow_image_level
: bool = True) -> 'LevelSpec':
106 # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
107 match
= re
.match(r
'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
110 raise ValueError("failed to parse {}".format(name
))
111 if match
.group(3) and not allow_image_level
:
113 "invalid name {}: image level is not allowed".format(name
))
121 pool_name
= match
.group(1)
123 pool_id
= module
.rados
.pool_lookup(pool_name
)
125 raise ValueError("pool {} does not exist".format(pool_name
))
126 if pool_id
not in get_rbd_pools(module
):
127 raise ValueError("{} is not an RBD pool".format(pool_name
))
128 pool_id
= str(pool_id
)
130 if match
.group(2) is not None or match
.group(3):
132 with module
.rados
.open_ioctx(pool_name
) as ioctx
:
133 namespace
= match
.group(2) or ""
135 namespaces
= rbd
.RBD().namespace_list(ioctx
)
136 if namespace
not in namespaces
:
138 "namespace {} does not exist".format(
141 ioctx
.set_namespace(namespace
)
142 if namespace_validator
:
143 namespace_validator(ioctx
)
145 image_name
= match
.group(3)
147 with rbd
.Image(ioctx
, image_name
,
148 read_only
=True) as image
:
149 image_id
= image
.id()
152 image_validator(image
)
153 except rbd
.ImageNotFound
:
154 raise ValueError("image {} does not exist".format(
156 except rbd
.InvalidArgument
:
158 "image {} is not in snapshot mirror mode".format(
161 except rados
.ObjectNotFound
:
162 raise ValueError("pool {} does not exist".format(pool_name
))
164 # normalize possible input name like 'rbd//image'
165 if not namespace
and image_name
:
166 name
= "{}/{}".format(pool_name
, image_name
)
168 return LevelSpec(name
, id, pool_id
, namespace
, image_id
)
174 namespace_validator
: Optional
[Callable
] = None,
175 image_validator
: Optional
[Callable
] = None) -> 'LevelSpec':
177 # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
178 match
= re
.match(r
'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
180 raise ValueError("failed to parse: {}".format(id))
187 pool_id
= match
.group(1)
189 pool_name
= handler
.module
.rados
.pool_reverse_lookup(
191 if pool_name
is None:
192 raise ValueError("pool {} does not exist".format(pool_name
))
193 name
+= pool_name
+ "/"
194 if match
.group(2) is not None or match
.group(3):
195 with handler
.module
.rados
.open_ioctx(pool_name
) as ioctx
:
196 namespace
= match
.group(2) or ""
198 namespaces
= rbd
.RBD().namespace_list(ioctx
)
199 if namespace
not in namespaces
:
201 "namespace {} does not exist".format(
203 name
+= namespace
+ "/"
204 if namespace_validator
:
205 ioctx
.set_namespace(namespace
)
206 elif not match
.group(3):
209 image_id
= match
.group(3)
211 with rbd
.Image(ioctx
, image_id
=image_id
,
212 read_only
=True) as image
:
213 image_name
= image
.get_name()
216 image_validator(image
)
217 except rbd
.ImageNotFound
:
218 raise ValueError("image {} does not exist".format(
220 except rbd
.InvalidArgument
:
222 "image {} is not in snapshot mirror mode".format(
225 except rados
.ObjectNotFound
:
226 raise ValueError("pool {} does not exist".format(pool_id
))
228 return LevelSpec(name
, id, pool_id
, namespace
, image_id
)
233 def __init__(self
, minutes
: int) -> None:
234 self
.minutes
= minutes
236 def __eq__(self
, interval
: Any
) -> bool:
237 return self
.minutes
== interval
.minutes
239 def __hash__(self
) -> int:
240 return hash(self
.minutes
)
242 def to_string(self
) -> str:
243 if self
.minutes
% (60 * 24) == 0:
244 interval
= int(self
.minutes
/ (60 * 24))
246 elif self
.minutes
% 60 == 0:
247 interval
= int(self
.minutes
/ 60)
250 interval
= int(self
.minutes
)
253 return "{}{}".format(interval
, units
)
256 def from_string(cls
, interval
: str) -> 'Interval':
257 match
= re
.match(r
'^(\d+)(d|h|m)?$', interval
)
259 raise ValueError("Invalid interval ({})".format(interval
))
261 minutes
= int(match
.group(1))
262 if match
.group(2) == 'd':
264 elif match
.group(2) == 'h':
267 return Interval(minutes
)
275 tzinfo
: Optional
[datetime
.tzinfo
]) -> None:
276 self
.time
= datetime
.time(hour
, minute
, tzinfo
=tzinfo
)
277 self
.minutes
= self
.time
.hour
* 60 + self
.time
.minute
279 utcoffset
= cast(datetime
.timedelta
, self
.time
.utcoffset())
280 self
.minutes
+= int(utcoffset
.seconds
/ 60)
282 def __eq__(self
, start_time
: Any
) -> bool:
283 return self
.minutes
== start_time
.minutes
285 def __hash__(self
) -> int:
286 return hash(self
.minutes
)
288 def to_string(self
) -> str:
289 return self
.time
.isoformat()
292 def from_string(cls
, start_time
: Optional
[str]) -> Optional
['StartTime']:
297 t
= parse(start_time
).timetz()
298 except ValueError as e
:
299 raise ValueError("Invalid start time {}: {}".format(start_time
, e
))
301 return StartTime(t
.hour
, t
.minute
, tzinfo
=t
.tzinfo
)
306 def __init__(self
, name
: str) -> None:
308 self
.items
: Set
[Tuple
[Interval
, Optional
[StartTime
]]] = set()
310 def __len__(self
) -> int:
311 return len(self
.items
)
315 start_time
: Optional
[StartTime
] = None) -> None:
316 self
.items
.add((interval
, start_time
))
320 start_time
: Optional
[StartTime
] = None) -> None:
321 self
.items
.discard((interval
, start_time
))
323 def next_run(self
, now
: datetime
.datetime
) -> str:
325 for interval
, opt_start
in self
.items
:
326 period
= datetime
.timedelta(minutes
=interval
.minutes
)
327 start_time
= datetime
.datetime(1970, 1, 1)
329 start
= cast(StartTime
, opt_start
)
330 start_time
+= datetime
.timedelta(minutes
=start
.minutes
)
331 time
= start_time
+ \
332 (int((now
- start_time
) / period
) + 1) * period
333 if schedule_time
is None or time
< schedule_time
:
335 if schedule_time
is None:
336 raise ValueError('no items is added')
337 return datetime
.datetime
.strftime(schedule_time
, "%Y-%m-%d %H:%M:00")
339 def to_list(self
) -> List
[Dict
[str, Optional
[str]]]:
340 def item_to_dict(interval
: Interval
,
341 start_time
: Optional
[StartTime
]) -> Dict
[str, Optional
[str]]:
343 schedule_start_time
: Optional
[str] = start_time
.to_string()
345 schedule_start_time
= None
346 return {SCHEDULE_INTERVAL
: interval
.to_string(),
347 SCHEDULE_START_TIME
: schedule_start_time
}
348 return [item_to_dict(interval
, start_time
)
349 for interval
, start_time
in self
.items
]
351 def to_json(self
) -> str:
352 return json
.dumps(self
.to_list(), indent
=4, sort_keys
=True)
355 def from_json(cls
, name
: str, val
: str) -> 'Schedule':
357 items
= json
.loads(val
)
358 schedule
= Schedule(name
)
360 interval
= Interval
.from_string(item
[SCHEDULE_INTERVAL
])
361 start_time
= item
[SCHEDULE_START_TIME
] and \
362 StartTime
.from_string(item
[SCHEDULE_START_TIME
]) or None
363 schedule
.add(interval
, start_time
)
365 except json
.JSONDecodeError
as e
:
366 raise ValueError("Invalid JSON ({})".format(str(e
)))
367 except KeyError as e
:
369 "Invalid schedule format (missing key {})".format(str(e
)))
370 except TypeError as e
:
371 raise ValueError("Invalid schedule format ({})".format(str(e
)))
376 def __init__(self
, handler
: Any
) -> None:
377 self
.handler
= handler
378 self
.level_specs
: Dict
[str, LevelSpec
] = {}
379 self
.schedules
: Dict
[str, Schedule
] = {}
381 def __len__(self
) -> int:
382 return len(self
.schedules
)
385 namespace_validator
: Optional
[Callable
] = None,
386 image_validator
: Optional
[Callable
] = None) -> None:
388 schedule_cfg
= self
.handler
.module
.get_module_option(
389 self
.handler
.MODULE_OPTION_NAME
, '')
391 # Previous versions incorrectly stored the global config in
392 # the localized module option. Check the config is here and fix it.
394 schedule_cfg
= self
.handler
.module
.get_localized_module_option(
395 self
.handler
.MODULE_OPTION_NAME
, '')
397 self
.handler
.module
.set_module_option(
398 self
.handler
.MODULE_OPTION_NAME
, schedule_cfg
)
399 self
.handler
.module
.set_localized_module_option(
400 self
.handler
.MODULE_OPTION_NAME
, None)
404 level_spec
= LevelSpec
.make_global()
405 self
.level_specs
[level_spec
.id] = level_spec
406 schedule
= Schedule
.from_json(level_spec
.name
, schedule_cfg
)
407 self
.schedules
[level_spec
.id] = schedule
409 self
.handler
.log
.error(
410 "Failed to decode configured schedule {}".format(
413 for pool_id
, pool_name
in get_rbd_pools(self
.handler
.module
).items():
415 with self
.handler
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
416 self
.load_from_pool(ioctx
, namespace_validator
,
418 except rados
.Error
as e
:
419 self
.handler
.log
.error(
420 "Failed to load schedules for pool {}: {}".format(
423 def load_from_pool(self
,
425 namespace_validator
: Optional
[Callable
],
426 image_validator
: Optional
[Callable
]) -> None:
427 pool_id
= ioctx
.get_pool_id()
428 pool_name
= ioctx
.get_pool_name()
433 with rados
.ReadOpCtx() as read_op
:
434 self
.handler
.log
.info(
435 "load_schedules: {}, start_after={}".format(
436 pool_name
, start_after
))
437 it
, ret
= ioctx
.get_omap_vals(read_op
, start_after
, "", 128)
438 ioctx
.operate_read_op(read_op
, self
.handler
.SCHEDULE_OID
)
444 self
.handler
.log
.info(
445 "load_schedule: {} {}".format(k
, v
))
448 level_spec
= LevelSpec
.from_id(
449 self
.handler
, k
, namespace_validator
,
452 self
.handler
.log
.debug(
453 "Stale schedule key %s in pool %s",
458 self
.level_specs
[level_spec
.id] = level_spec
459 schedule
= Schedule
.from_json(level_spec
.name
, v
)
460 self
.schedules
[level_spec
.id] = schedule
462 self
.handler
.log
.error(
463 "Failed to decode schedule: pool={}, {} {}".format(
468 except StopIteration:
470 except rados
.ObjectNotFound
:
474 with rados
.WriteOpCtx() as write_op
:
475 ioctx
.remove_omap_keys(write_op
, stale_keys
)
476 ioctx
.operate_write_op(write_op
, self
.handler
.SCHEDULE_OID
)
478 def save(self
, level_spec
: LevelSpec
, schedule
: Optional
[Schedule
]) -> None:
479 if level_spec
.is_global():
480 schedule_cfg
= schedule
and schedule
.to_json() or None
481 self
.handler
.module
.set_module_option(
482 self
.handler
.MODULE_OPTION_NAME
, schedule_cfg
)
485 pool_id
= level_spec
.get_pool_id()
487 with self
.handler
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
488 with rados
.WriteOpCtx() as write_op
:
490 ioctx
.set_omap(write_op
, (level_spec
.id, ),
491 (schedule
.to_json(), ))
493 ioctx
.remove_omap_keys(write_op
, (level_spec
.id, ))
494 ioctx
.operate_write_op(write_op
, self
.handler
.SCHEDULE_OID
)
497 level_spec
: LevelSpec
,
499 start_time
: Optional
[str]) -> None:
500 schedule
= self
.schedules
.get(level_spec
.id, Schedule(level_spec
.name
))
501 schedule
.add(Interval
.from_string(interval
),
502 StartTime
.from_string(start_time
))
503 self
.schedules
[level_spec
.id] = schedule
504 self
.level_specs
[level_spec
.id] = level_spec
505 self
.save(level_spec
, schedule
)
508 level_spec
: LevelSpec
,
509 interval
: Optional
[str],
510 start_time
: Optional
[str]) -> None:
511 schedule
= self
.schedules
.pop(level_spec
.id, None)
517 schedule
.remove(Interval
.from_string(interval
),
518 StartTime
.from_string(start_time
))
521 self
.schedules
[level_spec
.id] = schedule
523 del self
.level_specs
[level_spec
.id]
524 self
.save(level_spec
, schedule
)
529 image_id
: Optional
[str] = None) -> Optional
['Schedule']:
530 levels
= [pool_id
, namespace
]
532 levels
.append(image_id
)
533 nr_levels
= len(levels
)
534 while nr_levels
>= 0:
535 # an empty spec id implies global schedule
536 level_spec_id
= "/".join(levels
[:nr_levels
])
537 found
= self
.schedules
.get(level_spec_id
)
538 if found
is not None:
543 def intersects(self
, level_spec
: LevelSpec
) -> bool:
544 for ls
in self
.level_specs
.values():
545 if ls
.intersects(level_spec
):
549 def to_list(self
, level_spec
: LevelSpec
) -> Dict
[str, dict]:
550 if level_spec
.id in self
.schedules
:
551 parent
: Optional
[LevelSpec
] = level_spec
553 # try to find existing parent
555 for level_spec_id
in self
.schedules
:
556 ls
= self
.level_specs
[level_spec_id
]
560 if level_spec
.is_child_of(ls
) and \
561 (not parent
or ls
.is_child_of(parent
)):
564 # set to non-existing parent so we still could list its children
568 for level_spec_id
, schedule
in self
.schedules
.items():
569 ls
= self
.level_specs
[level_spec_id
]
570 if ls
== parent
or ls
== level_spec
or ls
.is_child_of(level_spec
):
571 result
[level_spec_id
] = {
572 'name' : schedule
.name
,
573 'schedule' : schedule
.to_list(),