]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import errno |
2 | import json | |
3 | import rados | |
4 | import rbd | |
5 | import re | |
6 | import traceback | |
7 | import uuid | |
8 | ||
9 | from contextlib import contextmanager | |
10 | from datetime import datetime, timedelta | |
11 | from functools import partial, wraps | |
12 | from threading import Condition, Lock, Thread | |
20effc67 | 13 | from typing import cast, Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar |
9f95a23c TL |
14 | |
15 | from .common import (authorize_request, extract_pool_key, get_rbd_pools, | |
20effc67 | 16 | is_authorized, GLOBAL_POOL_KEY) |
9f95a23c TL |
17 | |
18 | ||
# Name of the RADOS object whose omap holds the persisted task queue
# (one omap entry per task, written by add_task / removed by remove_task).
RBD_TASK_OID = "rbd_task"

# Keys of the JSON document produced by Task.to_dict().
# The first four are always present ...
TASK_SEQUENCE = "sequence"
TASK_ID = "id"
TASK_REFS = "refs"
TASK_MESSAGE = "message"
# ... the remaining ones are emitted only when the matching state is set.
TASK_RETRY_ATTEMPTS = "retry_attempts"
TASK_RETRY_TIME = "retry_time"
TASK_RETRY_MESSAGE = "retry_message"
TASK_IN_PROGRESS = "in_progress"
TASK_PROGRESS = "progress"
TASK_CANCELED = "canceled"

# Keys of the per-task "refs" dictionary (what the task operates on).
TASK_REF_POOL_NAME = "pool_name"
TASK_REF_POOL_NAMESPACE = "pool_namespace"
TASK_REF_IMAGE_NAME = "image_name"
TASK_REF_IMAGE_ID = "image_id"
TASK_REF_ACTION = "action"

# Values accepted for refs[TASK_REF_ACTION].
TASK_REF_ACTION_FLATTEN = "flatten"
TASK_REF_ACTION_REMOVE = "remove"
TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"

# Task.from_json() rejects any task whose action is not listed here.
VALID_TASK_ACTIONS = [
    TASK_REF_ACTION_FLATTEN,
    TASK_REF_ACTION_REMOVE,
    TASK_REF_ACTION_TRASH_REMOVE,
    TASK_REF_ACTION_MIGRATION_EXECUTE,
    TASK_REF_ACTION_MIGRATION_COMMIT,
    TASK_REF_ACTION_MIGRATION_ABORT,
]

# Retry back-off: the delay is TASK_RETRY_INTERVAL * attempts, capped at
# TASK_MAX_RETRY_INTERVAL.
TASK_RETRY_INTERVAL = timedelta(seconds=30)
TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
# Number of successfully completed tasks remembered for replay detection.
MAX_COMPLETED_TASKS = 50
55 | ||
56 | ||
20effc67 TL |
T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable[..., Any])


class Throttle:
    """Decorator that silently drops calls made too soon after the last one.

    An instance is applied to a function; the resulting wrapper forwards a
    call to the function only when at least ``throttle_period`` has elapsed
    since the last forwarded call.  A dropped call returns ``None``.

    The timestamp lives on the Throttle instance, so every caller of one
    decorated function shares the same throttling window.
    """

    def __init__(self: Any, throttle_period: timedelta) -> None:
        # datetime.min guarantees that the very first call is let through.
        self.time_of_last_call = datetime.min
        self.throttle_period = throttle_period

    def __call__(self: 'Throttle', fn: FuncT) -> FuncT:
        """Wrap *fn* so that calls inside the throttle window are skipped."""
        @wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            called_at = datetime.now()
            earliest_allowed = self.time_of_last_call + self.throttle_period
            if called_at < earliest_allowed:
                # still inside the window: drop this call
                return None
            self.time_of_last_call = called_at
            return fn(*args, **kwargs)
        return cast(FuncT, wrapper)
74 | ||
75 | ||
TaskRefsT = Dict[str, str]


class Task:
    """In-memory record of one queued background operation.

    ``sequence``, ``task_id``, ``message`` and ``refs`` identify the task and
    are always serialized.  Retry, in-progress and cancel state is serialized
    only when set; ``failed``, ``failure_message`` and ``progress_posted``
    are never serialized.
    """

    def __init__(self, sequence: int, task_id: str, message: str, refs: TaskRefsT):
        self.sequence = sequence
        self.task_id = task_id
        self.message = message
        self.refs = refs
        self.retry_message: Optional[str] = None
        self.retry_attempts = 0
        self.retry_time: Optional[datetime] = None
        self.in_progress = False
        self.progress = 0.0
        self.canceled = False
        self.failed = False
        # Always present so readers never hit AttributeError on a task that
        # has not failed; fail() fills it in.
        self.failure_message: Optional[str] = None
        self.progress_posted = False

    def __str__(self) -> str:
        return self.to_json()

    @property
    def sequence_key(self) -> bytes:
        """Return the sequence as 16 upper-case, zero-padded hex digits.

        The fixed width makes the keys sort lexicographically in sequence
        order.
        """
        return "{0:016X}".format(self.sequence).encode()

    def cancel(self) -> None:
        """Mark the task as canceled; a canceled task is also failed."""
        self.canceled = True
        self.fail("Operation canceled")

    def fail(self, message: str) -> None:
        """Mark the task as failed and remember *message* as the reason."""
        self.failed = True
        self.failure_message = message

    def to_dict(self) -> Dict[str, Any]:
        """Return the JSON-serializable representation of the task."""
        d = {TASK_SEQUENCE: self.sequence,
             TASK_ID: self.task_id,
             TASK_MESSAGE: self.message,
             TASK_REFS: self.refs
             }
        # optional fields are only emitted when they carry information
        if self.retry_message:
            d[TASK_RETRY_MESSAGE] = self.retry_message
        if self.retry_attempts:
            d[TASK_RETRY_ATTEMPTS] = self.retry_attempts
        if self.retry_time:
            d[TASK_RETRY_TIME] = self.retry_time.isoformat()
        if self.in_progress:
            d[TASK_IN_PROGRESS] = True
            d[TASK_PROGRESS] = self.progress
        if self.canceled:
            d[TASK_CANCELED] = True
        return d

    def to_json(self) -> str:
        """Return to_dict() encoded as a JSON string."""
        return str(json.dumps(self.to_dict()))

    @classmethod
    def from_json(cls, val: str) -> 'Task':
        """Build a Task from the JSON produced by to_json().

        Only sequence, id, message and refs are restored; every other field
        starts at its constructor default.

        Raises:
            ValueError: if *val* is not valid JSON, is not a JSON object, has
                a non-object ``refs``, names an unknown action, or lacks a
                required key.
        """
        try:
            d = json.loads(val)
            # Well-formed JSON of the wrong shape (e.g. "[]" or "null") used
            # to escape as AttributeError; callers only handle ValueError.
            if not isinstance(d, dict):
                raise ValueError("Invalid task format (expected a JSON object)")
            refs = d.get(TASK_REFS, {})
            if not isinstance(refs, dict):
                raise ValueError("Invalid task format (refs is not a JSON object)")
            action = refs.get(TASK_REF_ACTION)
            if action not in VALID_TASK_ACTIONS:
                raise ValueError("Invalid task action: {}".format(action))

            return Task(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e))) from e
        except KeyError as e:
            raise ValueError("Invalid task format (missing key {})".format(str(e))) from e
144 | ||
145 | ||
20effc67 TL |
# pool_name, namespace, image_name
ImageSpecT = Tuple[str, str, str]
# pool_name, namespace
PoolSpecT = Tuple[str, str]
MigrationStatusT = Dict[str, str]


class TaskHandler:
    """Queue of background RBD operations executed by a worker thread.

    Tasks are persisted as omap entries of the ``rbd_task`` object in the
    pool/namespace they operate on and mirrored in memory.  ``run()`` (the
    worker thread body) executes due tasks one at a time.

    Locking: ``add_task`` calls ``condition.notify()`` and ``execute_task``
    calls ``lock.release()``, both of which require ``lock`` to be held, so
    the ``queue_*`` methods and ``execute_task`` must be entered with
    ``lock`` held.  NOTE(review): ``task_cancel``/``task_list`` read the
    same dictionaries and are presumably called under ``lock`` as well --
    confirm against the caller.
    """

    # NOTE: the attributes below are declared on the class, so the lock, the
    # condition and the (mutable) task containers are shared by every
    # TaskHandler instance until an instance rebinds them.
    lock = Lock()
    condition = Condition(lock)

    # task currently being executed by the worker thread, if any
    in_progress_task = None
    tasks_by_sequence: Dict[int, Task] = dict()
    tasks_by_id: Dict[str, Task] = dict()

    # most recent successfully completed tasks (see remove_task)
    completed_tasks: List[Task] = []

    # highest sequence number handed out so far
    sequence = 0

    def __init__(self, module: Any) -> None:
        """Remember the owning module and create (but do not start) the worker."""
        self.module = module
        self.log = module.log

        self.stop_thread = False
        self.thread = Thread(target=self.run)

    def setup(self) -> None:
        """Load the persisted task queue, then start the worker thread."""
        with self.lock:
            self.init_task_queue()
        self.thread.start()

    @property
    def default_pool_name(self) -> str:
        """Value of the ``rbd_default_pool`` option."""
        return self.module.get_ceph_option("rbd_default_pool")

    def extract_pool_spec(self, pool_spec: str) -> PoolSpecT:
        """Parse *pool_spec*, mapping the global key to the default pool."""
        pool_spec = extract_pool_key(pool_spec)
        if pool_spec == GLOBAL_POOL_KEY:
            pool_spec = (self.default_pool_name, '')
        return cast(PoolSpecT, pool_spec)

    def extract_image_spec(self, image_spec: str) -> ImageSpecT:
        """Split ``[pool/[namespace/]]image`` into (pool, namespace, image).

        A missing pool is replaced by the default pool name and a missing
        namespace by the empty string.

        Raises:
            ValueError: if *image_spec* does not match that format (the
                image part may contain neither ``/`` nor ``@``).
        """
        match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
                         image_spec or '')
        if not match:
            raise ValueError("Invalid image spec: {}".format(image_spec))
        return (match.group(1) or self.default_pool_name, match.group(2) or '',
                match.group(3))

    def shutdown(self) -> None:
        """Ask the worker thread to stop and wait for it to exit."""
        self.log.info("TaskHandler: shutting down")
        self.stop_thread = True
        if self.thread.is_alive():
            self.log.debug("TaskHandler: joining thread")
            self.thread.join()
        self.log.info("TaskHandler: shut down")

    def run(self) -> None:
        """Worker thread body: execute due tasks until ``stop_thread`` is set.

        Each pass executes, in sequence order, every task without a retry
        time or whose retry time has passed, then waits up to 5 seconds on
        the condition (add_task notifies it).
        """
        try:
            self.log.info("TaskHandler: starting")
            while not self.stop_thread:
                with self.lock:
                    now = datetime.now()
                    # snapshot of due sequences; execute_task drops the lock
                    # while a task runs, so the dictionary may change
                    for sequence in sorted([sequence for sequence, task
                                            in self.tasks_by_sequence.items()
                                            if not task.retry_time or task.retry_time <= now]):
                        self.execute_task(sequence)

                    self.condition.wait(5)
                    self.log.debug("TaskHandler: tick")

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            self.log.exception("TaskHandler: client blocklisted")
            self.module.client_blocklisted.set()
        except Exception as ex:
            self.log.fatal("Fatal runtime error: {}\n{}".format(
                ex, traceback.format_exc()))

    @contextmanager
    def open_ioctx(self, spec: PoolSpecT) -> Iterator[rados.Ioctx]:
        """Yield an ioctx for pool ``spec[0]`` set to namespace ``spec[1]``.

        Any rados.ObjectNotFound raised while opening the pool *or inside
        the ``with`` body* is logged as a pool lookup failure and re-raised.
        """
        try:
            with self.module.rados.open_ioctx(spec[0]) as ioctx:
                ioctx.set_namespace(spec[1])
                yield ioctx
        except rados.ObjectNotFound:
            self.log.error("Failed to locate pool {}".format(spec[0]))
            raise

    @classmethod
    def format_image_spec(cls, image_spec: ImageSpecT) -> str:
        """Join (pool, namespace, image) with ``/``, skipping empty parts."""
        image = image_spec[2]
        if image_spec[1]:
            image = "{}/{}".format(image_spec[1], image)
        if image_spec[0]:
            image = "{}/{}".format(image_spec[0], image)
        return image

    def init_task_queue(self) -> None:
        """Load persisted tasks from every RBD pool and all of its namespaces.

        Afterwards ``sequence`` is set to the highest loaded sequence number
        so that new tasks continue from there.
        """
        for pool_id, pool_name in get_rbd_pools(self.module).items():
            try:
                with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                    self.load_task_queue(ioctx, pool_name)

                    try:
                        namespaces = rbd.RBD().namespace_list(ioctx)
                    except rbd.OperationNotSupported:
                        self.log.debug("Namespaces not supported")
                        continue

                    for namespace in namespaces:
                        ioctx.set_namespace(namespace)
                        self.load_task_queue(ioctx, pool_name)

            except rados.ObjectNotFound:
                # pool DNE
                pass

        if self.tasks_by_sequence:
            self.sequence = max(self.tasks_by_sequence)

        self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
            self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))

    def load_task_queue(self, ioctx: rados.Ioctx, pool_name: str) -> None:
        """Load the tasks stored in the ``rbd_task`` omap of *ioctx*.

        The omap is read in pages of up to 128 entries, resuming after the
        last key seen.  Entries that fail to decode are logged and skipped.
        A missing ``rbd_task`` object is not an error.
        """
        pool_spec = pool_name
        if ioctx.nspace:
            pool_spec += "/{}".format(ioctx.nspace)

        start_after = ''
        try:
            while True:
                with rados.ReadOpCtx() as read_op:
                    self.log.info("load_task_task: {}, start_after={}".format(
                        pool_spec, start_after))
                    it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
                    ioctx.operate_read_op(read_op, RBD_TASK_OID)

                    it = list(it)
                    for k, v in it:
                        start_after = k
                        v = v.decode()
                        self.log.info("load_task_task: task={}".format(v))

                        try:
                            task = Task.from_json(v)
                            self.append_task(task)
                        except ValueError:
                            self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))

                    # an empty page means the omap has been fully read
                    if not it:
                        break

        except StopIteration:
            pass
        except rados.ObjectNotFound:
            # rbd_task DNE
            pass

    def append_task(self, task: Task) -> None:
        """Register *task* in both in-memory indexes."""
        self.tasks_by_sequence[task.sequence] = task
        self.tasks_by_id[task.task_id] = task

    def task_refs_match(self, task_refs: TaskRefsT, refs: TaskRefsT) -> bool:
        """Return True if *task_refs* equals *refs*.

        When *refs* carries no image id, the image id of *task_refs* is
        ignored for the comparison.
        """
        if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
            task_refs = task_refs.copy()
            del task_refs[TASK_REF_IMAGE_ID]

        self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
        return task_refs == refs

    def find_task(self, refs: TaskRefsT) -> Optional[Task]:
        """Return a pending or recently completed task matching *refs*, if any."""
        self.log.debug("find_task: refs={}".format(refs))

        # search for dups and return the original
        for task_id in reversed(sorted(self.tasks_by_id.keys())):
            task = self.tasks_by_id[task_id]
            if self.task_refs_match(task.refs, refs):
                return task

        # search for a completed task (message replay)
        for task in reversed(self.completed_tasks):
            if self.task_refs_match(task.refs, refs):
                return task

        return None

    def add_task(self,
                 ioctx: rados.Ioctx,
                 message: str,
                 refs: TaskRefsT) -> str:
        """Create a task, persist it in the ``rbd_task`` omap and queue it.

        The lock must be held (``condition.notify()`` requires it).

        Returns:
            The JSON representation of the new task.
        """
        self.log.debug("add_task: message={}, refs={}".format(message, refs))

        # ensure unique uuid across all pools
        while True:
            task_id = str(uuid.uuid4())
            if task_id not in self.tasks_by_id:
                break

        self.sequence += 1
        task = Task(self.sequence, task_id, message, refs)

        # add the task to the rbd_task omap
        task_json = task.to_json()
        omap_keys = (task.sequence_key, )
        omap_vals = (str.encode(task_json), )
        self.log.info("adding task: %s %s",
                      omap_keys[0].decode(),
                      omap_vals[0].decode())

        with rados.WriteOpCtx() as write_op:
            ioctx.set_omap(write_op, omap_keys, omap_vals)
            ioctx.operate_write_op(write_op, RBD_TASK_OID)
        self.append_task(task)

        # wake the worker thread
        self.condition.notify()
        return task_json

    def remove_task(self,
                    ioctx: Optional[rados.Ioctx],
                    task: Task,
                    remove_in_memory: bool = True) -> None:
        """Remove *task* from the omap and, optionally, from memory.

        Args:
            ioctx: pool context used to delete the omap entry; ``None``
                skips the omap removal.
            task: the task to remove.
            remove_in_memory: also drop the task from the in-memory indexes.
                Tasks that neither failed nor were canceled are then kept in
                ``completed_tasks`` (bounded by MAX_COMPLETED_TASKS).
        """
        self.log.info("remove_task: task={}".format(str(task)))
        if ioctx:
            try:
                with rados.WriteOpCtx() as write_op:
                    omap_keys = (task.sequence_key, )
                    ioctx.remove_omap_keys(write_op, omap_keys)
                    ioctx.operate_write_op(write_op, RBD_TASK_OID)
            except rados.ObjectNotFound:
                pass

        if remove_in_memory:
            try:
                del self.tasks_by_id[task.task_id]
                del self.tasks_by_sequence[task.sequence]

                # keep a record of the last N tasks to help avoid command replay
                # races
                if not task.failed and not task.canceled:
                    self.log.debug("remove_task: moving to completed tasks")
                    self.completed_tasks.append(task)
                    self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]

            except KeyError:
                pass

    def execute_task(self, sequence: int) -> None:
        """Execute the task with the given sequence number.

        Must be entered with the lock held; the lock is released while the
        operation itself runs and re-acquired afterwards.  On completion
        (success, failure or cancel) the progress event is completed and the
        task removed.  rados/rbd errors leave the task queued with a retry
        message; ConnectionShutdown is re-raised.  A retry time with linear
        back-off is always recorded on exit.
        """
        # run() iterates over a snapshot of sequences while this method
        # drops the lock, so the task may have been canceled and removed in
        # the meantime; that is not an error.
        task = self.tasks_by_sequence.get(sequence)
        if task is None:
            self.log.debug("execute_task: task {} no longer queued".format(sequence))
            return
        self.log.info("execute_task: task={}".format(str(task)))

        pool_valid = False
        try:
            with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                                  task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
                pool_valid = True

                action = task.refs[TASK_REF_ACTION]
                execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
                              TASK_REF_ACTION_REMOVE: self.execute_remove,
                              TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
                              TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
                              TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
                              TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
                              }.get(action)
                if not execute_fn:
                    self.log.error("Invalid task action: {}".format(action))
                else:
                    task.in_progress = True
                    self.in_progress_task = task

                    # run the (potentially long) operation without the lock
                    # so that new commands can be processed meanwhile
                    self.lock.release()
                    try:
                        execute_fn(ioctx, task)

                    except rbd.OperationCanceled:
                        self.log.info("Operation canceled: task={}".format(
                            str(task)))

                    finally:
                        self.lock.acquire()

                        task.in_progress = False
                        self.in_progress_task = None

                self.complete_progress(task)
                self.remove_task(ioctx, task)

        except rados.ObjectNotFound as e:
            self.log.error("execute_task: {}".format(e))
            if pool_valid:
                task.retry_message = "{}".format(e)
                self.update_progress(task, 0)
            else:
                # pool DNE -- remove in-memory task
                self.complete_progress(task)
                self.remove_task(None, task)

        except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
            raise

        except (rados.Error, rbd.Error) as e:
            self.log.error("execute_task: {}".format(e))
            task.retry_message = "{}".format(e)
            self.update_progress(task, 0)

        finally:
            task.in_progress = False
            task.retry_attempts += 1
            task.retry_time = datetime.now() + min(
                TASK_RETRY_INTERVAL * task.retry_attempts,
                TASK_MAX_RETRY_INTERVAL)

    def progress_callback(self, task: Task, current: int, total: int) -> int:
        """Progress hook passed to the rbd operations.

        Returns 0 to continue or ``-rbd.ECANCELED`` when the in-progress
        task was canceled (or is no longer set).  If the lock cannot be
        taken without blocking, the update is skipped and 0 is returned.
        """
        # guard against a zero total instead of raising ZeroDivisionError
        progress = float(current) / float(total) if total else 0.0
        self.log.debug("progress_callback: task={}, progress={}".format(
            str(task), progress))

        # avoid deadlocking when a new command comes in during a progress callback
        if not self.lock.acquire(False):
            return 0

        try:
            if not self.in_progress_task or self.in_progress_task.canceled:
                return -rbd.ECANCELED
            self.in_progress_task.progress = progress
        finally:
            self.lock.release()

        if not task.progress_posted:
            # delayed creation of progress event until first callback
            self.post_progress(task, progress)
        else:
            self.throttled_update_progress(task, progress)

        return 0

    def execute_flatten(self, ioctx: rados.Ioctx, task: Task) -> None:
        """Flatten the task's image; expected rbd errors fail the task."""
        self.log.info("execute_flatten: task={}".format(str(task)))

        try:
            with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
                image.flatten(on_progress=partial(self.progress_callback, task))
        except rbd.InvalidArgument:
            task.fail("Image does not have parent")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))

    def execute_remove(self, ioctx: rados.Ioctx, task: Task) -> None:
        """Remove the task's image; a missing image fails the task."""
        self.log.info("execute_remove: task={}".format(str(task)))

        try:
            rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                             on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))

    def execute_trash_remove(self, ioctx: rados.Ioctx, task: Task) -> None:
        """Remove the task's image (by id) from the trash."""
        self.log.info("execute_trash_remove: task={}".format(str(task)))

        try:
            rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
                                   on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))

    def execute_migration_execute(self, ioctx: rados.Ioctx, task: Task) -> None:
        """Execute a prepared migration of the task's image."""
        self.log.info("execute_migration_execute: task={}".format(str(task)))

        try:
            rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                        on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.InvalidArgument:
            task.fail("Image is not migrating")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))

    def execute_migration_commit(self, ioctx: rados.Ioctx, task: Task) -> None:
        """Commit an executed migration of the task's image."""
        self.log.info("execute_migration_commit: task={}".format(str(task)))

        try:
            rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                       on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.InvalidArgument:
            task.fail("Image is not migrating or migration not executed")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))

    def execute_migration_abort(self, ioctx: rados.Ioctx, task: Task) -> None:
        """Abort the migration of the task's image."""
        self.log.info("execute_migration_abort: task={}".format(str(task)))

        try:
            rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                      on_progress=partial(self.progress_callback, task))
        except rbd.ImageNotFound:
            task.fail("Image does not exist")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))
        except rbd.InvalidArgument:
            task.fail("Image is not migrating")
            self.log.info("{}: task={}".format(task.failure_message, str(task)))

    def complete_progress(self, task: Task) -> None:
        """Mark the task's progress event as failed or complete."""
        if not task.progress_posted:
            # ensure progress event exists before we complete/fail it
            self.post_progress(task, 0)

        self.log.debug("complete_progress: task={}".format(str(task)))
        try:
            if task.failed:
                self.module.remote("progress", "fail", task.task_id,
                                   task.failure_message)
            else:
                self.module.remote("progress", "complete", task.task_id)
        except ImportError:
            # progress module is disabled
            pass

    def _update_progress(self, task: Task, progress: float) -> None:
        """Send a progress update for *task* to the progress module."""
        self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
        try:
            refs = {"origin": "rbd_support"}
            refs.update(task.refs)

            self.module.remote("progress", "update", task.task_id,
                               task.message, progress, refs)
        except ImportError:
            # progress module is disabled
            pass

    def post_progress(self, task: Task, progress: float) -> None:
        """Send the first progress update and remember that it was posted."""
        self._update_progress(task, progress)
        task.progress_posted = True

    def update_progress(self, task: Task, progress: float) -> None:
        """Send a progress update only if an event was already posted."""
        if task.progress_posted:
            self._update_progress(task, progress)

    @Throttle(timedelta(seconds=1))
    def throttled_update_progress(self, task: Task, progress: float) -> None:
        """update_progress(), limited to one forwarded call per second."""
        self.update_progress(task, progress)

    def queue_flatten(self, image_spec: str) -> Tuple[int, str, str]:
        """Queue a flatten of *image_spec*.

        Returns (0, task JSON, '').  An already queued or recently completed
        matching task is returned instead of creating a new one.

        Raises:
            rbd.ImageNotFound: if the image does not exist or has no parent.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_flatten: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec[:2]) as ioctx:
            parent_image_id = None
            try:
                with rbd.Image(ioctx, image_spec[2]) as image:
                    refs[TASK_REF_IMAGE_ID] = image.id()

                    try:
                        parent_image_id = image.parent_id()
                    except rbd.ImageNotFound:
                        parent_image_id = None

            except rbd.ImageNotFound:
                pass

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            if TASK_REF_IMAGE_ID not in refs:
                raise rbd.ImageNotFound("Image {} does not exist".format(
                    self.format_image_spec(image_spec)), errno=errno.ENOENT)
            if not parent_image_id:
                raise rbd.ImageNotFound("Image {} does not have a parent".format(
                    self.format_image_spec(image_spec)), errno=errno.ENOENT)

            return 0, self.add_task(ioctx,
                                    "Flattening image {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ""

    def queue_remove(self, image_spec: str) -> Tuple[int, str, str]:
        """Queue the removal of *image_spec*.

        Returns (0, task JSON, ''); an existing matching task is reused.

        Raises:
            rbd.ImageNotFound: if the image does not exist.
            rbd.ImageBusy: if the image has snapshots.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_remove: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec[:2]) as ioctx:
            snaps: List[Any] = []
            try:
                with rbd.Image(ioctx, image_spec[2]) as image:
                    refs[TASK_REF_IMAGE_ID] = image.id()
                    snaps = list(image.list_snaps())

            except rbd.ImageNotFound:
                pass

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            if TASK_REF_IMAGE_ID not in refs:
                raise rbd.ImageNotFound("Image {} does not exist".format(
                    self.format_image_spec(image_spec)), errno=errno.ENOENT)
            if snaps:
                raise rbd.ImageBusy("Image {} has snapshots".format(
                    self.format_image_spec(image_spec)), errno=errno.EBUSY)

            return 0, self.add_task(ioctx,
                                    "Removing image {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ''

    def queue_trash_remove(self, image_id_spec: str) -> Tuple[int, str, str]:
        """Queue the removal of a trashed image given ``[pool/[ns/]]image_id``.

        Returns (0, task JSON, ''); an existing matching task is reused.
        Errors from ``trash_get`` (image not in trash) propagate.
        """
        image_id_spec = self.extract_image_spec(image_id_spec)

        authorize_request(self.module, image_id_spec[0], image_id_spec[1])
        self.log.info("queue_trash_remove: {}".format(image_id_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
                TASK_REF_POOL_NAME: image_id_spec[0],
                TASK_REF_POOL_NAMESPACE: image_id_spec[1],
                TASK_REF_IMAGE_ID: image_id_spec[2]}
        task = self.find_task(refs)
        if task:
            return 0, task.to_json(), ''

        # verify that image exists in trash
        with self.open_ioctx(image_id_spec[:2]) as ioctx:
            rbd.RBD().trash_get(ioctx, image_id_spec[2])

            return 0, self.add_task(ioctx,
                                    "Removing image {} from trash".format(
                                        self.format_image_spec(image_id_spec)),
                                    refs), ''

    def get_migration_status(self,
                             ioctx: rados.Ioctx,
                             image_spec: ImageSpecT) -> Optional[MigrationStatusT]:
        """Return the image's migration status, or None if unavailable."""
        try:
            return rbd.RBD().migration_status(ioctx, image_spec[2])
        except (rbd.InvalidArgument, rbd.ImageNotFound):
            return None

    def validate_image_migrating(self,
                                 image_spec: ImageSpecT,
                                 migration_status: Optional[MigrationStatusT]) -> None:
        """Raise rbd.InvalidArgument if *migration_status* is empty/None."""
        if not migration_status:
            raise rbd.InvalidArgument("Image {} is not migrating".format(
                self.format_image_spec(image_spec)), errno=errno.EINVAL)

    def resolve_pool_name(self, pool_id: str) -> str:
        """Return the name of pool *pool_id* from the OSD map, or '<unknown>'.

        NOTE(review): *pool_id* is compared with ``==`` against
        ``pool['pool']``; confirm that both sides have the same type.
        """
        osd_map = self.module.get('osd_map')
        for pool in osd_map['pools']:
            if pool['pool'] == pool_id:
                return pool['pool_name']
        return '<unknown>'

    def queue_migration_execute(self, image_spec: str) -> Tuple[int, str, str]:
        """Queue the execution of a prepared migration of *image_spec*.

        Returns (0, task JSON, ''); an existing matching task is reused.

        Raises:
            rbd.InvalidArgument: if the image is not migrating or its
                migration is neither prepared nor executing.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_migration_execute: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec[:2]) as ioctx:
            status = self.get_migration_status(ioctx, image_spec)
            if status:
                refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            self.validate_image_migrating(image_spec, status)
            # type narrowing only: validate_image_migrating raised otherwise
            assert status
            if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
                                       rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
                raise rbd.InvalidArgument("Image {} is not in ready state".format(
                    self.format_image_spec(image_spec)), errno=errno.EINVAL)

            source_pool = self.resolve_pool_name(status['source_pool_id'])
            dest_pool = self.resolve_pool_name(status['dest_pool_id'])
            return 0, self.add_task(ioctx,
                                    "Migrating image {} to {}".format(
                                        self.format_image_spec((source_pool,
                                                                status['source_pool_namespace'],
                                                                status['source_image_name'])),
                                        self.format_image_spec((dest_pool,
                                                                status['dest_pool_namespace'],
                                                                status['dest_image_name']))),
                                    refs), ''

    def queue_migration_commit(self, image_spec: str) -> Tuple[int, str, str]:
        """Queue the commit of an executed migration of *image_spec*.

        Returns (0, task JSON, ''); an existing matching task is reused.

        Raises:
            rbd.InvalidArgument: if the image is not migrating or the
                migration has not been executed yet.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_migration_commit: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec[:2]) as ioctx:
            status = self.get_migration_status(ioctx, image_spec)
            if status:
                refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            self.validate_image_migrating(image_spec, status)
            # type narrowing only: validate_image_migrating raised otherwise
            assert status
            if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
                raise rbd.InvalidArgument("Image {} has not completed migration".format(
                    self.format_image_spec(image_spec)), errno=errno.EINVAL)

            return 0, self.add_task(ioctx,
                                    "Committing image migration for {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ''

    def queue_migration_abort(self, image_spec: str) -> Tuple[int, str, str]:
        """Queue the abort of the migration of *image_spec*.

        Returns (0, task JSON, ''); an existing matching task is reused.

        Raises:
            rbd.InvalidArgument: if the image is not migrating.
        """
        image_spec = self.extract_image_spec(image_spec)

        authorize_request(self.module, image_spec[0], image_spec[1])
        self.log.info("queue_migration_abort: {}".format(image_spec))

        refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
                TASK_REF_POOL_NAME: image_spec[0],
                TASK_REF_POOL_NAMESPACE: image_spec[1],
                TASK_REF_IMAGE_NAME: image_spec[2]}

        with self.open_ioctx(image_spec[:2]) as ioctx:
            status = self.get_migration_status(ioctx, image_spec)
            if status:
                refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

            task = self.find_task(refs)
            if task:
                return 0, task.to_json(), ''

            self.validate_image_migrating(image_spec, status)
            return 0, self.add_task(ioctx,
                                    "Aborting image migration for {}".format(
                                        self.format_image_spec(image_spec)),
                                    refs), ''

    def task_cancel(self, task_id: str) -> Tuple[int, str, str]:
        """Cancel the task with id *task_id*.

        Returns (-ENOENT, '', message) if the task is unknown or the caller
        is not authorized for its pool/namespace, else (0, '', '').  A task
        that is currently executing is only removed from the omap; it stays
        in memory until execute_task finishes with it.
        """
        self.log.info("task_cancel: {}".format(task_id))

        task = self.tasks_by_id.get(task_id)
        if not task or not is_authorized(self.module,
                                         task.refs[TASK_REF_POOL_NAME],
                                         task.refs[TASK_REF_POOL_NAMESPACE]):
            return -errno.ENOENT, '', "No such task {}".format(task_id)

        task.cancel()

        remove_in_memory = True
        if self.in_progress_task and self.in_progress_task.task_id == task_id:
            self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
            remove_in_memory = False

        # complete any associated event in the progress module
        self.complete_progress(task)

        # remove from rbd_task omap
        with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                              task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
            self.remove_task(ioctx, task, remove_in_memory)

        return 0, "", ""

    def task_list(self, task_id: Optional[str]) -> Tuple[int, str, str]:
        """Return one task (if *task_id* is given) or all authorized tasks.

        The result is (0, pretty-printed JSON, ''), or (-ENOENT, '', message)
        when a requested task is unknown or not authorized.  The full list
        is ordered by sequence number.
        """
        self.log.info("task_list: {}".format(task_id))

        if task_id:
            task = self.tasks_by_id.get(task_id)
            if not task or not is_authorized(self.module,
                                             task.refs[TASK_REF_POOL_NAME],
                                             task.refs[TASK_REF_POOL_NAMESPACE]):
                return -errno.ENOENT, '', "No such task {}".format(task_id)

            return 0, json.dumps(task.to_dict(), indent=4, sort_keys=True), ""
        else:
            tasks = []
            for sequence in sorted(self.tasks_by_sequence.keys()):
                task = self.tasks_by_sequence[sequence]
                if is_authorized(self.module,
                                 task.refs[TASK_REF_POOL_NAME],
                                 task.refs[TASK_REF_POOL_NAMESPACE]):
                    tasks.append(task.to_dict())

            return 0, json.dumps(tasks, indent=4, sort_keys=True), ""