8 from datetime
import datetime
9 from threading
import Condition
, Lock
, Thread
11 from .common
import get_rbd_pools
12 from .schedule
import LevelSpec
, Interval
, StartTime
, Schedule
, Schedules
def namespace_validator(ioctx):
    """Check that the namespace selected on *ioctx* uses image mirror mode.

    The mirror mode is queried through ``rbd.RBD().mirror_mode_get`` and
    compared against ``rbd.RBD_MIRROR_MODE_IMAGE``.

    :param ioctx: I/O context whose mirror mode is inspected; its
        ``get_namespace()`` value is used in the error message.
    :raises ValueError: if the reported mirror mode is anything other than
        ``rbd.RBD_MIRROR_MODE_IMAGE``.
    """
    mirror_mode = rbd.RBD().mirror_mode_get(ioctx)
    if mirror_mode == rbd.RBD_MIRROR_MODE_IMAGE:
        # Nothing to report: the namespace is in the expected mode.
        return

    message = "namespace {} is not in mirror image mode".format(
        ioctx.get_namespace())
    raise ValueError(message)
def image_validator(image):
    """Check that *image* is mirrored in snapshot mode.

    :param image: object exposing ``mirror_image_get_mode()``; the returned
        value is compared against ``rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT``.
    :raises rbd.InvalidArgument: if the image reports any other mirror
        image mode.
    """
    mirror_image_mode = image.mirror_image_get_mode()
    if mirror_image_mode == rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
        # Snapshot-based mirroring is the only accepted mode.
        return

    raise rbd.InvalidArgument("Invalid mirror image mode")
25 class MirrorSnapshotScheduleHandler
:
26 MODULE_OPTION_NAME
= "mirror_snapshot_schedule"
27 SCHEDULE_OID
= "rbd_mirror_snapshot_schedule"
30 condition
= Condition(lock
)
33 def __init__(self
, module
):
36 self
.last_refresh_images
= datetime(1970, 1, 1)
38 self
.init_schedule_queue()
40 self
.thread
= Thread(target
=self
.run
)
45 self
.log
.info("MirrorSnapshotScheduleHandler: starting")
49 (image_spec
, wait_time
) = self
.dequeue()
51 self
.condition
.wait(min(wait_time
, 60))
53 pool_id
, namespace
, image_id
= image_spec
54 self
.create_snapshot(pool_id
, namespace
, image_id
)
56 self
.enqueue(datetime
.now(), pool_id
, namespace
, image_id
)
58 except Exception as ex
:
59 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
60 ex
, traceback
.format_exc()))
62 def create_snapshot(self
, pool_id
, namespace
, image_id
):
64 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
65 ioctx
.set_namespace(namespace
)
66 with rbd
.Image(ioctx
, image_id
=image_id
) as image
:
67 mode
= image
.mirror_image_get_mode()
68 if mode
!= rbd
.RBD_MIRROR_IMAGE_MODE_SNAPSHOT
:
70 info
= image
.mirror_image_get_info()
71 if info
['state'] != rbd
.RBD_MIRROR_IMAGE_ENABLED
or \
74 snap_id
= image
.mirror_image_create_snapshot()
76 "create_snapshot: {}/{}/{}: snap_id={}".format(
77 ioctx
.get_pool_name(), namespace
, image
.get_name(),
79 except Exception as e
:
81 "exception when creating snapshot for {}/{}/{}: {}".format(
82 pool_id
, namespace
, image_id
, e
))
85 def init_schedule_queue(self
):
89 self
.log
.debug("scheduler queue is initialized")
91 def load_schedules(self
):
92 self
.log
.info("MirrorSnapshotScheduleHandler: load_schedules")
94 schedules
= Schedules(self
)
95 schedules
.load(namespace_validator
, image_validator
)
97 self
.schedules
= schedules
99 def refresh_images(self
):
100 if (datetime
.now() - self
.last_refresh_images
).seconds
< 60:
103 self
.log
.debug("MirrorSnapshotScheduleHandler: refresh_images")
105 self
.load_schedules()
109 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
110 if not self
.schedules
.intersects(
111 LevelSpec
.from_pool_spec(pool_id
, pool_name
)):
113 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
114 self
.load_pool_images(ioctx
, images
)
117 self
.refresh_queue(images
)
120 self
.last_refresh_images
= datetime
.now()
122 def load_pool_images(self
, ioctx
, images
):
123 pool_id
= str(ioctx
.get_pool_id())
124 pool_name
= ioctx
.get_pool_name()
127 self
.log
.debug("load_pool_images: pool={}".format(pool_name
))
130 namespaces
= [''] + rbd
.RBD().namespace_list(ioctx
)
131 for namespace
in namespaces
:
132 if not self
.schedules
.intersects(
133 LevelSpec
.from_pool_spec(pool_id
, pool_name
, namespace
)):
135 self
.log
.debug("load_pool_images: pool={}, namespace={}".format(
136 pool_name
, namespace
))
137 images
[pool_id
][namespace
] = {}
138 ioctx
.set_namespace(namespace
)
139 mirror_images
= dict(rbd
.RBD().mirror_image_info_list(
140 ioctx
, rbd
.RBD_MIRROR_IMAGE_MODE_SNAPSHOT
))
141 if not mirror_images
:
144 [(x
['id'], x
['name']) for x
in filter(
145 lambda x
: x
['id'] in mirror_images
,
146 rbd
.RBD().list2(ioctx
))])
147 for image_id
in mirror_images
:
148 image_name
= image_names
.get(image_id
)
152 name
= "{}/{}/{}".format(pool_name
, namespace
,
155 name
= "{}/{}".format(pool_name
, image_name
)
156 self
.log
.debug("Adding image {}".format(name
))
157 images
[pool_id
][namespace
][image_id
] = name
158 except Exception as e
:
159 self
.log
.error("exception when scanning pool {}: {}".format(
163 def rebuild_queue(self
):
167 # don't remove from queue "due" images
168 now_string
= datetime
.strftime(now
, "%Y-%m-%d %H:%M:00")
170 for schedule_time
in list(self
.queue
):
171 if schedule_time
> now_string
:
172 del self
.queue
[schedule_time
]
174 if not self
.schedules
:
177 for pool_id
in self
.images
:
178 for namespace
in self
.images
[pool_id
]:
179 for image_id
in self
.images
[pool_id
][namespace
]:
180 self
.enqueue(now
, pool_id
, namespace
, image_id
)
182 self
.condition
.notify()
184 def refresh_queue(self
, current_images
):
187 for pool_id
in self
.images
:
188 for namespace
in self
.images
[pool_id
]:
189 for image_id
in self
.images
[pool_id
][namespace
]:
190 if pool_id
not in current_images
or \
191 namespace
not in current_images
[pool_id
] or \
192 image_id
not in current_images
[pool_id
][namespace
]:
193 self
.remove_from_queue(pool_id
, namespace
, image_id
)
195 for pool_id
in current_images
:
196 for namespace
in current_images
[pool_id
]:
197 for image_id
in current_images
[pool_id
][namespace
]:
198 if pool_id
not in self
.images
or \
199 namespace
not in self
.images
[pool_id
] or \
200 image_id
not in self
.images
[pool_id
][namespace
]:
201 self
.enqueue(now
, pool_id
, namespace
, image_id
)
203 self
.condition
.notify()
205 def enqueue(self
, now
, pool_id
, namespace
, image_id
):
207 schedule
= self
.schedules
.find(pool_id
, namespace
, image_id
)
211 schedule_time
= schedule
.next_run(now
)
212 if schedule_time
not in self
.queue
:
213 self
.queue
[schedule_time
] = []
214 self
.log
.debug("schedule image {}/{}/{} at {}".format(
215 pool_id
, namespace
, image_id
, schedule_time
))
216 image_spec
= (pool_id
, namespace
, image_id
)
217 if image_spec
not in self
.queue
[schedule_time
]:
218 self
.queue
[schedule_time
].append((pool_id
, namespace
, image_id
))
225 schedule_time
= sorted(self
.queue
)[0]
227 if datetime
.strftime(now
, "%Y-%m-%d %H:%M:%S") < schedule_time
:
228 wait_time
= (datetime
.strptime(schedule_time
,
229 "%Y-%m-%d %H:%M:%S") - now
)
230 return None, wait_time
.total_seconds()
232 images
= self
.queue
[schedule_time
]
233 image
= images
.pop(0)
235 del self
.queue
[schedule_time
]
238 def remove_from_queue(self
, pool_id
, namespace
, image_id
):
240 for schedule_time
, images
in self
.queue
.items():
241 if (pool_id
, namespace
, image_id
) in images
:
242 images
.remove((pool_id
, namespace
, image_id
))
244 empty_slots
.append(schedule_time
)
245 for schedule_time
in empty_slots
:
246 del self
.queue
[schedule_time
]
248 def add_schedule(self
, level_spec
, interval
, start_time
):
250 "add_schedule: level_spec={}, interval={}, start_time={}".format(
251 level_spec
.name
, interval
, start_time
))
254 self
.schedules
.add(level_spec
, interval
, start_time
)
256 # TODO: optimize to rebuild only affected part of the queue
260 def remove_schedule(self
, level_spec
, interval
, start_time
):
262 "remove_schedule: level_spec={}, interval={}, start_time={}".format(
263 level_spec
.name
, interval
, start_time
))
266 self
.schedules
.remove(level_spec
, interval
, start_time
)
268 # TODO: optimize to rebuild only affected part of the queue
272 def list(self
, level_spec
):
273 self
.log
.debug("list: level_spec={}".format(level_spec
.name
))
276 result
= self
.schedules
.to_list(level_spec
)
278 return 0, json
.dumps(result
, indent
=4, sort_keys
=True), ""
280 def status(self
, level_spec
):
281 self
.log
.debug("status: level_spec={}".format(level_spec
.name
))
283 scheduled_images
= []
285 for schedule_time
in sorted(self
.queue
):
286 for pool_id
, namespace
, image_id
in self
.queue
[schedule_time
]:
287 if not level_spec
.matches(pool_id
, namespace
, image_id
):
289 image_name
= self
.images
[pool_id
][namespace
][image_id
]
290 scheduled_images
.append({
291 'schedule_time' : schedule_time
,
294 return 0, json
.dumps({'scheduled_images' : scheduled_images
},
295 indent
=4, sort_keys
=True), ""
297 def handle_command(self
, inbuf
, prefix
, cmd
):
298 level_spec_name
= cmd
.get('level_spec', "")
301 level_spec
= LevelSpec
.from_name(self
, level_spec_name
,
304 except ValueError as e
:
305 return -errno
.EINVAL
, '', "Invalid level spec {}: {}".format(
309 return self
.add_schedule(level_spec
, cmd
['interval'],
310 cmd
.get('start_time'))
311 elif prefix
== 'remove':
312 return self
.remove_schedule(level_spec
, cmd
.get('interval'),
313 cmd
.get('start_time'))
314 elif prefix
== 'list':
315 return self
.list(level_spec
)
316 elif prefix
== 'status':
317 return self
.status(level_spec
)
319 raise NotImplementedError(cmd
['prefix'])