]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/schedule.py
import 15.2.9
[ceph.git] / ceph / src / pybind / mgr / rbd_support / schedule.py
1 import json
2 import rados
3 import rbd
4 import re
5
6 from datetime import datetime, timedelta, time
7 from dateutil.parser import parse
8
9 from .common import get_rbd_pools
10
# Dictionary keys used for each entry of a serialized schedule, as produced
# by Schedule.to_list() and consumed by Schedule.from_json().
SCHEDULE_INTERVAL = "interval"
SCHEDULE_START_TIME = "start_time"
13
14
class LevelSpec:
    """Address of the level a schedule is attached to.

    A level is either global, a pool, a namespace inside a pool, or a
    single image.  Two representations are kept side by side:

    * ``name`` -- human readable, e.g. ``''``, ``'rbd/'``, ``'rbd/ns/'``,
      ``'rbd/image'``, ``'rbd/ns/image'``.
    * ``id`` -- built from the pool id and image id, e.g. ``''``, ``'123'``,
      ``'123/'``, ``'123/ns'``, ``'123//image_id'``, ``'123/ns/image_id'``.

    ``pool_id`` is kept as a string (``None`` for the global level),
    ``namespace`` is ``None`` when the spec stops at pool level, and
    ``image_id`` is ``None`` unless the spec addresses an image.
    Equality and hashing are based on ``id`` only.
    """

    def __init__(self, name, id, pool_id, namespace, image_id=None):
        self.name = name
        self.id = id
        self.pool_id = pool_id
        self.namespace = namespace
        self.image_id = image_id

    def __eq__(self, level_spec):
        # Comparing with a foreign type must yield False, not AttributeError.
        if not isinstance(level_spec, LevelSpec):
            return NotImplemented
        return self.id == level_spec.id

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable; hash the same
        # field equality is based on so specs can live in sets / dict keys.
        return hash(self.id)

    def is_child_of(self, level_spec):
        """Return True if this spec is strictly below *level_spec*."""
        if level_spec.is_global():
            return not self.is_global()
        if level_spec.pool_id != self.pool_id:
            return False
        if level_spec.namespace is None:
            # level_spec is a pool level: anything with a namespace is below
            return self.namespace is not None
        if level_spec.namespace != self.namespace:
            return False
        if level_spec.image_id is None:
            # level_spec is a namespace level: only images are below it
            return self.image_id is not None
        return False

    def is_global(self):
        """Return True for the global level (no pool set)."""
        return self.pool_id is None

    def get_pool_id(self):
        """Return the pool id (a string, or None for the global level)."""
        return self.pool_id

    def matches(self, pool_id, namespace, image_id=None):
        """Return True if the given location falls under this spec.

        Components of this spec that are unset (``None`` or empty) act as
        wildcards.
        """
        if self.pool_id and self.pool_id != pool_id:
            return False
        if self.namespace and self.namespace != namespace:
            return False
        if self.image_id and self.image_id != image_id:
            return False
        return True

    def intersects(self, level_spec):
        """Return True if one of the two specs contains or equals the other."""
        if self.pool_id is None or level_spec.pool_id is None:
            return True
        if self.pool_id != level_spec.pool_id:
            return False
        if self.namespace is None or level_spec.namespace is None:
            return True
        if self.namespace != level_spec.namespace:
            return False
        if self.image_id is None or level_spec.image_id is None:
            return True
        return self.image_id == level_spec.image_id

    @classmethod
    def make_global(cls):
        """Return the spec of the global level (empty name and id)."""
        return cls("", "", None, None, None)

    @classmethod
    def from_pool_spec(cls, pool_id, pool_name, namespace=None):
        """Build a pool level spec, or a namespace level one if *namespace*
        is not None.  No cluster lookups are performed."""
        if namespace is None:
            spec_id = "{}".format(pool_id)
            name = "{}/".format(pool_name)
        else:
            spec_id = "{}/{}".format(pool_id, namespace)
            name = "{}/{}/".format(pool_name, namespace)
        return cls(name, spec_id, str(pool_id), namespace, None)

    @classmethod
    def from_name(cls, handler, name, namespace_validator=None,
                  image_validator=None, allow_image_level=True):
        """Resolve a level *name* into a spec, looking ids up in the cluster.

        :param handler: object exposing ``handler.module.rados``
        :param name: level name (see the accepted forms below)
        :param namespace_validator: optional callable invoked with the ioctx
            once a non-default namespace has been selected on it
        :param image_validator: optional callable invoked with the opened
            ``rbd.Image``
        :param allow_image_level: reject names that address an image if False
        :raises ValueError: on a malformed name or when the pool, namespace
            or image cannot be resolved
        """
        # parse names like:
        # '', 'rbd/', 'rbd/ns/', 'rbd//image', 'rbd/image', 'rbd/ns/image'
        match = re.match(r'^(?:([^/]+)/(?:(?:([^/]*)/|)(?:([^/@]+))?)?)?$',
                         name)
        if not match:
            raise ValueError("failed to parse {}".format(name))
        if match.group(3) and not allow_image_level:
            raise ValueError(
                "invalid name {}: image level is not allowed".format(name))

        spec_id = ""
        pool_id = None
        namespace = None
        image_name = None
        image_id = None
        if match.group(1):
            pool_name = match.group(1)
            try:
                pool_id = handler.module.rados.pool_lookup(pool_name)
                if pool_id is None:
                    raise ValueError(
                        "pool {} does not exist".format(pool_name))
                if pool_id not in get_rbd_pools(handler.module):
                    raise ValueError(
                        "{} is not an RBD pool".format(pool_name))
                pool_id = str(pool_id)
                spec_id += pool_id
                if match.group(2) is not None or match.group(3):
                    spec_id += "/"
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            spec_id += namespace
                            ioctx.set_namespace(namespace)
                            if namespace_validator:
                                namespace_validator(ioctx)
                        if match.group(3):
                            image_name = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_name,
                                               read_only=True) as image:
                                    image_id = image.id()
                                    spec_id += "/" + image_id
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError(
                                    "image {} does not exist".format(
                                        image_name))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror "
                                    "mode".format(image_name))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_name))

        # normalize possible input name like 'rbd//image'
        if not namespace and image_name:
            name = "{}/{}".format(pool_name, image_name)

        return cls(name, spec_id, pool_id, namespace, image_id)

    @classmethod
    def from_id(cls, handler, id, namespace_validator=None,
                image_validator=None):
        """Resolve a level *id* into a spec, looking names up in the cluster.

        :param handler: object exposing ``handler.module.rados``
        :param id: level id (see the accepted forms below)
        :param namespace_validator: accepted for symmetry with from_name();
            it is not invoked here
        :param image_validator: optional callable invoked with the opened
            ``rbd.Image``
        :raises ValueError: on a malformed id or when the pool, namespace
            or image cannot be resolved
        """
        # parse ids like:
        # '', '123', '123/', '123/ns', '123//image_id', '123/ns/image_id'
        match = re.match(r'^(?:(\d+)(?:/([^/]*)(?:/([^/@]+))?)?)?$', id)
        if not match:
            raise ValueError("failed to parse: {}".format(id))

        name = ""
        pool_id = None
        namespace = None
        image_id = None
        if match.group(1):
            pool_id = match.group(1)
            try:
                pool_name = handler.module.rados.pool_reverse_lookup(
                    int(pool_id))
                if pool_name is None:
                    # pool_name is None here, so report the id we looked up
                    raise ValueError(
                        "pool {} does not exist".format(pool_id))
                name += pool_name + "/"
                if match.group(2) is not None or match.group(3):
                    with handler.module.rados.open_ioctx(pool_name) as ioctx:
                        namespace = match.group(2) or ""
                        if namespace:
                            namespaces = rbd.RBD().namespace_list(ioctx)
                            if namespace not in namespaces:
                                raise ValueError(
                                    "namespace {} does not exist".format(
                                        namespace))
                            name += namespace + "/"
                            # Always select the namespace: the image lookup
                            # below must run inside it whether or not a
                            # validator was supplied.
                            # NOTE(review): unlike from_name(), the
                            # namespace_validator is not called here --
                            # confirm whether that is intentional.
                            ioctx.set_namespace(namespace)
                        elif not match.group(3):
                            name += "/"
                        if match.group(3):
                            image_id = match.group(3)
                            try:
                                with rbd.Image(ioctx, image_id=image_id,
                                               read_only=True) as image:
                                    image_name = image.get_name()
                                    name += image_name
                                    if image_validator:
                                        image_validator(image)
                            except rbd.ImageNotFound:
                                raise ValueError(
                                    "image {} does not exist".format(
                                        image_id))
                            except rbd.InvalidArgument:
                                raise ValueError(
                                    "image {} is not in snapshot mirror "
                                    "mode".format(image_id))

            except rados.ObjectNotFound:
                raise ValueError("pool {} does not exist".format(pool_id))

        return cls(name, id, pool_id, namespace, image_id)
208
209
class Interval:
    """Schedule period, stored as a number of minutes.

    The textual form is a positive integer optionally followed by a unit:
    ``d`` (days), ``h`` (hours) or ``m`` (minutes, the default).
    """

    def __init__(self, minutes):
        self.minutes = minutes

    def __eq__(self, interval):
        # Comparing with a foreign type must yield False, not AttributeError.
        if not isinstance(interval, Interval):
            return NotImplemented
        return self.minutes == interval.minutes

    def __hash__(self):
        return hash(self.minutes)

    def to_string(self):
        """Return the shortest textual form, using the largest unit that
        divides the interval evenly."""
        # Floor division keeps large integers exact (true division would
        # round through a float); int() keeps the output free of ".0".
        for units, size in (('d', 60 * 24), ('h', 60)):
            if self.minutes % size == 0:
                return "{}{}".format(int(self.minutes // size), units)
        return "{}{}".format(int(self.minutes), 'm')

    @classmethod
    def from_string(cls, interval):
        """Parse the textual form of an interval.

        :raises ValueError: if *interval* is malformed or is zero
        """
        # fullmatch: unlike a '$' anchor it does not accept a trailing newline
        match = re.fullmatch(r'(\d+)(d|h|m)?', interval)
        if not match:
            raise ValueError("Invalid interval ({})".format(interval))

        minutes = int(match.group(1))
        if minutes == 0:
            # A zero period cannot be scheduled: computing the next run
            # divides by the period.
            raise ValueError("Invalid interval ({})".format(interval))
        if match.group(2) == 'd':
            minutes *= 60 * 24
        elif match.group(2) == 'h':
            minutes *= 60

        return cls(minutes)
247
248
class StartTime:
    """Time of day a schedule is anchored to.

    ``time`` keeps the value as given (including tzinfo) and is what
    to_string() renders; ``minutes`` is the value used for equality,
    hashing and scheduling arithmetic.
    """

    def __init__(self, hour, minute, tzinfo):
        self.time = time(hour, minute, tzinfo=tzinfo)
        self.minutes = self.time.hour * 60 + self.time.minute
        # utcoffset() is None for naive times and for tzinfo objects that
        # cannot tell their offset without a date; treat both as "no offset"
        # instead of failing on None.seconds.
        # NOTE(review): the offset is *added* and only its non-negative
        # `.seconds` part is used -- confirm this is the intended
        # normalization.
        offset = self.time.utcoffset()
        if offset is not None:
            self.minutes += offset.seconds // 60

    def __eq__(self, start_time):
        # A start time is optional elsewhere in this module, so comparing
        # with None (or any foreign type) must yield False, not raise.
        if not isinstance(start_time, StartTime):
            return NotImplemented
        return self.minutes == start_time.minutes

    def __hash__(self):
        return hash(self.minutes)

    def to_string(self):
        """Return the time in ISO 8601 format."""
        return self.time.isoformat()

    @classmethod
    def from_string(cls, start_time):
        """Parse *start_time* with dateutil; only hour, minute and tzinfo
        of the result are kept.

        :returns: a StartTime, or None if *start_time* is empty or None
        :raises ValueError: if the string cannot be parsed
        """
        if not start_time:
            return None

        try:
            t = parse(start_time).timetz()
        except (ValueError, OverflowError) as e:
            # dateutil documents OverflowError for out-of-range values;
            # report it the same way as any other bad input
            raise ValueError("Invalid start time {}: {}".format(start_time, e))

        return cls(t.hour, t.minute, tzinfo=t.tzinfo)
277
278
class Schedule:
    """Named collection of (interval, start_time) pairs.

    ``start_time`` may be None.  An empty schedule has length 0 and is
    therefore falsy.
    """

    def __init__(self, name):
        self.items = set()
        self.name = name

    def __len__(self):
        return len(self.items)

    def add(self, interval, start_time=None):
        """Add the (interval, start_time) pair to the schedule."""
        entry = (interval, start_time)
        self.items.add(entry)

    def remove(self, interval, start_time=None):
        """Drop the (interval, start_time) pair; a missing pair is ignored."""
        entry = (interval, start_time)
        self.items.discard(entry)

    def next_run(self, now):
        """Return the earliest run time after *now* over all items,
        formatted as ``YYYY-mm-dd HH:MM:00``."""
        epoch = datetime(1970, 1, 1)
        earliest = None
        for interval, start in self.items:
            period = timedelta(minutes=interval.minutes)
            # runs are aligned to the epoch, shifted by the optional start
            anchor = epoch
            if start:
                anchor = epoch + timedelta(minutes=start.minutes)
            periods_elapsed = int((now - anchor) / period)
            candidate = anchor + (periods_elapsed + 1) * period
            if earliest is None or candidate < earliest:
                earliest = candidate
        return datetime.strftime(earliest, "%Y-%m-%d %H:%M:00")

    def to_list(self):
        """Return the items as a list of dicts of strings (start time may
        be None)."""
        listing = []
        for interval, start in self.items:
            interval_str = interval.to_string()
            start_str = start.to_string() if start else None
            listing.append({
                SCHEDULE_INTERVAL: interval_str,
                SCHEDULE_START_TIME: start_str or None,
            })
        return listing

    def to_json(self):
        """Return to_list() serialized as indented, key-sorted JSON."""
        return json.dumps(self.to_list(), indent=4, sort_keys=True)

    @classmethod
    def from_json(cls, name, val):
        """Build a schedule called *name* from the JSON text *val*.

        :raises ValueError: if *val* is not valid JSON or does not have the
            expected structure
        """
        try:
            schedule = None
            entries = json.loads(val)
            schedule = Schedule(name)
            for entry in entries:
                interval = Interval.from_string(entry[SCHEDULE_INTERVAL])
                raw_start = entry[SCHEDULE_START_TIME]
                if raw_start:
                    start_time = StartTime.from_string(raw_start) or None
                else:
                    start_time = None
                schedule.add(interval, start_time)
            return schedule
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e)))
        except KeyError as e:
            raise ValueError(
                "Invalid schedule format (missing key {})".format(str(e)))
        except TypeError as e:
            raise ValueError("Invalid schedule format ({})".format(str(e)))
333
class Schedules:
    """In-memory registry of schedules per level, with persistence.

    ``schedules`` maps a level spec id to its Schedule and ``level_specs``
    maps the same id to its LevelSpec; the two dicts are kept in step.
    The global schedule is persisted in a module option, all others in the
    omap of the ``handler.SCHEDULE_OID`` object of the owning pool.
    """

    def __init__(self, handler):
        self.handler = handler
        self.level_specs = {}
        self.schedules = {}

    def __len__(self):
        return len(self.schedules)

    def load(self, namespace_validator=None, image_validator=None):
        """Load the global schedule and the schedules of every RBD pool.

        Failures are logged; a schedule that cannot be decoded is skipped.
        """
        schedule_cfg = self.handler.module.get_module_option(
            self.handler.MODULE_OPTION_NAME, '')

        # Previous versions incorrectly stored the global config in
        # the localized module option. Check the config is here and fix it.
        if not schedule_cfg:
            schedule_cfg = self.handler.module.get_localized_module_option(
                self.handler.MODULE_OPTION_NAME, '')
            if schedule_cfg:
                self.handler.module.set_module_option(
                    self.handler.MODULE_OPTION_NAME, schedule_cfg)
        self.handler.module.set_localized_module_option(
            self.handler.MODULE_OPTION_NAME, None)

        if schedule_cfg:
            try:
                level_spec = LevelSpec.make_global()
                schedule = Schedule.from_json(level_spec.name, schedule_cfg)
            except ValueError:
                self.handler.log.error(
                    "Failed to decode configured schedule {}".format(
                        schedule_cfg))
            else:
                # register only after a successful parse so that
                # level_specs never holds an entry without a schedule
                self.level_specs[level_spec.id] = level_spec
                self.schedules[level_spec.id] = schedule

        for pool_id, pool_name in get_rbd_pools(self.handler.module).items():
            try:
                with self.handler.module.rados.open_ioctx2(
                        int(pool_id)) as ioctx:
                    self.load_from_pool(ioctx, namespace_validator,
                                        image_validator)
            except rados.Error as e:
                self.handler.log.error(
                    "Failed to load schedules for pool {}: {}".format(
                        pool_name, e))

    def load_from_pool(self, ioctx, namespace_validator, image_validator):
        """Load every schedule stored in the pool behind *ioctx*.

        Keys that no longer resolve to a level spec are treated as stale
        and removed from the omap once the listing is complete.
        """
        pool_name = ioctx.get_pool_name()
        stale_keys = []
        start_after = ''
        try:
            while True:
                with rados.ReadOpCtx() as read_op:
                    self.handler.log.info(
                        "load_schedules: {}, start_after={}".format(
                            pool_name, start_after))
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "",
                                                  128)
                    ioctx.operate_read_op(read_op, self.handler.SCHEDULE_OID)

                    it = list(it)
                    for k, v in it:
                        # resume the next page after the last key seen
                        start_after = k
                        try:
                            v = v.decode()
                        except UnicodeDecodeError:
                            self.handler.log.error(
                                "Failed to decode schedule: "
                                "pool={}, {} {}".format(pool_name, k, v))
                            continue
                        self.handler.log.info(
                            "load_schedule: {} {}".format(k, v))
                        try:
                            level_spec = LevelSpec.from_id(
                                self.handler, k, namespace_validator,
                                image_validator)
                        except ValueError:
                            self.handler.log.debug(
                                "Stale schedule key {} in pool {}".format(
                                    k, pool_name))
                            stale_keys.append(k)
                            continue

                        try:
                            schedule = Schedule.from_json(level_spec.name, v)
                        except ValueError:
                            self.handler.log.error(
                                "Failed to decode schedule: "
                                "pool={}, {} {}".format(pool_name, k, v))
                            continue

                        self.level_specs[level_spec.id] = level_spec
                        self.schedules[level_spec.id] = schedule
                    if not it:
                        # an empty page means the listing is exhausted
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            # no schedule object in this pool: nothing to load
            pass

        if stale_keys:
            with rados.WriteOpCtx() as write_op:
                ioctx.remove_omap_keys(write_op, tuple(stale_keys))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def save(self, level_spec, schedule):
        """Persist *schedule* for *level_spec*; a falsy schedule (None or
        empty) removes the stored entry instead."""
        if level_spec.is_global():
            schedule_cfg = schedule.to_json() if schedule else None
            self.handler.module.set_module_option(
                self.handler.MODULE_OPTION_NAME, schedule_cfg)
            return

        pool_id = level_spec.get_pool_id()
        with self.handler.module.rados.open_ioctx2(int(pool_id)) as ioctx:
            with rados.WriteOpCtx() as write_op:
                if schedule:
                    ioctx.set_omap(write_op, (level_spec.id, ),
                                   (schedule.to_json(), ))
                else:
                    ioctx.remove_omap_keys(write_op, (level_spec.id, ))
                ioctx.operate_write_op(write_op, self.handler.SCHEDULE_OID)

    def add(self, level_spec, interval, start_time):
        """Add an (interval, start_time) item to the schedule of
        *level_spec*, creating the schedule if needed, and persist it.

        :raises ValueError: if *interval* or *start_time* is malformed
        """
        schedule = self.schedules.get(level_spec.id, Schedule(level_spec.name))
        schedule.add(Interval.from_string(interval),
                     StartTime.from_string(start_time))
        self.schedules[level_spec.id] = schedule
        self.level_specs[level_spec.id] = level_spec
        self.save(level_spec, schedule)

    def remove(self, level_spec, interval, start_time):
        """Remove one item from the schedule of *level_spec*, or the whole
        schedule if *interval* is None, and persist the result.

        :raises ValueError: if *interval* or *start_time* is malformed; the
            in-memory state is left untouched in that case
        """
        spec_id = level_spec.id
        schedule = self.schedules.get(spec_id)
        if schedule and interval is not None:
            # Parsing may raise; the schedule is still registered at this
            # point, so a bad argument cannot drop it from memory.
            schedule.remove(Interval.from_string(interval),
                            StartTime.from_string(start_time))
        else:
            schedule = None
        if not schedule:
            # nothing (left) scheduled at this level: forget it entirely
            self.schedules.pop(spec_id, None)
            self.level_specs.pop(spec_id, None)
        self.save(level_spec, schedule)

    def find(self, pool_id, namespace, image_id=None):
        """Return the most specific schedule covering the given location,
        or None if there is none.

        Lookup goes image -> namespace -> pool -> global.
        """
        # The leading None is a placeholder: once only it remains, the
        # joined id is '' and the global schedule is tried.
        levels = [None, pool_id, namespace]
        if image_id:
            levels.append(image_id)

        while levels:
            level_spec_id = "/".join(levels[1:])
            if level_spec_id in self.schedules:
                return self.schedules[level_spec_id]
            del levels[-1]
        return None

    def intersects(self, level_spec):
        """Return True if any registered level spec intersects
        *level_spec*."""
        return any(ls.intersects(level_spec)
                   for ls in self.level_specs.values())

    def to_list(self, level_spec):
        """Return ``{level_spec_id: {'name': ..., 'schedule': [...]}}`` for
        *level_spec* itself (or, if it has no schedule, its closest
        scheduled parent) and for every scheduled level below it."""
        if level_spec.id in self.schedules:
            parent = level_spec
        else:
            # try to find existing parent
            parent = None
            for level_spec_id in self.schedules:
                ls = self.level_specs[level_spec_id]
                if ls == level_spec:
                    parent = ls
                    break
                if level_spec.is_child_of(ls) and \
                   (not parent or ls.is_child_of(parent)):
                    parent = ls
            if not parent:
                # set to non-existing parent so we still could list its
                # children
                parent = level_spec

        result = {}
        for level_spec_id, schedule in self.schedules.items():
            ls = self.level_specs[level_spec_id]
            if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
                result[level_spec_id] = {
                    'name': schedule.name,
                    'schedule': schedule.to_list(),
                }
        return result
518