]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rbd_support/task.py
d283962a365e3f07f1c84fa9da352360d0caf97b
[ceph.git] / ceph / src / pybind / mgr / rbd_support / task.py
1 import errno
2 import json
3 import rados
4 import rbd
5 import re
6 import traceback
7 import uuid
8
9 from contextlib import contextmanager
10 from datetime import datetime, timedelta
11 from functools import partial, wraps
12 from threading import Condition, Lock, Thread
13 from typing import cast, Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar
14
15 from .common import (authorize_request, extract_pool_key, get_rbd_pools,
16 is_authorized, GLOBAL_POOL_KEY)
17
18
# Name of the RADOS object whose omap holds the persisted task queue.
RBD_TASK_OID = "rbd_task"

# Keys of the dict built by Task.to_dict() (the JSON stored for each task).
TASK_SEQUENCE = "sequence"
TASK_ID = "id"
TASK_REFS = "refs"
TASK_MESSAGE = "message"
TASK_RETRY_ATTEMPTS = "retry_attempts"
TASK_RETRY_TIME = "retry_time"
TASK_RETRY_MESSAGE = "retry_message"
TASK_IN_PROGRESS = "in_progress"
TASK_PROGRESS = "progress"
TASK_CANCELED = "canceled"

# Keys of a task's "refs" dict, which identifies what the task operates on.
TASK_REF_POOL_NAME = "pool_name"
TASK_REF_POOL_NAMESPACE = "pool_namespace"
TASK_REF_IMAGE_NAME = "image_name"
TASK_REF_IMAGE_ID = "image_id"
TASK_REF_ACTION = "action"

# Values stored under TASK_REF_ACTION, one per supported operation.
TASK_REF_ACTION_FLATTEN = "flatten"
TASK_REF_ACTION_REMOVE = "remove"
TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"

# Actions accepted by Task.from_json(); any other value is rejected.
VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
                      TASK_REF_ACTION_REMOVE,
                      TASK_REF_ACTION_TRASH_REMOVE,
                      TASK_REF_ACTION_MIGRATION_EXECUTE,
                      TASK_REF_ACTION_MIGRATION_COMMIT,
                      TASK_REF_ACTION_MIGRATION_ABORT]

# Retry back-off: the delay grows by TASK_RETRY_INTERVAL per attempt and is
# capped at TASK_MAX_RETRY_INTERVAL (see TaskHandler.execute_task).
TASK_RETRY_INTERVAL = timedelta(seconds=30)
TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
# Upper bound on the finished tasks TaskHandler.remove_task keeps around.
MAX_COMPLETED_TASKS = 50
55
56
T = TypeVar('T')
FuncT = TypeVar('FuncT', bound=Callable[..., Any])


class Throttle:
    """Decorator factory that drops calls arriving too soon after the last one.

    A call is forwarded to the wrapped function only when at least
    `throttle_period` has elapsed since the previously forwarded call;
    otherwise the wrapper does nothing and returns None.  The timestamp
    lives on the Throttle instance, so it is shared by every invocation of
    the function that instance decorates.
    """

    def __init__(self: Any, throttle_period: timedelta) -> None:
        self.throttle_period = throttle_period
        # datetime.min guarantees that the very first call is forwarded
        self.time_of_last_call = datetime.min

    def __call__(self: 'Throttle', fn: FuncT) -> FuncT:
        @wraps(fn)
        def throttled(*args: Any, **kwargs: Any) -> Any:
            current = datetime.now()
            next_allowed = self.time_of_last_call + self.throttle_period
            if current < next_allowed:
                # still inside the quiet period: swallow the call
                return None
            self.time_of_last_call = current
            return fn(*args, **kwargs)

        return cast(FuncT, throttled)
74
75
TaskRefsT = Dict[str, str]


class Task:
    """A single queued background operation and its bookkeeping state.

    `sequence` orders tasks in the queue, `task_id` identifies the task,
    `message` is the human-readable description and `refs` describes the
    target (pool, namespace, image, action).
    """

    def __init__(self, sequence: int, task_id: str, message: str, refs: TaskRefsT):
        self.sequence = sequence
        self.task_id = task_id
        self.message = message
        self.refs = refs
        self.retry_message: Optional[str] = None
        self.retry_attempts = 0
        self.retry_time: Optional[datetime] = None
        self.in_progress = False
        self.progress = 0.0
        self.canceled = False
        self.failed = False
        # Always defined so readers never hit AttributeError on a task that
        # has not failed; fail() overwrites it.
        self.failure_message: Optional[str] = None
        self.progress_posted = False

    def __str__(self) -> str:
        return self.to_json()

    @property
    def sequence_key(self) -> bytes:
        """Omap key for this task: the sequence as 16 upper-case hex digits."""
        return "{0:016X}".format(self.sequence).encode()

    def cancel(self) -> None:
        """Flag the task as canceled; a canceled task also counts as failed."""
        self.canceled = True
        self.fail("Operation canceled")

    def fail(self, message: str) -> None:
        """Flag the task as failed and remember why."""
        self.failed = True
        self.failure_message = message

    def to_dict(self) -> Dict[str, Any]:
        """Return the serializable form; optional fields appear only when set."""
        d = {TASK_SEQUENCE: self.sequence,
             TASK_ID: self.task_id,
             TASK_MESSAGE: self.message,
             TASK_REFS: self.refs
             }
        if self.retry_message:
            d[TASK_RETRY_MESSAGE] = self.retry_message
        if self.retry_attempts:
            d[TASK_RETRY_ATTEMPTS] = self.retry_attempts
        if self.retry_time:
            d[TASK_RETRY_TIME] = self.retry_time.isoformat()
        if self.in_progress:
            d[TASK_IN_PROGRESS] = True
            d[TASK_PROGRESS] = self.progress
        if self.canceled:
            d[TASK_CANCELED] = True
        return d

    def to_json(self) -> str:
        return str(json.dumps(self.to_dict()))

    @classmethod
    def from_json(cls, val: str) -> 'Task':
        """Rebuild a task from its JSON form.

        Only sequence, id, message and refs are restored; retry and progress
        state start from their defaults.

        Raises:
            ValueError: the text is not valid JSON, is not a JSON object,
                names an unknown action, or lacks a required key.
        """
        try:
            d = json.loads(val)
        except json.JSONDecodeError as e:
            raise ValueError("Invalid JSON ({})".format(str(e))) from e

        # Valid JSON need not be an object ("[]", "42", ...).  Reject those
        # explicitly instead of failing with AttributeError on .get().
        if not isinstance(d, dict):
            raise ValueError("Invalid task format (expected a JSON object)")
        refs = d.get(TASK_REFS, {})
        if not isinstance(refs, dict):
            raise ValueError("Invalid task format (refs is not a JSON object)")

        action = refs.get(TASK_REF_ACTION)
        if action not in VALID_TASK_ACTIONS:
            raise ValueError("Invalid task action: {}".format(action))

        try:
            return cls(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
        except KeyError as e:
            raise ValueError(
                "Invalid task format (missing key {})".format(str(e))) from e
144
145
# (pool_name, namespace, image_name); TaskHandler.extract_image_spec() uses ''
# for a missing namespace
ImageSpecT = Tuple[str, str, str]
# (pool_name, namespace)
PoolSpecT = Tuple[str, str]
# Value returned by TaskHandler.get_migration_status(); the code in this file
# reads keys such as 'state', 'dest_image_id' and 'source_pool_id' from it.
MigrationStatusT = Dict[str, str]
151
class TaskHandler:
    """Persistent queue of background rbd tasks plus the thread that runs them."""

    # NOTE: everything below is declared at class level.  `lock` and
    # `condition` are therefore shared by all instances, and the two dicts
    # are mutated in place, so they are shared as well.
    lock = Lock()
    condition = Condition(lock)
    # worker thread, created and started by __init__
    thread = None

    # task currently being executed, or None when idle
    in_progress_task = None
    # the same queued tasks, indexed by sequence number and by task id
    tasks_by_sequence: Dict[int, Task] = dict()
    tasks_by_id: Dict[str, Task] = dict()

    # most recently finished tasks (bounded by MAX_COMPLETED_TASKS)
    completed_tasks: List[Task] = []

    # highest sequence number handed out so far
    sequence = 0
164
def __init__(self, module: Any) -> None:
    """Load the persisted queue, then start the worker thread.

    Args:
        module: owning module object; its `log` attribute becomes
            `self.log`.
    """
    self.module = module
    self.log = module.log

    # populate the in-memory queue before the worker can look at it
    self.lock.acquire()
    try:
        self.init_task_queue()
    finally:
        self.lock.release()

    worker = Thread(target=self.run)
    self.thread = worker
    worker.start()
174
@property
def default_pool_name(self) -> str:
    """Value of the `rbd_default_pool` Ceph option, read on every access."""
    option_value = self.module.get_ceph_option("rbd_default_pool")
    return option_value
178
def extract_pool_spec(self, pool_spec: str) -> PoolSpecT:
    """Turn a textual pool spec into a (pool_name, namespace) pair.

    Parsing is delegated to extract_pool_key(); when that yields
    GLOBAL_POOL_KEY the default pool with an empty namespace is returned.
    """
    pool_key = extract_pool_key(pool_spec)
    if pool_key != GLOBAL_POOL_KEY:
        return cast(PoolSpecT, pool_key)
    return cast(PoolSpecT, (self.default_pool_name, ''))
184
def extract_image_spec(self, image_spec: str) -> ImageSpecT:
    """Parse `[pool/[namespace/]]image` into (pool, namespace, image).

    A missing pool falls back to `default_pool_name` and a missing
    namespace to ''.

    Raises:
        ValueError: the spec is empty/None, has too many components, or the
            image part contains '/' or '@'.
    """
    parsed = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
                      image_spec or '')
    if parsed is None:
        raise ValueError("Invalid image spec: {}".format(image_spec))

    pool_name, namespace, image_name = parsed.groups()
    return (pool_name or self.default_pool_name, namespace or '', image_name)
192
def run(self) -> None:
    """Worker loop: execute every due task, then wait up to 5 seconds.

    A task is due when it has no retry time or its retry time has passed.
    Due tasks run in ascending sequence order.  Any exception escaping the
    loop is logged as fatal and ends the loop.
    """
    try:
        self.log.info("TaskHandler: starting")
        while True:
            with self.lock:
                now = datetime.now()
                # snapshot the due sequences before running anything
                due = sorted(
                    seq for seq, queued in self.tasks_by_sequence.items()
                    if not queued.retry_time or queued.retry_time <= now)
                for seq in due:
                    self.execute_task(seq)

                # woken early by condition.notify() when a task is added
                self.condition.wait(5)
                self.log.debug("TaskHandler: tick")

    except Exception as ex:
        details = traceback.format_exc()
        self.log.fatal("Fatal runtime error: {}\n{}".format(ex, details))
210
@contextmanager
def open_ioctx(self, spec: PoolSpecT) -> Iterator[rados.Ioctx]:
    """Yield an I/O context for pool `spec[0]` set to namespace `spec[1]`.

    The `try` wraps the `yield`, so a rados.ObjectNotFound raised by the
    caller's `with` body is logged here too before being re-raised.
    """
    pool_name = spec[0]
    try:
        with self.module.rados.open_ioctx(pool_name) as pool_ioctx:
            pool_ioctx.set_namespace(spec[1])
            yield pool_ioctx
    except rados.ObjectNotFound:
        self.log.error("Failed to locate pool {}".format(pool_name))
        raise
220
@classmethod
def format_image_spec(cls, image_spec: ImageSpecT) -> str:
    """Render (pool, namespace, image) as `pool/namespace/image`.

    Empty pool or namespace components are left out together with their
    separator.
    """
    formatted = image_spec[2]
    # prepend the namespace first, then the pool, skipping empty parts
    for prefix in (image_spec[1], image_spec[0]):
        if prefix:
            formatted = "{}/{}".format(prefix, formatted)
    return formatted
229
def init_task_queue(self) -> None:
    """Load persisted tasks from every rbd pool and each of its namespaces.

    Afterwards `self.sequence` is set to the highest loaded sequence
    number, if any task was loaded.
    """
    for pool_id, pool_name in get_rbd_pools(self.module).items():
        try:
            with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
                # the ioctx starts out on the default namespace
                self.load_task_queue(ioctx, pool_name)

                try:
                    namespaces = rbd.RBD().namespace_list(ioctx)
                except rbd.OperationNotSupported:
                    self.log.debug("Namespaces not supported")
                else:
                    for namespace in namespaces:
                        ioctx.set_namespace(namespace)
                        self.load_task_queue(ioctx, pool_name)

        except rados.ObjectNotFound:
            # pool DNE
            pass

    if self.tasks_by_sequence:
        self.sequence = max(self.tasks_by_sequence)

    self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
        self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
255
def load_task_queue(self, ioctx: rados.Ioctx, pool_name: str) -> None:
    """Read every task stored in the RBD_TASK_OID omap of `ioctx`.

    The omap is read in pages of 128 entries, resuming after the last key
    seen, until an empty page is returned.  Entries that cannot be decoded
    are logged and skipped so that one corrupt value cannot prevent the
    remaining tasks (and the handler itself) from loading.

    Args:
        ioctx: I/O context already set to the namespace to read.
        pool_name: pool name, used for log messages only.
    """
    pool_spec = pool_name
    if ioctx.nspace:
        pool_spec += "/{}".format(ioctx.nspace)

    page_size = 128
    start_after = ''
    try:
        while True:
            with rados.ReadOpCtx() as read_op:
                self.log.info("load_task_task: {}, start_after={}".format(
                    pool_spec, start_after))
                it, ret = ioctx.get_omap_vals(read_op, start_after, "",
                                              page_size)
                ioctx.operate_read_op(read_op, RBD_TASK_OID)

                entries = list(it)
                for k, v in entries:
                    start_after = k

                    try:
                        # Decoding is inside the try: UnicodeDecodeError is
                        # a ValueError, so a non-UTF-8 value is skipped like
                        # any other malformed entry.
                        v = v.decode()
                        self.log.info("load_task_task: task={}".format(v))
                        task = Task.from_json(v)
                        self.append_task(task)
                    except (ValueError, TypeError, AttributeError):
                        # TypeError/AttributeError cover JSON that parses
                        # but does not have the expected object shape.
                        self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))

                if not entries:
                    break

    except StopIteration:
        pass
    except rados.ObjectNotFound:
        # rbd_task DNE
        pass
290
def append_task(self, task: Task) -> None:
    """Register `task` in both in-memory indexes (by sequence and by id)."""
    indexes = ((self.tasks_by_sequence, task.sequence),
               (self.tasks_by_id, task.task_id))
    for index, key in indexes:
        index[key] = task
294
def task_refs_match(self, task_refs: TaskRefsT, refs: TaskRefsT) -> bool:
    """Tell whether a task's refs describe the same target as `refs`.

    When `refs` has no image id, the task's image id is ignored for the
    comparison.  `task_refs` itself is never modified.
    """
    ignore_image_id = TASK_REF_IMAGE_ID not in refs
    if ignore_image_id and TASK_REF_IMAGE_ID in task_refs:
        task_refs = {key: value for key, value in task_refs.items()
                     if key != TASK_REF_IMAGE_ID}

    self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
    return task_refs == refs
302
def find_task(self, refs: TaskRefsT) -> Optional[Task]:
    """Return an existing task whose refs match `refs`, or None.

    Queued tasks are checked first, in descending task-id order, followed
    by the completed tasks from newest to oldest.
    """
    self.log.debug("find_task: refs={}".format(refs))

    # search for dups and return the original
    for task_id in sorted(self.tasks_by_id, reverse=True):
        queued = self.tasks_by_id[task_id]
        if self.task_refs_match(queued.refs, refs):
            return queued

    # search for a completed task (message replay)
    for finished in self.completed_tasks[::-1]:
        if self.task_refs_match(finished.refs, refs):
            return finished

    return None
318
def add_task(self,
             ioctx: rados.Ioctx,
             message: str,
             refs: TaskRefsT) -> str:
    """Create a task, persist it in the omap, queue it and wake the worker.

    Returns:
        The JSON form of the new task.
    """
    self.log.debug("add_task: message={}, refs={}".format(message, refs))

    # ensure unique uuid across all pools
    task_id = str(uuid.uuid4())
    while task_id in self.tasks_by_id:
        task_id = str(uuid.uuid4())

    self.sequence += 1
    task = Task(self.sequence, task_id, message, refs)

    # add the task to the rbd_task omap
    task_json = task.to_json()
    omap_key = task.sequence_key
    omap_val = task_json.encode()
    self.log.info("adding task: %s %s",
                  omap_key.decode(),
                  omap_val.decode())

    with rados.WriteOpCtx() as write_op:
        ioctx.set_omap(write_op, (omap_key, ), (omap_val, ))
        ioctx.operate_write_op(write_op, RBD_TASK_OID)
    # only queued in memory once the write above succeeded
    self.append_task(task)

    self.condition.notify()
    return task_json
349
def remove_task(self,
                ioctx: rados.Ioctx,
                task: Task,
                remove_in_memory: bool = True) -> None:
    """Delete `task` from the omap and, optionally, from the in-memory queue.

    A task removed from memory that neither failed nor was canceled is
    appended to `completed_tasks`, which is trimmed to the newest
    MAX_COMPLETED_TASKS entries.
    """
    self.log.info("remove_task: task={}".format(str(task)))
    omap_keys = (task.sequence_key, )
    try:
        with rados.WriteOpCtx() as write_op:
            ioctx.remove_omap_keys(write_op, omap_keys)
            ioctx.operate_write_op(write_op, RBD_TASK_OID)
    except rados.ObjectNotFound:
        pass

    if not remove_in_memory:
        return

    try:
        del self.tasks_by_id[task.task_id]
        del self.tasks_by_sequence[task.sequence]
    except KeyError:
        # not queued (any more): nothing further to record
        return

    if task.failed or task.canceled:
        return

    # keep a record of the last N tasks to help avoid command replay
    # races
    self.log.debug("remove_task: moving to completed tasks")
    self.completed_tasks.append(task)
    self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
377
def execute_task(self, sequence: int) -> None:
    """Run the queued task with the given sequence number.

    Must be called with `self.lock` held.  The lock is released while the
    rbd operation itself runs and re-acquired before shared state is
    touched again.

    Outcomes:
      * operation ran (or the action is unknown): the progress event is
        completed and the task is removed;
      * the pool cannot be opened: the task is dropped from memory, since
        there is no I/O context through which to delete its omap entry;
      * any other rados/rbd error: the error is recorded as the retry
        message and the task stays queued.

    The retry counter and retry time are updated on every path.
    """
    task = self.tasks_by_sequence.get(sequence)
    if task is None:
        # The caller snapshots the due sequences up front and this method
        # drops the lock while a task runs, so a later entry may have been
        # removed (e.g. canceled) in the meantime.  Indexing here would
        # raise KeyError and take down the worker thread.
        self.log.debug("execute_task: sequence {} is no longer queued".format(
            sequence))
        return
    self.log.info("execute_task: task={}".format(str(task)))

    pool_valid = False
    try:
        with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
                              task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
            pool_valid = True

            action = task.refs[TASK_REF_ACTION]
            execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
                          TASK_REF_ACTION_REMOVE: self.execute_remove,
                          TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
                          TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
                          TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
                          TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
                          }.get(action)
            if not execute_fn:
                self.log.error("Invalid task action: {}".format(action))
            else:
                task.in_progress = True
                self.in_progress_task = task

                # do not hold the lock during the long-running operation
                self.lock.release()
                try:
                    execute_fn(ioctx, task)

                except rbd.OperationCanceled:
                    self.log.info("Operation canceled: task={}".format(
                        str(task)))

                finally:
                    self.lock.acquire()

                    task.in_progress = False
                    self.in_progress_task = None

            self.complete_progress(task)
            self.remove_task(ioctx, task)

    except rados.ObjectNotFound as e:
        self.log.error("execute_task: {}".format(e))
        if pool_valid:
            task.retry_message = "{}".format(e)
            self.update_progress(task, 0)
        else:
            # pool DNE -- remove the task.  `ioctx` was never bound on this
            # path, so only the in-memory entries can be dropped.
            self.complete_progress(task)
            self.tasks_by_id.pop(task.task_id, None)
            self.tasks_by_sequence.pop(task.sequence, None)

    except (rados.Error, rbd.Error) as e:
        self.log.error("execute_task: {}".format(e))
        task.retry_message = "{}".format(e)
        self.update_progress(task, 0)

    finally:
        task.in_progress = False
        task.retry_attempts += 1
        # linear back-off, capped at TASK_MAX_RETRY_INTERVAL
        task.retry_time = datetime.now() + min(
            TASK_RETRY_INTERVAL * task.retry_attempts,
            TASK_MAX_RETRY_INTERVAL)
440
def progress_callback(self, task: Task, current: int, total: int) -> int:
    """Progress hook handed to the rbd operations.

    Records the completed fraction on the in-progress task and forwards it
    to the progress module (the first report creates the event, later ones
    are throttled).

    Args:
        task: task the operation belongs to.
        current: amount of work done so far.
        total: total amount of work; 0 is reported as 0.0 progress.

    Returns:
        0 to continue, or -rbd.ECANCELED when there is no in-progress task
        or it has been canceled.
    """
    # Guard against total == 0: an exception raised from inside the
    # callback would abort the operation with ZeroDivisionError.
    progress = float(current) / float(total) if total else 0.0
    self.log.debug("progress_callback: task={}, progress={}".format(
        str(task), progress))

    # avoid deadlocking when a new command comes in during a progress callback
    if not self.lock.acquire(False):
        return 0

    try:
        if not self.in_progress_task or self.in_progress_task.canceled:
            return -rbd.ECANCELED
        self.in_progress_task.progress = progress
    finally:
        self.lock.release()

    if not task.progress_posted:
        # delayed creation of progress event until first callback
        self.post_progress(task, progress)
    else:
        self.throttled_update_progress(task, progress)

    return 0
464
def execute_flatten(self, ioctx: rados.Ioctx, task: Task) -> None:
    """Flatten the image named in the task refs, reporting progress.

    rbd.InvalidArgument and rbd.ImageNotFound mark the task as failed
    instead of propagating.
    """
    self.log.info("execute_flatten: task={}".format(str(task)))

    report = partial(self.progress_callback, task)
    try:
        with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
            image.flatten(on_progress=report)
    except rbd.InvalidArgument:
        reason = "Image does not have parent"
    except rbd.ImageNotFound:
        reason = "Image does not exist"
    else:
        return

    task.fail(reason)
    self.log.info("{}: task={}".format(task.failure_message, str(task)))
477
def execute_remove(self, ioctx: rados.Ioctx, task: Task) -> None:
    """Remove the image named in the task refs, reporting progress.

    rbd.ImageNotFound marks the task as failed instead of propagating.
    """
    self.log.info("execute_remove: task={}".format(str(task)))

    report = partial(self.progress_callback, task)
    try:
        rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                         on_progress=report)
    except rbd.ImageNotFound:
        reason = "Image does not exist"
    else:
        return

    task.fail(reason)
    self.log.info("{}: task={}".format(task.failure_message, str(task)))
487
def execute_trash_remove(self, ioctx: rados.Ioctx, task: Task) -> None:
    """Remove the trashed image whose id is in the task refs.

    rbd.ImageNotFound marks the task as failed instead of propagating.
    """
    self.log.info("execute_trash_remove: task={}".format(str(task)))

    report = partial(self.progress_callback, task)
    try:
        rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
                               on_progress=report)
    except rbd.ImageNotFound:
        reason = "Image does not exist"
    else:
        return

    task.fail(reason)
    self.log.info("{}: task={}".format(task.failure_message, str(task)))
497
def execute_migration_execute(self, ioctx: rados.Ioctx, task: Task) -> None:
    """Execute the prepared migration of the image named in the task refs.

    rbd.ImageNotFound and rbd.InvalidArgument mark the task as failed
    instead of propagating.
    """
    self.log.info("execute_migration_execute: task={}".format(str(task)))

    report = partial(self.progress_callback, task)
    try:
        rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                    on_progress=report)
    except rbd.ImageNotFound:
        reason = "Image does not exist"
    except rbd.InvalidArgument:
        reason = "Image is not migrating"
    else:
        return

    task.fail(reason)
    self.log.info("{}: task={}".format(task.failure_message, str(task)))
510
def execute_migration_commit(self, ioctx: rados.Ioctx, task: Task) -> None:
    """Commit the migration of the image named in the task refs.

    rbd.ImageNotFound and rbd.InvalidArgument mark the task as failed
    instead of propagating.
    """
    self.log.info("execute_migration_commit: task={}".format(str(task)))

    report = partial(self.progress_callback, task)
    try:
        rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                   on_progress=report)
    except rbd.ImageNotFound:
        reason = "Image does not exist"
    except rbd.InvalidArgument:
        reason = "Image is not migrating or migration not executed"
    else:
        return

    task.fail(reason)
    self.log.info("{}: task={}".format(task.failure_message, str(task)))
523
def execute_migration_abort(self, ioctx: rados.Ioctx, task: Task) -> None:
    """Abort the migration of the image named in the task refs.

    rbd.ImageNotFound and rbd.InvalidArgument mark the task as failed
    instead of propagating.
    """
    self.log.info("execute_migration_abort: task={}".format(str(task)))

    report = partial(self.progress_callback, task)
    try:
        rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
                                  on_progress=report)
    except rbd.ImageNotFound:
        reason = "Image does not exist"
    except rbd.InvalidArgument:
        reason = "Image is not migrating"
    else:
        return

    task.fail(reason)
    self.log.info("{}: task={}".format(task.failure_message, str(task)))
536
def complete_progress(self, task: Task) -> None:
    """Close the task's progress event as failed or complete.

    An event is posted first if none exists yet.  An ImportError from the
    remote call (progress module disabled) is ignored.
    """
    if not task.progress_posted:
        # ensure progress event exists before we complete/fail it
        self.post_progress(task, 0)

    self.log.debug("complete_progress: task={}".format(str(task)))
    try:
        if task.failed:
            remote_args = ("fail", task.task_id, task.failure_message)
        else:
            remote_args = ("complete", task.task_id)
        self.module.remote("progress", *remote_args)
    except ImportError:
        # progress module is disabled
        pass
552
def _update_progress(self, task: Task, progress: float) -> None:
    """Send an "update" for the task's event to the progress module.

    The task refs are passed along with an added "origin" entry; a key of
    the same name in the task refs takes precedence.  An ImportError
    (progress module disabled) is ignored.
    """
    self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
    try:
        event_refs = {"origin": "rbd_support", **task.refs}
        self.module.remote("progress", "update", task.task_id,
                           task.message, progress, event_refs)
    except ImportError:
        # progress module is disabled
        pass
564
def post_progress(self, task: Task, progress: float) -> None:
    """Send a progress update unconditionally and mark the event as posted."""
    self._update_progress(task, progress)
    # set only after the update returned, so an exception leaves it unset
    task.progress_posted = True
568
def update_progress(self, task: Task, progress: float) -> None:
    """Send a progress update, but only if the task's event was posted."""
    if not task.progress_posted:
        return
    self._update_progress(task, progress)
572
# Throttle keeps one time_of_last_call per decorated function, so this
# one-second limit is shared by all callers rather than tracked per task.
# Calls arriving inside the window are dropped (the wrapper returns None).
@Throttle(timedelta(seconds=1))
def throttled_update_progress(self, task: Task, progress: float) -> None:
    """Forward to update_progress(), at most once per second."""
    self.update_progress(task, progress)
576
def queue_flatten(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue a flatten of `image_spec`, or return the matching existing task.

    Returns:
        (0, task JSON, '').

    Raises:
        rbd.ImageNotFound: the image does not exist or has no parent.
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_flatten: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        # only consulted when the image could be opened (image id in refs)
        parent_image_id = None
        try:
            with rbd.Image(ioctx, image_name) as image:
                refs[TASK_REF_IMAGE_ID] = image.id()

                try:
                    parent_image_id = image.parent_id()
                except rbd.ImageNotFound:
                    parent_image_id = None

        except rbd.ImageNotFound:
            pass

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        if TASK_REF_IMAGE_ID not in refs:
            raise rbd.ImageNotFound("Image {} does not exist".format(
                self.format_image_spec(spec)), errno=errno.ENOENT)
        if not parent_image_id:
            raise rbd.ImageNotFound("Image {} does not have a parent".format(
                self.format_image_spec(spec)), errno=errno.ENOENT)

        description = "Flattening image {}".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, description, refs), ""
616
def queue_remove(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue removal of `image_spec`, or return the matching existing task.

    Returns:
        (0, task JSON, '').

    Raises:
        rbd.ImageNotFound: the image does not exist.
        rbd.ImageBusy: the image still has snapshots.
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_remove: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        # only consulted when the image could be opened (image id in refs)
        snaps = []
        try:
            with rbd.Image(ioctx, image_name) as image:
                refs[TASK_REF_IMAGE_ID] = image.id()
                snaps = list(image.list_snaps())

        except rbd.ImageNotFound:
            pass

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        if TASK_REF_IMAGE_ID not in refs:
            raise rbd.ImageNotFound("Image {} does not exist".format(
                self.format_image_spec(spec)), errno=errno.ENOENT)
        if snaps:
            raise rbd.ImageBusy("Image {} has snapshots".format(
                self.format_image_spec(spec)), errno=errno.EBUSY)

        description = "Removing image {}".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, description, refs), ''
652
def queue_trash_remove(self, image_id_spec: str) -> Tuple[int, str, str]:
    """Queue removal of a trashed image, or return the matching existing task.

    The last component of `image_id_spec` is an image id, not a name.
    Errors raised by the trash lookup propagate to the caller.

    Returns:
        (0, task JSON, '').
    """
    spec = self.extract_image_spec(image_id_spec)
    pool_name, namespace, image_id = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_trash_remove: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_ID: image_id}
    existing = self.find_task(refs)
    if existing:
        return 0, existing.to_json(), ''

    # verify that image exists in trash
    with self.open_ioctx(spec[:2]) as ioctx:
        rbd.RBD().trash_get(ioctx, image_id)

        description = "Removing image {} from trash".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, description, refs), ''
675
def get_migration_status(self,
                         ioctx: rados.Ioctx,
                         image_spec: ImageSpecT) -> Optional[MigrationStatusT]:
    """Return the image's migration status, or None.

    None is returned when the status call raises rbd.InvalidArgument or
    rbd.ImageNotFound.
    """
    try:
        status = rbd.RBD().migration_status(ioctx, image_spec[2])
    except (rbd.InvalidArgument, rbd.ImageNotFound):
        status = None
    return status
683
def validate_image_migrating(self,
                             image_spec: ImageSpecT,
                             migration_status: Optional[MigrationStatusT]) -> None:
    """Raise rbd.InvalidArgument (EINVAL) when there is no migration status."""
    if migration_status:
        return
    raise rbd.InvalidArgument("Image {} is not migrating".format(
        self.format_image_spec(image_spec)), errno=errno.EINVAL)
690
def resolve_pool_name(self, pool_id: str) -> str:
    """Look up a pool's name in the OSD map; '<unknown>' if no id matches."""
    pools = self.module.get('osd_map')['pools']
    matching_names = (entry['pool_name'] for entry in pools
                      if entry['pool'] == pool_id)
    return next(matching_names, '<unknown>')
697
def queue_migration_execute(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue execution of a prepared migration, or return the existing task.

    Returns:
        (0, task JSON, '').

    Raises:
        rbd.InvalidArgument: the image is not migrating, or its migration
            state is neither PREPARED nor EXECUTING.
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_migration_execute: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        status = self.get_migration_status(ioctx, spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        self.validate_image_migrating(spec, status)
        assert status
        ready_states = (rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
                        rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING)
        if status['state'] not in ready_states:
            raise rbd.InvalidArgument("Image {} is not in ready state".format(
                self.format_image_spec(spec)), errno=errno.EINVAL)

        source_pool = self.resolve_pool_name(status['source_pool_id'])
        dest_pool = self.resolve_pool_name(status['dest_pool_id'])
        source = self.format_image_spec((source_pool,
                                         status['source_pool_namespace'],
                                         status['source_image_name']))
        destination = self.format_image_spec((dest_pool,
                                              status['dest_pool_namespace'],
                                              status['dest_image_name']))
        description = "Migrating image {} to {}".format(source, destination)
        return 0, self.add_task(ioctx, description, refs), ''
736
def queue_migration_commit(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue a migration commit, or return the matching existing task.

    Returns:
        (0, task JSON, '').

    Raises:
        rbd.InvalidArgument: the image is not migrating, or its migration
            state is not EXECUTED.
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_migration_commit: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        status = self.get_migration_status(ioctx, spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        self.validate_image_migrating(spec, status)
        assert status
        if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
            raise rbd.InvalidArgument("Image {} has not completed migration".format(
                self.format_image_spec(spec)), errno=errno.EINVAL)

        description = "Committing image migration for {}".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, description, refs), ''
767
def queue_migration_abort(self, image_spec: str) -> Tuple[int, str, str]:
    """Queue a migration abort, or return the matching existing task.

    Returns:
        (0, task JSON, '').

    Raises:
        rbd.InvalidArgument: the image is not migrating.
    """
    spec = self.extract_image_spec(image_spec)
    pool_name, namespace, image_name = spec

    authorize_request(self.module, pool_name, namespace)
    self.log.info("queue_migration_abort: {}".format(spec))

    refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
            TASK_REF_POOL_NAME: pool_name,
            TASK_REF_POOL_NAMESPACE: namespace,
            TASK_REF_IMAGE_NAME: image_name}

    with self.open_ioctx(spec[:2]) as ioctx:
        status = self.get_migration_status(ioctx, spec)
        if status:
            refs[TASK_REF_IMAGE_ID] = status['dest_image_id']

        existing = self.find_task(refs)
        if existing:
            return 0, existing.to_json(), ''

        self.validate_image_migrating(spec, status)
        description = "Aborting image migration for {}".format(
            self.format_image_spec(spec))
        return 0, self.add_task(ioctx, description, refs), ''
793
def task_cancel(self, task_id: str) -> Tuple[int, str, str]:
    """Cancel a queued or running task.

    A task the caller is not authorized for is reported exactly like a
    missing one.  A running task is only flagged and removed from the
    omap; its in-memory entry is left for the executing code to clean up.

    Returns:
        (0, "", "") on success, (-ENOENT, '', message) otherwise.
    """
    self.log.info("task_cancel: {}".format(task_id))

    task = self.tasks_by_id.get(task_id)
    if not task or not is_authorized(self.module,
                                     task.refs[TASK_REF_POOL_NAME],
                                     task.refs[TASK_REF_POOL_NAMESPACE]):
        return -errno.ENOENT, '', "No such task {}".format(task_id)

    task.cancel()

    running = self.in_progress_task
    is_running = bool(running and running.task_id == task_id)
    if is_running:
        self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))

    # complete any associated event in the progress module
    self.complete_progress(task)

    # remove from rbd_task omap
    pool_spec = (task.refs[TASK_REF_POOL_NAME],
                 task.refs[TASK_REF_POOL_NAMESPACE])
    with self.open_ioctx(pool_spec) as ioctx:
        self.remove_task(ioctx, task, not is_running)

    return 0, "", ""
819
def task_list(self, task_id: Optional[str]) -> Tuple[int, str, str]:
    """Return one task, or all tasks the caller may see, as indented JSON.

    With a `task_id`, a missing or unauthorized task yields -ENOENT.
    Without one, the authorized tasks are listed in sequence order.
    """
    self.log.info("task_list: {}".format(task_id))

    def visible(candidate: Task) -> bool:
        return is_authorized(self.module,
                             candidate.refs[TASK_REF_POOL_NAME],
                             candidate.refs[TASK_REF_POOL_NAMESPACE])

    if not task_id:
        listing = []
        for sequence in sorted(self.tasks_by_sequence):
            queued = self.tasks_by_sequence[sequence]
            if visible(queued):
                listing.append(queued.to_dict())
        return 0, json.dumps(listing, indent=4, sort_keys=True), ""

    task = self.tasks_by_id.get(task_id)
    if not task or not visible(task):
        return -errno.ENOENT, '', "No such task {}".format(task_id)

    return 0, json.dumps(task.to_dict(), indent=4, sort_keys=True), ""
839
840 return 0, json.dumps(tasks, indent=4, sort_keys=True), ""