]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/mirror_snapshot_schedule.py
479a967ef923df308f0d94bd5d18f2fc153af18f
[ceph.git] / ceph / src / pybind / mgr / rbd_support / mirror_snapshot_schedule.py
1 import errno
2 import json
3 import rados
4 import rbd
5 import re
6 import traceback
7
8 from datetime import datetime
9 from threading import Condition, Lock, Thread
10
11 from .common import get_rbd_pools
12 from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
13
def namespace_validator(ioctx):
    """Check that the namespace bound to *ioctx* uses image mirror mode.

    Passed as a validator to ``Schedules.load`` and ``LevelSpec.from_name``.

    :param ioctx: rados I/O context already set to the namespace to check.
    :raises ValueError: if the mirror mode is not ``rbd.RBD_MIRROR_MODE_IMAGE``.
    """
    if rbd.RBD().mirror_mode_get(ioctx) == rbd.RBD_MIRROR_MODE_IMAGE:
        return
    raise ValueError("namespace {} is not in mirror image mode".format(
        ioctx.get_namespace()))
19
def image_validator(image):
    """Check that *image* is mirrored in snapshot mode.

    Passed as a validator to ``Schedules.load`` and ``LevelSpec.from_name``.

    :param image: an open ``rbd.Image``.
    :raises rbd.InvalidArgument: if the image mirror mode is not
        ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    """
    if image.mirror_image_get_mode() == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        return
    raise rbd.InvalidArgument("Invalid mirror image mode")
24
class MirrorSnapshotScheduleHandler:
    """Create RBD mirror snapshots for images according to user schedules.

    A background thread (started from ``__init__``) repeatedly refreshes the
    set of snapshot-mirrored images, pops the next due image from a
    time-keyed queue, creates a mirror snapshot for it and re-enqueues it at
    its next scheduled time.

    ``self.queue`` maps a schedule time string (parsed by ``dequeue`` with
    the format "%Y-%m-%d %H:%M:%S") to a list of
    ``(pool_id, namespace, image_id)`` tuples due at that time.
    ``self.images`` maps ``pool_id -> namespace -> image_id -> display name``.
    Both are read and mutated under ``lock``.
    """

    MODULE_OPTION_NAME = "mirror_snapshot_schedule"
    SCHEDULE_OID = "rbd_mirror_snapshot_schedule"

    # Minimum number of seconds between two image list refreshes.
    REFRESH_IMAGES_INTERVAL = 60

    # NOTE: these are class attributes, so they are shared by all instances.
    lock = Lock()
    condition = Condition(lock)
    thread = None

    def __init__(self, module):
        self.module = module
        self.log = module.log
        # A timestamp far in the past makes the first refresh_images() run.
        self.last_refresh_images = datetime(1970, 1, 1)

        self.init_schedule_queue()

        self.thread = Thread(target=self.run)
        self.thread.start()

    def run(self):
        """Scheduler thread main loop.

        Loops forever; an unexpected exception escaping the loop is logged
        as fatal and ends the thread.
        """
        try:
            self.log.info("MirrorSnapshotScheduleHandler: starting")
            while True:
                self.refresh_images()
                with self.lock:
                    (image_spec, wait_time) = self.dequeue()
                    if not image_spec:
                        # Nothing due yet: sleep until the next slot, but no
                        # longer than 60s so refresh_images() gets a chance.
                        self.condition.wait(min(wait_time, 60))
                        continue
                pool_id, namespace, image_id = image_spec
                # Snapshot creation talks to the cluster; do it unlocked.
                self.create_snapshot(pool_id, namespace, image_id)
                with self.lock:
                    self.enqueue(datetime.now(), pool_id, namespace, image_id)

        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    def create_snapshot(self, pool_id, namespace, image_id):
        """Create a mirror snapshot for one image if it is still eligible.

        The image is skipped when it is not in snapshot mirror mode, when
        mirroring is not enabled, or when it is not primary. Any exception is
        logged and swallowed so one failing image does not stop the loop.
        """
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                ioctx.set_namespace(namespace)
                with rbd.Image(ioctx, image_id=image_id) as image:
                    mode = image.mirror_image_get_mode()
                    if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
                        return
                    info = image.mirror_image_get_info()
                    if info['state'] != rbd.RBD_MIRROR_IMAGE_ENABLED or \
                            not info['primary']:
                        return
                    snap_id = image.mirror_image_create_snapshot()
                    self.log.debug(
                        "create_snapshot: {}/{}/{}: snap_id={}".format(
                            ioctx.get_pool_name(), namespace, image.get_name(),
                            snap_id))
        except Exception as e:
            self.log.error(
                "exception when creating snapshot for {}/{}/{}: {}".format(
                    pool_id, namespace, image_id, e))

    def init_schedule_queue(self):
        """Reset the queue and image map, then populate them."""
        self.queue = {}
        self.images = {}
        self.refresh_images()
        self.log.debug("scheduler queue is initialized")

    def load_schedules(self):
        """Load the stored schedules and install them under the lock."""
        self.log.info("MirrorSnapshotScheduleHandler: load_schedules")

        schedules = Schedules(self)
        schedules.load(namespace_validator, image_validator)
        with self.lock:
            self.schedules = schedules

    def _refresh_images_due(self, now=None):
        """Return True once REFRESH_IMAGES_INTERVAL seconds have elapsed
        since the last image refresh.

        ``timedelta.total_seconds()`` is used on purpose: ``.seconds`` is
        only the seconds component (0..86399) and ignores whole days, so an
        elapsed time of e.g. one day and ten seconds would look "fresh".

        :param now: reference time; defaults to ``datetime.now()``.
        """
        if now is None:
            now = datetime.now()
        elapsed = now - self.last_refresh_images
        return elapsed.total_seconds() >= self.REFRESH_IMAGES_INTERVAL

    def refresh_images(self):
        """Reload schedules and rescan images, at most once per interval.

        Only pools (and, in load_pool_images, namespaces) that intersect a
        schedule are scanned. The queue is then reconciled with the new
        image map under the lock.
        """
        if not self._refresh_images_due():
            return

        self.log.debug("MirrorSnapshotScheduleHandler: refresh_images")

        self.load_schedules()

        images = {}

        for pool_id, pool_name in get_rbd_pools(self.module).items():
            if not self.schedules.intersects(
                    LevelSpec.from_pool_spec(pool_id, pool_name)):
                continue
            # NOTE(review): an error opening the pool here (e.g. the pool was
            # removed after get_rbd_pools) is not caught; it propagates out
            # of refresh_images and ends run().
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                self.load_pool_images(ioctx, images)

        with self.lock:
            self.refresh_queue(images)
            self.images = images

        self.last_refresh_images = datetime.now()

    def load_pool_images(self, ioctx, images):
        """Add the snapshot-mirrored images of one pool to *images*.

        Fills ``images[pool_id][namespace][image_id] = name`` for the
        default ('') namespace and every listed namespace that intersects a
        schedule. Any exception is logged and stops scanning of this pool
        only; entries collected before the error are kept.

        Side effect: *ioctx* is left set to the last scanned namespace.
        """
        pool_id = str(ioctx.get_pool_id())
        pool_name = ioctx.get_pool_name()
        images[pool_id] = {}

        self.log.debug("load_pool_images: pool={}".format(pool_name))

        try:
            namespaces = [''] + rbd.RBD().namespace_list(ioctx)
            for namespace in namespaces:
                if not self.schedules.intersects(
                        LevelSpec.from_pool_spec(pool_id, pool_name, namespace)):
                    continue
                self.log.debug("load_pool_images: pool={}, namespace={}".format(
                    pool_name, namespace))
                images[pool_id][namespace] = {}
                ioctx.set_namespace(namespace)
                # Keys are the ids of images mirrored in snapshot mode.
                mirror_images = dict(rbd.RBD().mirror_image_info_list(
                    ioctx, rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT))
                if not mirror_images:
                    continue
                image_names = {
                    x['id']: x['name'] for x in rbd.RBD().list2(ioctx)
                    if x['id'] in mirror_images}
                for image_id in mirror_images:
                    image_name = image_names.get(image_id)
                    if not image_name:
                        # Not returned by list2(); skip it.
                        continue
                    if namespace:
                        name = "{}/{}/{}".format(pool_name, namespace,
                                                 image_name)
                    else:
                        name = "{}/{}".format(pool_name, image_name)
                    self.log.debug("Adding image {}".format(name))
                    images[pool_id][namespace][image_id] = name
        except Exception as e:
            self.log.error("exception when scanning pool {}: {}".format(
                pool_name, e))

    def rebuild_queue(self):
        """Recompute future queue slots after a schedule change.

        Slots at or before the current minute are kept so images already
        due still get their snapshot; later slots are dropped and every
        known image is re-enqueued against the current schedules.
        """
        with self.lock:
            now = datetime.now()

            # don't remove from queue "due" images
            now_string = now.strftime("%Y-%m-%d %H:%M:00")

            for schedule_time in list(self.queue):
                if schedule_time > now_string:
                    del self.queue[schedule_time]

            if not self.schedules:
                return

            for pool_id in self.images:
                for namespace in self.images[pool_id]:
                    for image_id in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

            self.condition.notify()

    def refresh_queue(self, current_images):
        """Reconcile the queue with a freshly scanned image map.

        Images that disappeared are removed from the queue; newly seen
        images are enqueued. Called with ``self.lock`` held (see
        refresh_images) and before ``self.images`` is replaced.
        """
        now = datetime.now()

        for pool_id in self.images:
            for namespace in self.images[pool_id]:
                for image_id in self.images[pool_id][namespace]:
                    if pool_id not in current_images or \
                            namespace not in current_images[pool_id] or \
                            image_id not in current_images[pool_id][namespace]:
                        self.remove_from_queue(pool_id, namespace, image_id)

        for pool_id in current_images:
            for namespace in current_images[pool_id]:
                for image_id in current_images[pool_id][namespace]:
                    if pool_id not in self.images or \
                            namespace not in self.images[pool_id] or \
                            image_id not in self.images[pool_id][namespace]:
                        self.enqueue(now, pool_id, namespace, image_id)

        self.condition.notify()

    def enqueue(self, now, pool_id, namespace, image_id):
        """Queue an image at its next run time after *now*.

        Does nothing when no schedule applies to the image. An image already
        queued in the same slot is not added twice. Every caller in this
        class holds ``self.lock``.
        """
        schedule = self.schedules.find(pool_id, namespace, image_id)
        if not schedule:
            return

        schedule_time = schedule.next_run(now)
        if schedule_time not in self.queue:
            self.queue[schedule_time] = []
        self.log.debug("schedule image {}/{}/{} at {}".format(
            pool_id, namespace, image_id, schedule_time))
        image_spec = (pool_id, namespace, image_id)
        if image_spec not in self.queue[schedule_time]:
            self.queue[schedule_time].append(image_spec)

    def dequeue(self):
        """Pop the next due image from the queue.

        :returns: ``(image_spec, 0)`` when an image is due;
            ``(None, seconds_until_next_slot)`` when the earliest slot is in
            the future; ``(None, 1000)`` when the queue is empty.
        """
        if not self.queue:
            return None, 1000

        now = datetime.now()
        schedule_time = min(self.queue)

        if now.strftime("%Y-%m-%d %H:%M:%S") < schedule_time:
            wait_time = (datetime.strptime(schedule_time,
                                           "%Y-%m-%d %H:%M:%S") - now)
            return None, wait_time.total_seconds()

        images = self.queue[schedule_time]
        image = images.pop(0)
        if not images:
            del self.queue[schedule_time]
        return image, 0

    def remove_from_queue(self, pool_id, namespace, image_id):
        """Remove an image from every queue slot, dropping emptied slots."""
        image_spec = (pool_id, namespace, image_id)
        empty_slots = []
        for schedule_time, images in self.queue.items():
            if image_spec in images:
                images.remove(image_spec)
                if not images:
                    empty_slots.append(schedule_time)
        for schedule_time in empty_slots:
            del self.queue[schedule_time]

    def add_schedule(self, level_spec, interval, start_time):
        """Add a schedule and rebuild the queue; returns (0, "", "")."""
        self.log.debug(
            "add_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.add(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        # (rebuild_queue takes the lock itself, so call it unlocked)
        self.rebuild_queue()
        return 0, "", ""

    def remove_schedule(self, level_spec, interval, start_time):
        """Remove a schedule and rebuild the queue; returns (0, "", "")."""
        self.log.debug(
            "remove_schedule: level_spec={}, interval={}, start_time={}".format(
                level_spec.name, interval, start_time))

        with self.lock:
            self.schedules.remove(level_spec, interval, start_time)

        # TODO: optimize to rebuild only affected part of the queue
        # (rebuild_queue takes the lock itself, so call it unlocked)
        self.rebuild_queue()
        return 0, "", ""

    def list(self, level_spec):
        """Return the schedules for *level_spec* as pretty-printed JSON."""
        self.log.debug("list: level_spec={}".format(level_spec.name))

        with self.lock:
            result = self.schedules.to_list(level_spec)

        return 0, json.dumps(result, indent=4, sort_keys=True), ""

    def status(self, level_spec):
        """Return queued images matching *level_spec*, in time order, as JSON."""
        self.log.debug("status: level_spec={}".format(level_spec.name))

        scheduled_images = []
        with self.lock:
            for schedule_time in sorted(self.queue):
                for pool_id, namespace, image_id in self.queue[schedule_time]:
                    if not level_spec.matches(pool_id, namespace, image_id):
                        continue
                    image_name = self.images[pool_id][namespace][image_id]
                    scheduled_images.append({
                        'schedule_time' : schedule_time,
                        'image' : image_name
                    })
        return 0, json.dumps({'scheduled_images' : scheduled_images},
                             indent=4, sort_keys=True), ""

    def handle_command(self, inbuf, prefix, cmd):
        """Dispatch a mgr command ('add', 'remove', 'list', 'status').

        :returns: ``(retval, stdout, stderr)``; ``-errno.EINVAL`` if the
            level spec is invalid.
        :raises NotImplementedError: for an unknown *prefix*.
        """
        level_spec_name = cmd.get('level_spec', "")

        try:
            level_spec = LevelSpec.from_name(self, level_spec_name,
                                             namespace_validator,
                                             image_validator)
        except ValueError as e:
            return -errno.EINVAL, '', "Invalid level spec {}: {}".format(
                level_spec_name, e)

        if prefix == 'add':
            return self.add_schedule(level_spec, cmd['interval'],
                                     cmd.get('start_time'))
        elif prefix == 'remove':
            return self.remove_schedule(level_spec, cmd.get('interval'),
                                        cmd.get('start_time'))
        elif prefix == 'list':
            return self.list(level_spec)
        elif prefix == 'status':
            return self.status(level_spec)

        raise NotImplementedError(cmd['prefix'])
317 return self.status(level_spec)
318
319 raise NotImplementedError(cmd['prefix'])