9 from contextlib
import contextmanager
10 from datetime
import datetime
, timedelta
11 from functools
import partial
, wraps
12 from threading
import Condition
, Lock
, Thread
13 from typing
import cast
, Any
, Callable
, Dict
, Iterator
, List
, Optional
, Tuple
, TypeVar
15 from .common
import (authorize_request
, extract_pool_key
, get_rbd_pools
,
16 is_authorized
, GLOBAL_POOL_KEY
)
19 RBD_TASK_OID
= "rbd_task"
21 TASK_SEQUENCE
= "sequence"
24 TASK_MESSAGE
= "message"
25 TASK_RETRY_ATTEMPTS
= "retry_attempts"
26 TASK_RETRY_TIME
= "retry_time"
27 TASK_RETRY_MESSAGE
= "retry_message"
28 TASK_IN_PROGRESS
= "in_progress"
29 TASK_PROGRESS
= "progress"
30 TASK_CANCELED
= "canceled"
32 TASK_REF_POOL_NAME
= "pool_name"
33 TASK_REF_POOL_NAMESPACE
= "pool_namespace"
34 TASK_REF_IMAGE_NAME
= "image_name"
35 TASK_REF_IMAGE_ID
= "image_id"
36 TASK_REF_ACTION
= "action"
38 TASK_REF_ACTION_FLATTEN
= "flatten"
39 TASK_REF_ACTION_REMOVE
= "remove"
40 TASK_REF_ACTION_TRASH_REMOVE
= "trash remove"
41 TASK_REF_ACTION_MIGRATION_EXECUTE
= "migrate execute"
42 TASK_REF_ACTION_MIGRATION_COMMIT
= "migrate commit"
43 TASK_REF_ACTION_MIGRATION_ABORT
= "migrate abort"
45 VALID_TASK_ACTIONS
= [TASK_REF_ACTION_FLATTEN
,
46 TASK_REF_ACTION_REMOVE
,
47 TASK_REF_ACTION_TRASH_REMOVE
,
48 TASK_REF_ACTION_MIGRATION_EXECUTE
,
49 TASK_REF_ACTION_MIGRATION_COMMIT
,
50 TASK_REF_ACTION_MIGRATION_ABORT
]
52 TASK_RETRY_INTERVAL
= timedelta(seconds
=30)
53 TASK_MAX_RETRY_INTERVAL
= timedelta(seconds
=300)
54 MAX_COMPLETED_TASKS
= 50
58 FuncT
= TypeVar('FuncT', bound
=Callable
[..., Any
])
62 def __init__(self
: Any
, throttle_period
: timedelta
) -> None:
63 self
.throttle_period
= throttle_period
64 self
.time_of_last_call
= datetime
.min
66 def __call__(self
: 'Throttle', fn
: FuncT
) -> FuncT
:
68 def wrapper(*args
: Any
, **kwargs
: Any
) -> Any
:
70 if self
.time_of_last_call
+ self
.throttle_period
<= now
:
71 self
.time_of_last_call
= now
72 return fn(*args
, **kwargs
)
73 return cast(FuncT
, wrapper
)
76 TaskRefsT
= Dict
[str, str]
80 def __init__(self
, sequence
: int, task_id
: str, message
: str, refs
: TaskRefsT
):
81 self
.sequence
= sequence
82 self
.task_id
= task_id
83 self
.message
= message
85 self
.retry_message
: Optional
[str] = None
86 self
.retry_attempts
= 0
87 self
.retry_time
: Optional
[datetime
] = None
88 self
.in_progress
= False
92 self
.progress_posted
= False
94 def __str__(self
) -> str:
98 def sequence_key(self
) -> bytes
:
99 return "{0:016X}".format(self
.sequence
).encode()
101 def cancel(self
) -> None:
103 self
.fail("Operation canceled")
105 def fail(self
, message
: str) -> None:
107 self
.failure_message
= message
109 def to_dict(self
) -> Dict
[str, Any
]:
110 d
= {TASK_SEQUENCE
: self
.sequence
,
111 TASK_ID
: self
.task_id
,
112 TASK_MESSAGE
: self
.message
,
115 if self
.retry_message
:
116 d
[TASK_RETRY_MESSAGE
] = self
.retry_message
117 if self
.retry_attempts
:
118 d
[TASK_RETRY_ATTEMPTS
] = self
.retry_attempts
120 d
[TASK_RETRY_TIME
] = self
.retry_time
.isoformat()
122 d
[TASK_IN_PROGRESS
] = True
123 d
[TASK_PROGRESS
] = self
.progress
125 d
[TASK_CANCELED
] = True
128 def to_json(self
) -> str:
129 return str(json
.dumps(self
.to_dict()))
132 def from_json(cls
, val
: str) -> 'Task':
135 action
= d
.get(TASK_REFS
, {}).get(TASK_REF_ACTION
)
136 if action
not in VALID_TASK_ACTIONS
:
137 raise ValueError("Invalid task action: {}".format(action
))
139 return Task(d
[TASK_SEQUENCE
], d
[TASK_ID
], d
[TASK_MESSAGE
], d
[TASK_REFS
])
140 except json
.JSONDecodeError
as e
:
141 raise ValueError("Invalid JSON ({})".format(str(e
)))
142 except KeyError as e
:
143 raise ValueError("Invalid task format (missing key {})".format(str(e
)))
146 # pool_name, namespace, image_name
147 ImageSpecT
= Tuple
[str, str, str]
148 # pool_name, namespace
149 PoolSpecT
= Tuple
[str, str]
150 MigrationStatusT
= Dict
[str, str]
154 condition
= Condition(lock
)
157 in_progress_task
= None
158 tasks_by_sequence
: Dict
[int, Task
] = dict()
159 tasks_by_id
: Dict
[str, Task
] = dict()
161 completed_tasks
: List
[Task
] = []
165 def __init__(self
, module
: Any
) -> None:
167 self
.log
= module
.log
170 self
.init_task_queue()
172 self
.thread
= Thread(target
=self
.run
)
176 def default_pool_name(self
) -> str:
177 return self
.module
.get_ceph_option("rbd_default_pool")
179 def extract_pool_spec(self
, pool_spec
: str) -> PoolSpecT
:
180 pool_spec
= extract_pool_key(pool_spec
)
181 if pool_spec
== GLOBAL_POOL_KEY
:
182 pool_spec
= (self
.default_pool_name
, '')
183 return cast(PoolSpecT
, pool_spec
)
185 def extract_image_spec(self
, image_spec
: str) -> ImageSpecT
:
186 match
= re
.match(r
'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
189 raise ValueError("Invalid image spec: {}".format(image_spec
))
190 return (match
.group(1) or self
.default_pool_name
, match
.group(2) or '',
193 def run(self
) -> None:
195 self
.log
.info("TaskHandler: starting")
199 for sequence
in sorted([sequence
for sequence
, task
200 in self
.tasks_by_sequence
.items()
201 if not task
.retry_time
or task
.retry_time
<= now
]):
202 self
.execute_task(sequence
)
204 self
.condition
.wait(5)
205 self
.log
.debug("TaskHandler: tick")
207 except Exception as ex
:
208 self
.log
.fatal("Fatal runtime error: {}\n{}".format(
209 ex
, traceback
.format_exc()))
212 def open_ioctx(self
, spec
: PoolSpecT
) -> Iterator
[rados
.Ioctx
]:
214 with self
.module
.rados
.open_ioctx(spec
[0]) as ioctx
:
215 ioctx
.set_namespace(spec
[1])
217 except rados
.ObjectNotFound
:
218 self
.log
.error("Failed to locate pool {}".format(spec
[0]))
222 def format_image_spec(cls
, image_spec
: ImageSpecT
) -> str:
223 image
= image_spec
[2]
225 image
= "{}/{}".format(image_spec
[1], image
)
227 image
= "{}/{}".format(image_spec
[0], image
)
230 def init_task_queue(self
) -> None:
231 for pool_id
, pool_name
in get_rbd_pools(self
.module
).items():
233 with self
.module
.rados
.open_ioctx2(int(pool_id
)) as ioctx
:
234 self
.load_task_queue(ioctx
, pool_name
)
237 namespaces
= rbd
.RBD().namespace_list(ioctx
)
238 except rbd
.OperationNotSupported
:
239 self
.log
.debug("Namespaces not supported")
242 for namespace
in namespaces
:
243 ioctx
.set_namespace(namespace
)
244 self
.load_task_queue(ioctx
, pool_name
)
246 except rados
.ObjectNotFound
:
250 if self
.tasks_by_sequence
:
251 self
.sequence
= list(sorted(self
.tasks_by_sequence
.keys()))[-1]
253 self
.log
.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
254 self
.sequence
, str(self
.tasks_by_sequence
), str(self
.tasks_by_id
)))
256 def load_task_queue(self
, ioctx
: rados
.Ioctx
, pool_name
: str) -> None:
257 pool_spec
= pool_name
259 pool_spec
+= "/{}".format(ioctx
.nspace
)
264 with rados
.ReadOpCtx() as read_op
:
265 self
.log
.info("load_task_task: {}, start_after={}".format(
266 pool_spec
, start_after
))
267 it
, ret
= ioctx
.get_omap_vals(read_op
, start_after
, "", 128)
268 ioctx
.operate_read_op(read_op
, RBD_TASK_OID
)
274 self
.log
.info("load_task_task: task={}".format(v
))
277 task
= Task
.from_json(v
)
278 self
.append_task(task
)
280 self
.log
.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec
, v
))
285 except StopIteration:
287 except rados
.ObjectNotFound
:
291 def append_task(self
, task
: Task
) -> None:
292 self
.tasks_by_sequence
[task
.sequence
] = task
293 self
.tasks_by_id
[task
.task_id
] = task
295 def task_refs_match(self
, task_refs
: TaskRefsT
, refs
: TaskRefsT
) -> bool:
296 if TASK_REF_IMAGE_ID
not in refs
and TASK_REF_IMAGE_ID
in task_refs
:
297 task_refs
= task_refs
.copy()
298 del task_refs
[TASK_REF_IMAGE_ID
]
300 self
.log
.debug("task_refs_match: ref1={}, ref2={}".format(task_refs
, refs
))
301 return task_refs
== refs
303 def find_task(self
, refs
: TaskRefsT
) -> Optional
[Task
]:
304 self
.log
.debug("find_task: refs={}".format(refs
))
306 # search for dups and return the original
307 for task_id
in reversed(sorted(self
.tasks_by_id
.keys())):
308 task
= self
.tasks_by_id
[task_id
]
309 if self
.task_refs_match(task
.refs
, refs
):
312 # search for a completed task (message replay)
313 for task
in reversed(self
.completed_tasks
):
314 if self
.task_refs_match(task
.refs
, refs
):
322 refs
: TaskRefsT
) -> str:
323 self
.log
.debug("add_task: message={}, refs={}".format(message
, refs
))
325 # ensure unique uuid across all pools
327 task_id
= str(uuid
.uuid4())
328 if task_id
not in self
.tasks_by_id
:
332 task
= Task(self
.sequence
, task_id
, message
, refs
)
334 # add the task to the rbd_task omap
335 task_json
= task
.to_json()
336 omap_keys
= (task
.sequence_key
, )
337 omap_vals
= (str.encode(task_json
), )
338 self
.log
.info("adding task: %s %s",
339 omap_keys
[0].decode(),
340 omap_vals
[0].decode())
342 with rados
.WriteOpCtx() as write_op
:
343 ioctx
.set_omap(write_op
, omap_keys
, omap_vals
)
344 ioctx
.operate_write_op(write_op
, RBD_TASK_OID
)
345 self
.append_task(task
)
347 self
.condition
.notify()
350 def remove_task(self
,
353 remove_in_memory
: bool = True) -> None:
354 self
.log
.info("remove_task: task={}".format(str(task
)))
355 omap_keys
= (task
.sequence_key
, )
357 with rados
.WriteOpCtx() as write_op
:
358 ioctx
.remove_omap_keys(write_op
, omap_keys
)
359 ioctx
.operate_write_op(write_op
, RBD_TASK_OID
)
360 except rados
.ObjectNotFound
:
365 del self
.tasks_by_id
[task
.task_id
]
366 del self
.tasks_by_sequence
[task
.sequence
]
368 # keep a record of the last N tasks to help avoid command replay
370 if not task
.failed
and not task
.canceled
:
371 self
.log
.debug("remove_task: moving to completed tasks")
372 self
.completed_tasks
.append(task
)
373 self
.completed_tasks
= self
.completed_tasks
[-MAX_COMPLETED_TASKS
:]
378 def execute_task(self
, sequence
: int) -> None:
379 task
= self
.tasks_by_sequence
[sequence
]
380 self
.log
.info("execute_task: task={}".format(str(task
)))
384 with self
.open_ioctx((task
.refs
[TASK_REF_POOL_NAME
],
385 task
.refs
[TASK_REF_POOL_NAMESPACE
])) as ioctx
:
388 action
= task
.refs
[TASK_REF_ACTION
]
389 execute_fn
= {TASK_REF_ACTION_FLATTEN
: self
.execute_flatten
,
390 TASK_REF_ACTION_REMOVE
: self
.execute_remove
,
391 TASK_REF_ACTION_TRASH_REMOVE
: self
.execute_trash_remove
,
392 TASK_REF_ACTION_MIGRATION_EXECUTE
: self
.execute_migration_execute
,
393 TASK_REF_ACTION_MIGRATION_COMMIT
: self
.execute_migration_commit
,
394 TASK_REF_ACTION_MIGRATION_ABORT
: self
.execute_migration_abort
397 self
.log
.error("Invalid task action: {}".format(action
))
399 task
.in_progress
= True
400 self
.in_progress_task
= task
404 execute_fn(ioctx
, task
)
406 except rbd
.OperationCanceled
:
407 self
.log
.info("Operation canceled: task={}".format(
413 task
.in_progress
= False
414 self
.in_progress_task
= None
416 self
.complete_progress(task
)
417 self
.remove_task(ioctx
, task
)
419 except rados
.ObjectNotFound
as e
:
420 self
.log
.error("execute_task: {}".format(e
))
422 task
.retry_message
= "{}".format(e
)
423 self
.update_progress(task
, 0)
425 # pool DNE -- remove the task
426 self
.complete_progress(task
)
427 self
.remove_task(ioctx
, task
)
429 except (rados
.Error
, rbd
.Error
) as e
:
430 self
.log
.error("execute_task: {}".format(e
))
431 task
.retry_message
= "{}".format(e
)
432 self
.update_progress(task
, 0)
435 task
.in_progress
= False
436 task
.retry_attempts
+= 1
437 task
.retry_time
= datetime
.now() + min(
438 TASK_RETRY_INTERVAL
* task
.retry_attempts
,
439 TASK_MAX_RETRY_INTERVAL
)
441 def progress_callback(self
, task
: Task
, current
: int, total
: int) -> int:
442 progress
= float(current
) / float(total
)
443 self
.log
.debug("progress_callback: task={}, progress={}".format(
444 str(task
), progress
))
446 # avoid deadlocking when a new command comes in during a progress callback
447 if not self
.lock
.acquire(False):
451 if not self
.in_progress_task
or self
.in_progress_task
.canceled
:
452 return -rbd
.ECANCELED
453 self
.in_progress_task
.progress
= progress
457 if not task
.progress_posted
:
458 # delayed creation of progress event until first callback
459 self
.post_progress(task
, progress
)
461 self
.throttled_update_progress(task
, progress
)
465 def execute_flatten(self
, ioctx
: rados
.Ioctx
, task
: Task
) -> None:
466 self
.log
.info("execute_flatten: task={}".format(str(task
)))
469 with rbd
.Image(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
]) as image
:
470 image
.flatten(on_progress
=partial(self
.progress_callback
, task
))
471 except rbd
.InvalidArgument
:
472 task
.fail("Image does not have parent")
473 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
474 except rbd
.ImageNotFound
:
475 task
.fail("Image does not exist")
476 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
478 def execute_remove(self
, ioctx
: rados
.Ioctx
, task
: Task
) -> None:
479 self
.log
.info("execute_remove: task={}".format(str(task
)))
482 rbd
.RBD().remove(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
483 on_progress
=partial(self
.progress_callback
, task
))
484 except rbd
.ImageNotFound
:
485 task
.fail("Image does not exist")
486 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
488 def execute_trash_remove(self
, ioctx
: rados
.Ioctx
, task
: Task
) -> None:
489 self
.log
.info("execute_trash_remove: task={}".format(str(task
)))
492 rbd
.RBD().trash_remove(ioctx
, task
.refs
[TASK_REF_IMAGE_ID
],
493 on_progress
=partial(self
.progress_callback
, task
))
494 except rbd
.ImageNotFound
:
495 task
.fail("Image does not exist")
496 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
498 def execute_migration_execute(self
, ioctx
: rados
.Ioctx
, task
: Task
) -> None:
499 self
.log
.info("execute_migration_execute: task={}".format(str(task
)))
502 rbd
.RBD().migration_execute(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
503 on_progress
=partial(self
.progress_callback
, task
))
504 except rbd
.ImageNotFound
:
505 task
.fail("Image does not exist")
506 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
507 except rbd
.InvalidArgument
:
508 task
.fail("Image is not migrating")
509 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
511 def execute_migration_commit(self
, ioctx
: rados
.Ioctx
, task
: Task
) -> None:
512 self
.log
.info("execute_migration_commit: task={}".format(str(task
)))
515 rbd
.RBD().migration_commit(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
516 on_progress
=partial(self
.progress_callback
, task
))
517 except rbd
.ImageNotFound
:
518 task
.fail("Image does not exist")
519 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
520 except rbd
.InvalidArgument
:
521 task
.fail("Image is not migrating or migration not executed")
522 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
524 def execute_migration_abort(self
, ioctx
: rados
.Ioctx
, task
: Task
) -> None:
525 self
.log
.info("execute_migration_abort: task={}".format(str(task
)))
528 rbd
.RBD().migration_abort(ioctx
, task
.refs
[TASK_REF_IMAGE_NAME
],
529 on_progress
=partial(self
.progress_callback
, task
))
530 except rbd
.ImageNotFound
:
531 task
.fail("Image does not exist")
532 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
533 except rbd
.InvalidArgument
:
534 task
.fail("Image is not migrating")
535 self
.log
.info("{}: task={}".format(task
.failure_message
, str(task
)))
537 def complete_progress(self
, task
: Task
) -> None:
538 if not task
.progress_posted
:
539 # ensure progress event exists before we complete/fail it
540 self
.post_progress(task
, 0)
542 self
.log
.debug("complete_progress: task={}".format(str(task
)))
545 self
.module
.remote("progress", "fail", task
.task_id
,
546 task
.failure_message
)
548 self
.module
.remote("progress", "complete", task
.task_id
)
550 # progress module is disabled
553 def _update_progress(self
, task
: Task
, progress
: float) -> None:
554 self
.log
.debug("update_progress: task={}, progress={}".format(str(task
), progress
))
556 refs
= {"origin": "rbd_support"}
557 refs
.update(task
.refs
)
559 self
.module
.remote("progress", "update", task
.task_id
,
560 task
.message
, progress
, refs
)
562 # progress module is disabled
565 def post_progress(self
, task
: Task
, progress
: float) -> None:
566 self
._update
_progress
(task
, progress
)
567 task
.progress_posted
= True
569 def update_progress(self
, task
: Task
, progress
: float) -> None:
570 if task
.progress_posted
:
571 self
._update
_progress
(task
, progress
)
573 @Throttle(timedelta(seconds
=1))
574 def throttled_update_progress(self
, task
: Task
, progress
: float) -> None:
575 self
.update_progress(task
, progress
)
577 def queue_flatten(self
, image_spec
: str) -> Tuple
[int, str, str]:
578 image_spec
= self
.extract_image_spec(image_spec
)
580 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
581 self
.log
.info("queue_flatten: {}".format(image_spec
))
583 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_FLATTEN
,
584 TASK_REF_POOL_NAME
: image_spec
[0],
585 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
586 TASK_REF_IMAGE_NAME
: image_spec
[2]}
588 with self
.open_ioctx(image_spec
[:2]) as ioctx
:
590 with rbd
.Image(ioctx
, image_spec
[2]) as image
:
591 refs
[TASK_REF_IMAGE_ID
] = image
.id()
594 parent_image_id
= image
.parent_id()
595 except rbd
.ImageNotFound
:
596 parent_image_id
= None
598 except rbd
.ImageNotFound
:
601 task
= self
.find_task(refs
)
603 return 0, task
.to_json(), ''
605 if TASK_REF_IMAGE_ID
not in refs
:
606 raise rbd
.ImageNotFound("Image {} does not exist".format(
607 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
608 if not parent_image_id
:
609 raise rbd
.ImageNotFound("Image {} does not have a parent".format(
610 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
612 return 0, self
.add_task(ioctx
,
613 "Flattening image {}".format(
614 self
.format_image_spec(image_spec
)),
617 def queue_remove(self
, image_spec
: str) -> Tuple
[int, str, str]:
618 image_spec
= self
.extract_image_spec(image_spec
)
620 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
621 self
.log
.info("queue_remove: {}".format(image_spec
))
623 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_REMOVE
,
624 TASK_REF_POOL_NAME
: image_spec
[0],
625 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
626 TASK_REF_IMAGE_NAME
: image_spec
[2]}
628 with self
.open_ioctx(image_spec
[:2]) as ioctx
:
630 with rbd
.Image(ioctx
, image_spec
[2]) as image
:
631 refs
[TASK_REF_IMAGE_ID
] = image
.id()
632 snaps
= list(image
.list_snaps())
634 except rbd
.ImageNotFound
:
637 task
= self
.find_task(refs
)
639 return 0, task
.to_json(), ''
641 if TASK_REF_IMAGE_ID
not in refs
:
642 raise rbd
.ImageNotFound("Image {} does not exist".format(
643 self
.format_image_spec(image_spec
)), errno
=errno
.ENOENT
)
645 raise rbd
.ImageBusy("Image {} has snapshots".format(
646 self
.format_image_spec(image_spec
)), errno
=errno
.EBUSY
)
648 return 0, self
.add_task(ioctx
,
649 "Removing image {}".format(
650 self
.format_image_spec(image_spec
)),
653 def queue_trash_remove(self
, image_id_spec
: str) -> Tuple
[int, str, str]:
654 image_id_spec
= self
.extract_image_spec(image_id_spec
)
656 authorize_request(self
.module
, image_id_spec
[0], image_id_spec
[1])
657 self
.log
.info("queue_trash_remove: {}".format(image_id_spec
))
659 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_TRASH_REMOVE
,
660 TASK_REF_POOL_NAME
: image_id_spec
[0],
661 TASK_REF_POOL_NAMESPACE
: image_id_spec
[1],
662 TASK_REF_IMAGE_ID
: image_id_spec
[2]}
663 task
= self
.find_task(refs
)
665 return 0, task
.to_json(), ''
667 # verify that image exists in trash
668 with self
.open_ioctx(image_id_spec
[:2]) as ioctx
:
669 rbd
.RBD().trash_get(ioctx
, image_id_spec
[2])
671 return 0, self
.add_task(ioctx
,
672 "Removing image {} from trash".format(
673 self
.format_image_spec(image_id_spec
)),
676 def get_migration_status(self
,
678 image_spec
: ImageSpecT
) -> Optional
[MigrationStatusT
]:
680 return rbd
.RBD().migration_status(ioctx
, image_spec
[2])
681 except (rbd
.InvalidArgument
, rbd
.ImageNotFound
):
684 def validate_image_migrating(self
,
685 image_spec
: ImageSpecT
,
686 migration_status
: Optional
[MigrationStatusT
]) -> None:
687 if not migration_status
:
688 raise rbd
.InvalidArgument("Image {} is not migrating".format(
689 self
.format_image_spec(image_spec
)), errno
=errno
.EINVAL
)
691 def resolve_pool_name(self
, pool_id
: str) -> str:
692 osd_map
= self
.module
.get('osd_map')
693 for pool
in osd_map
['pools']:
694 if pool
['pool'] == pool_id
:
695 return pool
['pool_name']
698 def queue_migration_execute(self
, image_spec
: str) -> Tuple
[int, str, str]:
699 image_spec
= self
.extract_image_spec(image_spec
)
701 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
702 self
.log
.info("queue_migration_execute: {}".format(image_spec
))
704 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_MIGRATION_EXECUTE
,
705 TASK_REF_POOL_NAME
: image_spec
[0],
706 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
707 TASK_REF_IMAGE_NAME
: image_spec
[2]}
709 with self
.open_ioctx(image_spec
[:2]) as ioctx
:
710 status
= self
.get_migration_status(ioctx
, image_spec
)
712 refs
[TASK_REF_IMAGE_ID
] = status
['dest_image_id']
714 task
= self
.find_task(refs
)
716 return 0, task
.to_json(), ''
718 self
.validate_image_migrating(image_spec
, status
)
720 if status
['state'] not in [rbd
.RBD_IMAGE_MIGRATION_STATE_PREPARED
,
721 rbd
.RBD_IMAGE_MIGRATION_STATE_EXECUTING
]:
722 raise rbd
.InvalidArgument("Image {} is not in ready state".format(
723 self
.format_image_spec(image_spec
)), errno
=errno
.EINVAL
)
725 source_pool
= self
.resolve_pool_name(status
['source_pool_id'])
726 dest_pool
= self
.resolve_pool_name(status
['dest_pool_id'])
727 return 0, self
.add_task(ioctx
,
728 "Migrating image {} to {}".format(
729 self
.format_image_spec((source_pool
,
730 status
['source_pool_namespace'],
731 status
['source_image_name'])),
732 self
.format_image_spec((dest_pool
,
733 status
['dest_pool_namespace'],
734 status
['dest_image_name']))),
737 def queue_migration_commit(self
, image_spec
: str) -> Tuple
[int, str, str]:
738 image_spec
= self
.extract_image_spec(image_spec
)
740 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
741 self
.log
.info("queue_migration_commit: {}".format(image_spec
))
743 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_MIGRATION_COMMIT
,
744 TASK_REF_POOL_NAME
: image_spec
[0],
745 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
746 TASK_REF_IMAGE_NAME
: image_spec
[2]}
748 with self
.open_ioctx(image_spec
[:2]) as ioctx
:
749 status
= self
.get_migration_status(ioctx
, image_spec
)
751 refs
[TASK_REF_IMAGE_ID
] = status
['dest_image_id']
753 task
= self
.find_task(refs
)
755 return 0, task
.to_json(), ''
757 self
.validate_image_migrating(image_spec
, status
)
759 if status
['state'] != rbd
.RBD_IMAGE_MIGRATION_STATE_EXECUTED
:
760 raise rbd
.InvalidArgument("Image {} has not completed migration".format(
761 self
.format_image_spec(image_spec
)), errno
=errno
.EINVAL
)
763 return 0, self
.add_task(ioctx
,
764 "Committing image migration for {}".format(
765 self
.format_image_spec(image_spec
)),
768 def queue_migration_abort(self
, image_spec
: str) -> Tuple
[int, str, str]:
769 image_spec
= self
.extract_image_spec(image_spec
)
771 authorize_request(self
.module
, image_spec
[0], image_spec
[1])
772 self
.log
.info("queue_migration_abort: {}".format(image_spec
))
774 refs
= {TASK_REF_ACTION
: TASK_REF_ACTION_MIGRATION_ABORT
,
775 TASK_REF_POOL_NAME
: image_spec
[0],
776 TASK_REF_POOL_NAMESPACE
: image_spec
[1],
777 TASK_REF_IMAGE_NAME
: image_spec
[2]}
779 with self
.open_ioctx(image_spec
[:2]) as ioctx
:
780 status
= self
.get_migration_status(ioctx
, image_spec
)
782 refs
[TASK_REF_IMAGE_ID
] = status
['dest_image_id']
784 task
= self
.find_task(refs
)
786 return 0, task
.to_json(), ''
788 self
.validate_image_migrating(image_spec
, status
)
789 return 0, self
.add_task(ioctx
,
790 "Aborting image migration for {}".format(
791 self
.format_image_spec(image_spec
)),
794 def task_cancel(self
, task_id
: str) -> Tuple
[int, str, str]:
795 self
.log
.info("task_cancel: {}".format(task_id
))
797 task
= self
.tasks_by_id
.get(task_id
)
798 if not task
or not is_authorized(self
.module
,
799 task
.refs
[TASK_REF_POOL_NAME
],
800 task
.refs
[TASK_REF_POOL_NAMESPACE
]):
801 return -errno
.ENOENT
, '', "No such task {}".format(task_id
)
805 remove_in_memory
= True
806 if self
.in_progress_task
and self
.in_progress_task
.task_id
== task_id
:
807 self
.log
.info("Attempting to cancel in-progress task: {}".format(str(self
.in_progress_task
)))
808 remove_in_memory
= False
810 # complete any associated event in the progress module
811 self
.complete_progress(task
)
813 # remove from rbd_task omap
814 with self
.open_ioctx((task
.refs
[TASK_REF_POOL_NAME
],
815 task
.refs
[TASK_REF_POOL_NAMESPACE
])) as ioctx
:
816 self
.remove_task(ioctx
, task
, remove_in_memory
)
820 def task_list(self
, task_id
: Optional
[str]) -> Tuple
[int, str, str]:
821 self
.log
.info("task_list: {}".format(task_id
))
824 task
= self
.tasks_by_id
.get(task_id
)
825 if not task
or not is_authorized(self
.module
,
826 task
.refs
[TASK_REF_POOL_NAME
],
827 task
.refs
[TASK_REF_POOL_NAMESPACE
]):
828 return -errno
.ENOENT
, '', "No such task {}".format(task_id
)
830 return 0, json
.dumps(task
.to_dict(), indent
=4, sort_keys
=True), ""
833 for sequence
in sorted(self
.tasks_by_sequence
.keys()):
834 task
= self
.tasks_by_sequence
[sequence
]
835 if is_authorized(self
.module
,
836 task
.refs
[TASK_REF_POOL_NAME
],
837 task
.refs
[TASK_REF_POOL_NAMESPACE
]):
838 tasks
.append(task
.to_dict())
840 return 0, json
.dumps(tasks
, indent
=4, sort_keys
=True), ""