]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/schedule.py
6 from datetime
import datetime
, timedelta
, time
7 from dateutil
.parser
import parse
9 from .common
import get_rbd_pools
11 SCHEDULE_INTERVAL
= "interval"
12 SCHEDULE_START_TIME
= "start_time"
17 def __init__(self
, name
, id, pool_id
, namespace
, image_id
=None):
20 self
.pool_id
= pool_id
21 self
.namespace
= namespace
22 self
.image_id
= image_id
24 def __eq__(self
, level_spec
):
25 return self
.id == level_spec
.id
27 def is_child_of(self
, level_spec
):
28 if level_spec
.is_global():
29 return not self
.is_global()
30 if level_spec
.pool_id
!= self
.pool_id
:
32 if level_spec
.namespace
is None:
33 return self
.namespace
is not None
34 if level_spec
.namespace
!= self
.namespace
:
36 if level_spec
.image_id
is None:
37 return self
.image_id
is not None
41 return self
.pool_id
is None
43 def get_pool_id(self
):
46 def matches(self
, pool_id
, namespace
, image_id
=None):
47 if self
.pool_id
and self
.pool_id
!= pool_id
:
49 if self
.namespace
and self
.namespace
!= namespace
:
51 if self
.image_id
and self
.image_id
!= image_id
:
55 def intersects(self
, level_spec
):
56 if self
.pool_id
is None or level_spec
.pool_id
is None:
58 if self
.pool_id
!= level_spec
.pool_id
:
60 if self
.namespace
is None or level_spec
.namespace
is None:
62 if self
.namespace
!= level_spec
.namespace
:
64 if self
.image_id
is None or level_spec
.image_id
is None:
66 if self
.image_id
!= level_spec
.image_id
:
72 return LevelSpec("", "", None, None, None)
75 def from_pool_spec(cls
, pool_id
, pool_name
, namespace
=None):
77 id = "{}".format(pool_id
)
78 name
= "{}/".format(pool_name
)
80 id = "{}/{}".format(pool_id
, namespace
)
81 name
= "{}/{}/".format(pool_name
, namespace
)
82 return LevelSpec(name
, id, str(pool_id
), namespace
, None)
85 def from_name(cls
, handler
, name
, namespace_validator
=None,
86 image_validator
=None, allow_image_level
=True):
88 # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
89 match
= re
.match(r
'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
92 raise ValueError("failed to parse {}".format(name
))
93 if match
.group(3) and not allow_image_level
:
95 "invalid name {}: image level is not allowed".format(name
))
103 pool_name
= match
.group(1)
105 pool_id
= handler
.module
.rados
.pool_lookup(pool_name
)
107 raise ValueError("pool {} does not exist".format(pool_name
))
108 if pool_id
not in get_rbd_pools(handler
.module
):
109 raise ValueError("{} is not an RBD pool".format(pool_name
))
110 pool_id
= str(pool_id
)
112 if match
.group(2) is not None or match
.group(3):
114 with handler
.module
.rados
.open_ioctx(pool_name
) as ioctx
:
115 namespace
= match
.group(2) or ""
117 namespaces
= rbd
.RBD().namespace_list(ioctx
)
118 if namespace
not in namespaces
:
120 "namespace {} does not exist".format(
123 ioctx
.set_namespace(namespace
)
124 if namespace_validator
:
125 namespace_validator(ioctx
)
127 image_name
= match
.group(3)
129 with rbd
.Image(ioctx
, image_name
,
130 read_only
=True) as image
:
131 image_id
= image
.id()
134 image_validator(image
)
135 except rbd
.ImageNotFound
:
136 raise ValueError("image {} does not exist".format(
138 except rbd
.InvalidArgument
:
140 "image {} is not in snapshot mirror mode".format(
143 except rados
.ObjectNotFound
:
144 raise ValueError("pool {} does not exist".format(pool_name
))
146 # normalize possible input name like 'rbd//image'
147 if not namespace
and image_name
:
148 name
= "{}/{}".format(pool_name
, image_name
)
150 return LevelSpec(name
, id, pool_id
, namespace
, image_id
)
153 def from_id(cls
, handler
, id, namespace_validator
=None,
154 image_validator
=None):
156 # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
157 match
= re
.match(r
'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
159 raise ValueError("failed to parse: {}".format(id))
166 pool_id
= match
.group(1)
168 pool_name
= handler
.module
.rados
.pool_reverse_lookup(
170 if pool_name
is None:
171 raise ValueError("pool {} does not exist".format(pool_name
))
172 name
+= pool_name
+ "/"
173 if match
.group(2) is not None or match
.group(3):
174 with handler
.module
.rados
.open_ioctx(pool_name
) as ioctx
:
175 namespace
= match
.group(2) or ""
177 namespaces
= rbd
.RBD().namespace_list(ioctx
)
178 if namespace
not in namespaces
:
180 "namespace {} does not exist".format(
182 name
+= namespace
+ "/"
183 if namespace_validator
:
184 ioctx
.set_namespace(namespace
)
185 elif not match
.group(3):
188 image_id
= match
.group(3)
190 with rbd
.Image(ioctx
, image_id
=image_id
,
191 read_only
=True) as image
:
192 image_name
= image
.get_name()
195 image_validator(image
)
196 except rbd
.ImageNotFound
:
197 raise ValueError("image {} does not exist".format(
199 except rbd
.InvalidArgument
:
201 "image {} is not in snapshot mirror mode".format(
204 except rados
.ObjectNotFound
:
205 raise ValueError("pool {} does not exist".format(pool_id
))
207 return LevelSpec(name
, id, pool_id
, namespace
, image_id
)
212 def __init__(self
, minutes
):
213 self
.minutes
= minutes
215 def __eq__(self
, interval
):
216 return self
.minutes
== interval
.minutes
219 return hash(self
.minutes
)
222 if self
.minutes
% (60 * 24) == 0:
223 interval
= int(self
.minutes
/ (60 * 24))
225 elif self
.minutes
% 60 == 0:
226 interval
= int(self
.minutes
/ 60)
229 interval
= int(self
.minutes
)
232 return "{}{}".format(interval
, units
)
235 def from_string(cls
, interval
):
236 match
= re
.match(r
'^(\d+)(d|h|m)?$', interval
)
238 raise ValueError("Invalid interval ({})".format(interval
))
240 minutes
= int(match
.group(1))
241 if match
.group(2) == 'd':
243 elif match
.group(2) == 'h':
246 return Interval(minutes
)
251 def __init__(self
, hour
, minute
, tzinfo
):
252 self
.time
= time(hour
, minute
, tzinfo
=tzinfo
)
253 self
.minutes
= self
.time
.hour
* 60 + self
.time
.minute
255 self
.minutes
+= int(self
.time
.utcoffset().seconds
/ 60)
257 def __eq__(self
, start_time
):
258 return self
.minutes
== start_time
.minutes
261 return hash(self
.minutes
)
264 return self
.time
.isoformat()
267 def from_string(cls
, start_time
):
272 t
= parse(start_time
).timetz()
273 except ValueError as e
:
274 raise ValueError("Invalid start time {}: {}".format(start_time
, e
))
276 return StartTime(t
.hour
, t
.minute
, tzinfo
=t
.tzinfo
)
281 def __init__(self
, name
):
286 return len(self
.items
)
288 def add(self
, interval
, start_time
=None):
289 self
.items
.add((interval
, start_time
))
291 def remove(self
, interval
, start_time
=None):
292 self
.items
.discard((interval
, start_time
))
294 def next_run(self
, now
):
296 for item
in self
.items
:
297 period
= timedelta(minutes
=item
[0].minutes
)
298 start_time
= datetime(1970, 1, 1)
300 start_time
+= timedelta(minutes
=item
[1].minutes
)
301 time
= start_time
+ \
302 (int((now
- start_time
) / period
) + 1) * period
303 if schedule_time
is None or time
< schedule_time
:
305 return datetime
.strftime(schedule_time
, "%Y-%m-%d %H:%M:00")
308 return [{SCHEDULE_INTERVAL
: i
[0].to_string(),
309 SCHEDULE_START_TIME
: i
[1] and i
[1].to_string() or None}
313 return json
.dumps(self
.to_list(), indent
=4, sort_keys
=True)
316 def from_json(cls
, name
, val
):
318 items
= json
.loads(val
)
319 schedule
= Schedule(name
)
321 interval
= Interval
.from_string(item
[SCHEDULE_INTERVAL
])
322 start_time
= item
[SCHEDULE_START_TIME
] and \
323 StartTime
.from_string(item
[SCHEDULE_START_TIME
]) or None
324 schedule
.add(interval
, start_time
)
326 except json
.JSONDecodeError
as e
:
327 raise ValueError("Invalid JSON ({})".format(str(e
)))
328 except KeyError as e
:
330 "Invalid schedule format (missing key {})".format(str(e
)))
331 except TypeError as e
:
332 raise ValueError("Invalid schedule format ({})".format(str(e
)))
336 def __init__(self
, handler
):
337 self
.handler
= handler
338 self
.level_specs
= {}
342 return len(self
.schedules
)
344 def load(self
, namespace_validator
=None, image_validator
=None):
346 schedule_cfg
= self
.handler
.module
.get_module_option(
347 self
.handler
.MODULE_OPTION_NAME
, '')
349 # Previous versions incorrectly stored the global config in
350 # the localized module option. Check the config is here and fix it.
352 schedule_cfg
= self
.handler
.module
.get_localized_module_option(
353 self
.handler
.MODULE_OPTION_NAME
, '')
355 self
.handler
.module
.set_module_option(
356 self
.handler
.MODULE_OPTION_NAME
, schedule_cfg
)
357 self
.handler
.module
.set_localized_module_option(
358 self
.handler
.MODULE_OPTION_NAME
, None)
362 level_spec
= LevelSpec
.make_global()
363 self
.level_specs
[level_spec
.id] = level_spec
364 schedule
= Schedule
.from_json(level_spec
.name
, schedule_cfg
)
365 self
.schedules
[level_spec
.id] = schedule
367 self
.handler
.log
.error(
368 "Failed to decode configured schedule {}".format(
371 for pool_id
, pool_name
in get_rbd_pools(self
.handler
.module
).items():
373 with self
.handler
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
374 self
.load_from_pool(ioctx
, namespace_validator
,
376 except rados
.Error
as e
:
377 self
.handler
.log
.error(
378 "Failed to load schedules for pool {}: {}".format(
381 def load_from_pool(self
, ioctx
, namespace_validator
, image_validator
):
382 pool_id
= ioctx
.get_pool_id()
383 pool_name
= ioctx
.get_pool_name()
388 with rados
.ReadOpCtx() as read_op
:
389 self
.handler
.log
.info(
390 "load_schedules: {}, start_after={}".format(
391 pool_name
, start_after
))
392 it
, ret
= ioctx
.get_omap_vals(read_op
, start_after
, "", 128)
393 ioctx
.operate_read_op(read_op
, self
.handler
.SCHEDULE_OID
)
399 self
.handler
.log
.info(
400 "load_schedule: {} {}".format(k
, v
))
403 level_spec
= LevelSpec
.from_id(
404 self
.handler
, k
, namespace_validator
,
407 self
.handler
.log
.debug(
408 "Stail schedule key {} in pool".format(
413 self
.level_specs
[level_spec
.id] = level_spec
414 schedule
= Schedule
.from_json(level_spec
.name
, v
)
415 self
.schedules
[level_spec
.id] = schedule
417 self
.handler
.log
.error(
418 "Failed to decode schedule: pool={}, {} {}".format(
423 except StopIteration:
425 except rados
.ObjectNotFound
:
429 with rados
.WriteOpCtx() as write_op
:
430 ioctx
.remove_omap_keys(write_op
, stale_keys
)
431 ioctx
.operate_write_op(write_op
, self
.handler
.SCHEDULE_OID
)
433 def save(self
, level_spec
, schedule
):
434 if level_spec
.is_global():
435 schedule_cfg
= schedule
and schedule
.to_json() or None
436 self
.handler
.module
.set_module_option(
437 self
.handler
.MODULE_OPTION_NAME
, schedule_cfg
)
440 pool_id
= level_spec
.get_pool_id()
441 with self
.handler
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
442 with rados
.WriteOpCtx() as write_op
:
444 ioctx
.set_omap(write_op
, (level_spec
.id, ),
445 (schedule
.to_json(), ))
447 ioctx
.remove_omap_keys(write_op
, (level_spec
.id, ))
448 ioctx
.operate_write_op(write_op
, self
.handler
.SCHEDULE_OID
)
451 def add(self
, level_spec
, interval
, start_time
):
452 schedule
= self
.schedules
.get(level_spec
.id, Schedule(level_spec
.name
))
453 schedule
.add(Interval
.from_string(interval
),
454 StartTime
.from_string(start_time
))
455 self
.schedules
[level_spec
.id] = schedule
456 self
.level_specs
[level_spec
.id] = level_spec
457 self
.save(level_spec
, schedule
)
459 def remove(self
, level_spec
, interval
, start_time
):
460 schedule
= self
.schedules
.pop(level_spec
.id, None)
465 schedule
.remove(Interval
.from_string(interval
),
466 StartTime
.from_string(start_time
))
468 self
.schedules
[level_spec
.id] = schedule
470 del self
.level_specs
[level_spec
.id]
471 self
.save(level_spec
, schedule
)
473 def find(self
, pool_id
, namespace
, image_id
=None):
474 levels
= [None, pool_id
, namespace
]
476 levels
.append(image_id
)
479 level_spec_id
= "/".join(levels
[1:])
480 if level_spec_id
in self
.schedules
:
481 return self
.schedules
[level_spec_id
]
485 def intersects(self
, level_spec
):
486 for ls
in self
.level_specs
.values():
487 if ls
.intersects(level_spec
):
491 def to_list(self
, level_spec
):
492 if level_spec
.id in self
.schedules
:
495 # try to find existing parent
497 for level_spec_id
in self
.schedules
:
498 ls
= self
.level_specs
[level_spec_id
]
502 if level_spec
.is_child_of(ls
) and \
503 (not parent
or ls
.is_child_of(parent
)):
506 # set to non-existing parent so we still could list its children
510 for level_spec_id
, schedule
in self
.schedules
.items():
511 ls
= self
.level_specs
[level_spec_id
]
512 if ls
== parent
or ls
== level_spec
or ls
.is_child_of(level_spec
):
513 result
[level_spec_id
] = {
514 'name' : schedule
.name
,
515 'schedule' : schedule
.to_list(),