]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rbd_support/task.py
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / pybind / mgr / rbd_support / task.py
CommitLineData
9f95a23c
TL
1import errno
2import json
3import rados
4import rbd
5import re
6import traceback
7import uuid
8
9from contextlib import contextmanager
10from datetime import datetime, timedelta
11from functools import partial, wraps
12from threading import Condition, Lock, Thread
20effc67 13from typing import cast, Any, Callable, Dict, Iterator, List, Optional, Tuple, TypeVar
9f95a23c
TL
14
15from .common import (authorize_request, extract_pool_key, get_rbd_pools,
20effc67 16 is_authorized, GLOBAL_POOL_KEY)
9f95a23c
TL
17
18
19RBD_TASK_OID = "rbd_task"
20
21TASK_SEQUENCE = "sequence"
22TASK_ID = "id"
23TASK_REFS = "refs"
24TASK_MESSAGE = "message"
adb31ebb 25TASK_RETRY_ATTEMPTS = "retry_attempts"
9f95a23c 26TASK_RETRY_TIME = "retry_time"
adb31ebb 27TASK_RETRY_MESSAGE = "retry_message"
9f95a23c
TL
28TASK_IN_PROGRESS = "in_progress"
29TASK_PROGRESS = "progress"
30TASK_CANCELED = "canceled"
31
32TASK_REF_POOL_NAME = "pool_name"
33TASK_REF_POOL_NAMESPACE = "pool_namespace"
34TASK_REF_IMAGE_NAME = "image_name"
35TASK_REF_IMAGE_ID = "image_id"
36TASK_REF_ACTION = "action"
37
38TASK_REF_ACTION_FLATTEN = "flatten"
39TASK_REF_ACTION_REMOVE = "remove"
40TASK_REF_ACTION_TRASH_REMOVE = "trash remove"
41TASK_REF_ACTION_MIGRATION_EXECUTE = "migrate execute"
42TASK_REF_ACTION_MIGRATION_COMMIT = "migrate commit"
43TASK_REF_ACTION_MIGRATION_ABORT = "migrate abort"
44
45VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
46 TASK_REF_ACTION_REMOVE,
47 TASK_REF_ACTION_TRASH_REMOVE,
48 TASK_REF_ACTION_MIGRATION_EXECUTE,
49 TASK_REF_ACTION_MIGRATION_COMMIT,
50 TASK_REF_ACTION_MIGRATION_ABORT]
51
52TASK_RETRY_INTERVAL = timedelta(seconds=30)
adb31ebb 53TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
9f95a23c
TL
54MAX_COMPLETED_TASKS = 50
55
56
20effc67
TL
57T = TypeVar('T')
58FuncT = TypeVar('FuncT', bound=Callable[..., Any])
59
60
9f95a23c 61class Throttle:
20effc67 62 def __init__(self: Any, throttle_period: timedelta) -> None:
9f95a23c
TL
63 self.throttle_period = throttle_period
64 self.time_of_last_call = datetime.min
65
20effc67 66 def __call__(self: 'Throttle', fn: FuncT) -> FuncT:
9f95a23c 67 @wraps(fn)
20effc67 68 def wrapper(*args: Any, **kwargs: Any) -> Any:
9f95a23c
TL
69 now = datetime.now()
70 if self.time_of_last_call + self.throttle_period <= now:
71 self.time_of_last_call = now
72 return fn(*args, **kwargs)
20effc67
TL
73 return cast(FuncT, wrapper)
74
75
76TaskRefsT = Dict[str, str]
9f95a23c
TL
77
78
79class Task:
20effc67 80 def __init__(self, sequence: int, task_id: str, message: str, refs: TaskRefsT):
9f95a23c
TL
81 self.sequence = sequence
82 self.task_id = task_id
83 self.message = message
84 self.refs = refs
20effc67 85 self.retry_message: Optional[str] = None
adb31ebb 86 self.retry_attempts = 0
20effc67 87 self.retry_time: Optional[datetime] = None
9f95a23c
TL
88 self.in_progress = False
89 self.progress = 0.0
90 self.canceled = False
91 self.failed = False
adb31ebb 92 self.progress_posted = False
9f95a23c 93
20effc67 94 def __str__(self) -> str:
9f95a23c
TL
95 return self.to_json()
96
97 @property
20effc67
TL
98 def sequence_key(self) -> bytes:
99 return "{0:016X}".format(self.sequence).encode()
9f95a23c 100
20effc67 101 def cancel(self) -> None:
9f95a23c
TL
102 self.canceled = True
103 self.fail("Operation canceled")
104
20effc67 105 def fail(self, message: str) -> None:
9f95a23c
TL
106 self.failed = True
107 self.failure_message = message
108
20effc67 109 def to_dict(self) -> Dict[str, Any]:
9f95a23c
TL
110 d = {TASK_SEQUENCE: self.sequence,
111 TASK_ID: self.task_id,
112 TASK_MESSAGE: self.message,
113 TASK_REFS: self.refs
114 }
adb31ebb
TL
115 if self.retry_message:
116 d[TASK_RETRY_MESSAGE] = self.retry_message
117 if self.retry_attempts:
118 d[TASK_RETRY_ATTEMPTS] = self.retry_attempts
9f95a23c
TL
119 if self.retry_time:
120 d[TASK_RETRY_TIME] = self.retry_time.isoformat()
121 if self.in_progress:
122 d[TASK_IN_PROGRESS] = True
123 d[TASK_PROGRESS] = self.progress
124 if self.canceled:
125 d[TASK_CANCELED] = True
126 return d
127
20effc67 128 def to_json(self) -> str:
9f95a23c
TL
129 return str(json.dumps(self.to_dict()))
130
131 @classmethod
20effc67 132 def from_json(cls, val: str) -> 'Task':
9f95a23c
TL
133 try:
134 d = json.loads(val)
135 action = d.get(TASK_REFS, {}).get(TASK_REF_ACTION)
136 if action not in VALID_TASK_ACTIONS:
137 raise ValueError("Invalid task action: {}".format(action))
138
139 return Task(d[TASK_SEQUENCE], d[TASK_ID], d[TASK_MESSAGE], d[TASK_REFS])
140 except json.JSONDecodeError as e:
141 raise ValueError("Invalid JSON ({})".format(str(e)))
142 except KeyError as e:
143 raise ValueError("Invalid task format (missing key {})".format(str(e)))
144
145
20effc67
TL
146# pool_name, namespace, image_name
147ImageSpecT = Tuple[str, str, str]
148# pool_name, namespace
149PoolSpecT = Tuple[str, str]
150MigrationStatusT = Dict[str, str]
151
1e59de90 152
9f95a23c
TL
153class TaskHandler:
154 lock = Lock()
155 condition = Condition(lock)
9f95a23c
TL
156
157 in_progress_task = None
20effc67
TL
158 tasks_by_sequence: Dict[int, Task] = dict()
159 tasks_by_id: Dict[str, Task] = dict()
9f95a23c 160
20effc67 161 completed_tasks: List[Task] = []
9f95a23c
TL
162
163 sequence = 0
164
20effc67 165 def __init__(self, module: Any) -> None:
9f95a23c
TL
166 self.module = module
167 self.log = module.log
168
1e59de90
TL
169 self.stop_thread = False
170 self.thread = Thread(target=self.run)
171
172 def setup(self) -> None:
9f95a23c
TL
173 with self.lock:
174 self.init_task_queue()
9f95a23c
TL
175 self.thread.start()
176
177 @property
20effc67 178 def default_pool_name(self) -> str:
9f95a23c
TL
179 return self.module.get_ceph_option("rbd_default_pool")
180
20effc67 181 def extract_pool_spec(self, pool_spec: str) -> PoolSpecT:
9f95a23c
TL
182 pool_spec = extract_pool_key(pool_spec)
183 if pool_spec == GLOBAL_POOL_KEY:
184 pool_spec = (self.default_pool_name, '')
20effc67 185 return cast(PoolSpecT, pool_spec)
9f95a23c 186
20effc67 187 def extract_image_spec(self, image_spec: str) -> ImageSpecT:
9f95a23c
TL
188 match = re.match(r'^(?:([^/]+)/(?:([^/]+)/)?)?([^/@]+)$',
189 image_spec or '')
190 if not match:
191 raise ValueError("Invalid image spec: {}".format(image_spec))
192 return (match.group(1) or self.default_pool_name, match.group(2) or '',
193 match.group(3))
194
1e59de90
TL
195 def shutdown(self) -> None:
196 self.log.info("TaskHandler: shutting down")
197 self.stop_thread = True
198 if self.thread.is_alive():
199 self.log.debug("TaskHandler: joining thread")
200 self.thread.join()
201 self.log.info("TaskHandler: shut down")
202
20effc67 203 def run(self) -> None:
9f95a23c
TL
204 try:
205 self.log.info("TaskHandler: starting")
1e59de90 206 while not self.stop_thread:
9f95a23c
TL
207 with self.lock:
208 now = datetime.now()
209 for sequence in sorted([sequence for sequence, task
210 in self.tasks_by_sequence.items()
211 if not task.retry_time or task.retry_time <= now]):
212 self.execute_task(sequence)
213
214 self.condition.wait(5)
215 self.log.debug("TaskHandler: tick")
216
1e59de90
TL
217 except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
218 self.log.exception("TaskHandler: client blocklisted")
219 self.module.client_blocklisted.set()
9f95a23c
TL
220 except Exception as ex:
221 self.log.fatal("Fatal runtime error: {}\n{}".format(
222 ex, traceback.format_exc()))
223
224 @contextmanager
20effc67 225 def open_ioctx(self, spec: PoolSpecT) -> Iterator[rados.Ioctx]:
9f95a23c
TL
226 try:
227 with self.module.rados.open_ioctx(spec[0]) as ioctx:
228 ioctx.set_namespace(spec[1])
229 yield ioctx
230 except rados.ObjectNotFound:
231 self.log.error("Failed to locate pool {}".format(spec[0]))
232 raise
233
234 @classmethod
20effc67 235 def format_image_spec(cls, image_spec: ImageSpecT) -> str:
9f95a23c
TL
236 image = image_spec[2]
237 if image_spec[1]:
238 image = "{}/{}".format(image_spec[1], image)
239 if image_spec[0]:
240 image = "{}/{}".format(image_spec[0], image)
241 return image
242
20effc67 243 def init_task_queue(self) -> None:
9f95a23c
TL
244 for pool_id, pool_name in get_rbd_pools(self.module).items():
245 try:
246 with self.module.rados.open_ioctx2(int(pool_id)) as ioctx:
247 self.load_task_queue(ioctx, pool_name)
248
249 try:
250 namespaces = rbd.RBD().namespace_list(ioctx)
251 except rbd.OperationNotSupported:
252 self.log.debug("Namespaces not supported")
253 continue
254
255 for namespace in namespaces:
256 ioctx.set_namespace(namespace)
257 self.load_task_queue(ioctx, pool_name)
258
259 except rados.ObjectNotFound:
260 # pool DNE
261 pass
262
263 if self.tasks_by_sequence:
264 self.sequence = list(sorted(self.tasks_by_sequence.keys()))[-1]
265
266 self.log.debug("sequence={}, tasks_by_sequence={}, tasks_by_id={}".format(
267 self.sequence, str(self.tasks_by_sequence), str(self.tasks_by_id)))
268
20effc67 269 def load_task_queue(self, ioctx: rados.Ioctx, pool_name: str) -> None:
9f95a23c
TL
270 pool_spec = pool_name
271 if ioctx.nspace:
272 pool_spec += "/{}".format(ioctx.nspace)
273
274 start_after = ''
275 try:
276 while True:
277 with rados.ReadOpCtx() as read_op:
278 self.log.info("load_task_task: {}, start_after={}".format(
279 pool_spec, start_after))
280 it, ret = ioctx.get_omap_vals(read_op, start_after, "", 128)
281 ioctx.operate_read_op(read_op, RBD_TASK_OID)
282
283 it = list(it)
284 for k, v in it:
285 start_after = k
286 v = v.decode()
287 self.log.info("load_task_task: task={}".format(v))
288
289 try:
290 task = Task.from_json(v)
291 self.append_task(task)
292 except ValueError:
293 self.log.error("Failed to decode task: pool_spec={}, task={}".format(pool_spec, v))
294
295 if not it:
296 break
297
298 except StopIteration:
299 pass
300 except rados.ObjectNotFound:
301 # rbd_task DNE
302 pass
303
20effc67 304 def append_task(self, task: Task) -> None:
9f95a23c
TL
305 self.tasks_by_sequence[task.sequence] = task
306 self.tasks_by_id[task.task_id] = task
307
20effc67 308 def task_refs_match(self, task_refs: TaskRefsT, refs: TaskRefsT) -> bool:
9f95a23c
TL
309 if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
310 task_refs = task_refs.copy()
311 del task_refs[TASK_REF_IMAGE_ID]
312
313 self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
314 return task_refs == refs
315
20effc67 316 def find_task(self, refs: TaskRefsT) -> Optional[Task]:
9f95a23c
TL
317 self.log.debug("find_task: refs={}".format(refs))
318
319 # search for dups and return the original
320 for task_id in reversed(sorted(self.tasks_by_id.keys())):
321 task = self.tasks_by_id[task_id]
322 if self.task_refs_match(task.refs, refs):
323 return task
324
325 # search for a completed task (message replay)
326 for task in reversed(self.completed_tasks):
327 if self.task_refs_match(task.refs, refs):
328 return task
20effc67
TL
329 else:
330 return None
9f95a23c 331
20effc67
TL
332 def add_task(self,
333 ioctx: rados.Ioctx,
334 message: str,
335 refs: TaskRefsT) -> str:
9f95a23c
TL
336 self.log.debug("add_task: message={}, refs={}".format(message, refs))
337
338 # ensure unique uuid across all pools
339 while True:
340 task_id = str(uuid.uuid4())
341 if task_id not in self.tasks_by_id:
342 break
343
344 self.sequence += 1
345 task = Task(self.sequence, task_id, message, refs)
346
347 # add the task to the rbd_task omap
348 task_json = task.to_json()
349 omap_keys = (task.sequence_key, )
350 omap_vals = (str.encode(task_json), )
20effc67
TL
351 self.log.info("adding task: %s %s",
352 omap_keys[0].decode(),
353 omap_vals[0].decode())
9f95a23c
TL
354
355 with rados.WriteOpCtx() as write_op:
356 ioctx.set_omap(write_op, omap_keys, omap_vals)
357 ioctx.operate_write_op(write_op, RBD_TASK_OID)
358 self.append_task(task)
359
360 self.condition.notify()
361 return task_json
362
20effc67 363 def remove_task(self,
39ae355f 364 ioctx: Optional[rados.Ioctx],
20effc67
TL
365 task: Task,
366 remove_in_memory: bool = True) -> None:
9f95a23c 367 self.log.info("remove_task: task={}".format(str(task)))
39ae355f
TL
368 if ioctx:
369 try:
370 with rados.WriteOpCtx() as write_op:
371 omap_keys = (task.sequence_key, )
372 ioctx.remove_omap_keys(write_op, omap_keys)
373 ioctx.operate_write_op(write_op, RBD_TASK_OID)
374 except rados.ObjectNotFound:
375 pass
9f95a23c
TL
376
377 if remove_in_memory:
378 try:
379 del self.tasks_by_id[task.task_id]
380 del self.tasks_by_sequence[task.sequence]
381
382 # keep a record of the last N tasks to help avoid command replay
383 # races
384 if not task.failed and not task.canceled:
385 self.log.debug("remove_task: moving to completed tasks")
386 self.completed_tasks.append(task)
387 self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
388
389 except KeyError:
390 pass
391
20effc67 392 def execute_task(self, sequence: int) -> None:
9f95a23c
TL
393 task = self.tasks_by_sequence[sequence]
394 self.log.info("execute_task: task={}".format(str(task)))
395
396 pool_valid = False
397 try:
398 with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
399 task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
400 pool_valid = True
401
402 action = task.refs[TASK_REF_ACTION]
403 execute_fn = {TASK_REF_ACTION_FLATTEN: self.execute_flatten,
404 TASK_REF_ACTION_REMOVE: self.execute_remove,
405 TASK_REF_ACTION_TRASH_REMOVE: self.execute_trash_remove,
406 TASK_REF_ACTION_MIGRATION_EXECUTE: self.execute_migration_execute,
407 TASK_REF_ACTION_MIGRATION_COMMIT: self.execute_migration_commit,
408 TASK_REF_ACTION_MIGRATION_ABORT: self.execute_migration_abort
409 }.get(action)
410 if not execute_fn:
411 self.log.error("Invalid task action: {}".format(action))
412 else:
413 task.in_progress = True
414 self.in_progress_task = task
9f95a23c
TL
415
416 self.lock.release()
417 try:
418 execute_fn(ioctx, task)
419
420 except rbd.OperationCanceled:
421 self.log.info("Operation canceled: task={}".format(
422 str(task)))
423
424 finally:
425 self.lock.acquire()
426
427 task.in_progress = False
428 self.in_progress_task = None
429
430 self.complete_progress(task)
431 self.remove_task(ioctx, task)
432
433 except rados.ObjectNotFound as e:
434 self.log.error("execute_task: {}".format(e))
435 if pool_valid:
adb31ebb 436 task.retry_message = "{}".format(e)
9f95a23c
TL
437 self.update_progress(task, 0)
438 else:
39ae355f 439 # pool DNE -- remove in-memory task
9f95a23c 440 self.complete_progress(task)
39ae355f 441 self.remove_task(None, task)
9f95a23c 442
1e59de90
TL
443 except (rados.ConnectionShutdown, rbd.ConnectionShutdown):
444 raise
445
9f95a23c
TL
446 except (rados.Error, rbd.Error) as e:
447 self.log.error("execute_task: {}".format(e))
adb31ebb 448 task.retry_message = "{}".format(e)
9f95a23c
TL
449 self.update_progress(task, 0)
450
451 finally:
452 task.in_progress = False
adb31ebb
TL
453 task.retry_attempts += 1
454 task.retry_time = datetime.now() + min(
455 TASK_RETRY_INTERVAL * task.retry_attempts,
456 TASK_MAX_RETRY_INTERVAL)
9f95a23c 457
20effc67 458 def progress_callback(self, task: Task, current: int, total: int) -> int:
9f95a23c
TL
459 progress = float(current) / float(total)
460 self.log.debug("progress_callback: task={}, progress={}".format(
461 str(task), progress))
462
463 # avoid deadlocking when a new command comes in during a progress callback
464 if not self.lock.acquire(False):
465 return 0
466
467 try:
468 if not self.in_progress_task or self.in_progress_task.canceled:
469 return -rbd.ECANCELED
470 self.in_progress_task.progress = progress
471 finally:
472 self.lock.release()
473
adb31ebb
TL
474 if not task.progress_posted:
475 # delayed creation of progress event until first callback
476 self.post_progress(task, progress)
477 else:
478 self.throttled_update_progress(task, progress)
479
9f95a23c
TL
480 return 0
481
20effc67 482 def execute_flatten(self, ioctx: rados.Ioctx, task: Task) -> None:
9f95a23c
TL
483 self.log.info("execute_flatten: task={}".format(str(task)))
484
485 try:
486 with rbd.Image(ioctx, task.refs[TASK_REF_IMAGE_NAME]) as image:
487 image.flatten(on_progress=partial(self.progress_callback, task))
488 except rbd.InvalidArgument:
489 task.fail("Image does not have parent")
490 self.log.info("{}: task={}".format(task.failure_message, str(task)))
491 except rbd.ImageNotFound:
492 task.fail("Image does not exist")
493 self.log.info("{}: task={}".format(task.failure_message, str(task)))
494
20effc67 495 def execute_remove(self, ioctx: rados.Ioctx, task: Task) -> None:
9f95a23c
TL
496 self.log.info("execute_remove: task={}".format(str(task)))
497
498 try:
499 rbd.RBD().remove(ioctx, task.refs[TASK_REF_IMAGE_NAME],
500 on_progress=partial(self.progress_callback, task))
501 except rbd.ImageNotFound:
502 task.fail("Image does not exist")
503 self.log.info("{}: task={}".format(task.failure_message, str(task)))
504
20effc67 505 def execute_trash_remove(self, ioctx: rados.Ioctx, task: Task) -> None:
9f95a23c
TL
506 self.log.info("execute_trash_remove: task={}".format(str(task)))
507
508 try:
509 rbd.RBD().trash_remove(ioctx, task.refs[TASK_REF_IMAGE_ID],
510 on_progress=partial(self.progress_callback, task))
511 except rbd.ImageNotFound:
512 task.fail("Image does not exist")
513 self.log.info("{}: task={}".format(task.failure_message, str(task)))
514
20effc67 515 def execute_migration_execute(self, ioctx: rados.Ioctx, task: Task) -> None:
9f95a23c
TL
516 self.log.info("execute_migration_execute: task={}".format(str(task)))
517
518 try:
519 rbd.RBD().migration_execute(ioctx, task.refs[TASK_REF_IMAGE_NAME],
520 on_progress=partial(self.progress_callback, task))
521 except rbd.ImageNotFound:
522 task.fail("Image does not exist")
523 self.log.info("{}: task={}".format(task.failure_message, str(task)))
524 except rbd.InvalidArgument:
525 task.fail("Image is not migrating")
526 self.log.info("{}: task={}".format(task.failure_message, str(task)))
527
20effc67 528 def execute_migration_commit(self, ioctx: rados.Ioctx, task: Task) -> None:
9f95a23c
TL
529 self.log.info("execute_migration_commit: task={}".format(str(task)))
530
531 try:
532 rbd.RBD().migration_commit(ioctx, task.refs[TASK_REF_IMAGE_NAME],
533 on_progress=partial(self.progress_callback, task))
534 except rbd.ImageNotFound:
535 task.fail("Image does not exist")
536 self.log.info("{}: task={}".format(task.failure_message, str(task)))
537 except rbd.InvalidArgument:
538 task.fail("Image is not migrating or migration not executed")
539 self.log.info("{}: task={}".format(task.failure_message, str(task)))
540
20effc67 541 def execute_migration_abort(self, ioctx: rados.Ioctx, task: Task) -> None:
9f95a23c
TL
542 self.log.info("execute_migration_abort: task={}".format(str(task)))
543
544 try:
545 rbd.RBD().migration_abort(ioctx, task.refs[TASK_REF_IMAGE_NAME],
546 on_progress=partial(self.progress_callback, task))
547 except rbd.ImageNotFound:
548 task.fail("Image does not exist")
549 self.log.info("{}: task={}".format(task.failure_message, str(task)))
550 except rbd.InvalidArgument:
551 task.fail("Image is not migrating")
552 self.log.info("{}: task={}".format(task.failure_message, str(task)))
553
20effc67 554 def complete_progress(self, task: Task) -> None:
adb31ebb
TL
555 if not task.progress_posted:
556 # ensure progress event exists before we complete/fail it
557 self.post_progress(task, 0)
558
9f95a23c
TL
559 self.log.debug("complete_progress: task={}".format(str(task)))
560 try:
561 if task.failed:
562 self.module.remote("progress", "fail", task.task_id,
563 task.failure_message)
564 else:
565 self.module.remote("progress", "complete", task.task_id)
566 except ImportError:
567 # progress module is disabled
568 pass
569
20effc67 570 def _update_progress(self, task: Task, progress: float) -> None:
9f95a23c
TL
571 self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
572 try:
573 refs = {"origin": "rbd_support"}
574 refs.update(task.refs)
575
576 self.module.remote("progress", "update", task.task_id,
577 task.message, progress, refs)
578 except ImportError:
579 # progress module is disabled
580 pass
581
20effc67 582 def post_progress(self, task: Task, progress: float) -> None:
adb31ebb
TL
583 self._update_progress(task, progress)
584 task.progress_posted = True
585
20effc67 586 def update_progress(self, task: Task, progress: float) -> None:
adb31ebb
TL
587 if task.progress_posted:
588 self._update_progress(task, progress)
589
9f95a23c 590 @Throttle(timedelta(seconds=1))
20effc67 591 def throttled_update_progress(self, task: Task, progress: float) -> None:
9f95a23c
TL
592 self.update_progress(task, progress)
593
20effc67 594 def queue_flatten(self, image_spec: str) -> Tuple[int, str, str]:
9f95a23c
TL
595 image_spec = self.extract_image_spec(image_spec)
596
597 authorize_request(self.module, image_spec[0], image_spec[1])
598 self.log.info("queue_flatten: {}".format(image_spec))
599
600 refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
601 TASK_REF_POOL_NAME: image_spec[0],
602 TASK_REF_POOL_NAMESPACE: image_spec[1],
603 TASK_REF_IMAGE_NAME: image_spec[2]}
604
20effc67 605 with self.open_ioctx(image_spec[:2]) as ioctx:
9f95a23c
TL
606 try:
607 with rbd.Image(ioctx, image_spec[2]) as image:
608 refs[TASK_REF_IMAGE_ID] = image.id()
609
610 try:
611 parent_image_id = image.parent_id()
612 except rbd.ImageNotFound:
613 parent_image_id = None
614
615 except rbd.ImageNotFound:
616 pass
617
618 task = self.find_task(refs)
619 if task:
620 return 0, task.to_json(), ''
621
622 if TASK_REF_IMAGE_ID not in refs:
623 raise rbd.ImageNotFound("Image {} does not exist".format(
624 self.format_image_spec(image_spec)), errno=errno.ENOENT)
625 if not parent_image_id:
626 raise rbd.ImageNotFound("Image {} does not have a parent".format(
627 self.format_image_spec(image_spec)), errno=errno.ENOENT)
628
629 return 0, self.add_task(ioctx,
630 "Flattening image {}".format(
631 self.format_image_spec(image_spec)),
632 refs), ""
633
20effc67 634 def queue_remove(self, image_spec: str) -> Tuple[int, str, str]:
9f95a23c
TL
635 image_spec = self.extract_image_spec(image_spec)
636
637 authorize_request(self.module, image_spec[0], image_spec[1])
638 self.log.info("queue_remove: {}".format(image_spec))
639
640 refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
641 TASK_REF_POOL_NAME: image_spec[0],
642 TASK_REF_POOL_NAMESPACE: image_spec[1],
643 TASK_REF_IMAGE_NAME: image_spec[2]}
644
20effc67 645 with self.open_ioctx(image_spec[:2]) as ioctx:
9f95a23c
TL
646 try:
647 with rbd.Image(ioctx, image_spec[2]) as image:
648 refs[TASK_REF_IMAGE_ID] = image.id()
649 snaps = list(image.list_snaps())
650
651 except rbd.ImageNotFound:
652 pass
653
654 task = self.find_task(refs)
655 if task:
656 return 0, task.to_json(), ''
657
658 if TASK_REF_IMAGE_ID not in refs:
659 raise rbd.ImageNotFound("Image {} does not exist".format(
660 self.format_image_spec(image_spec)), errno=errno.ENOENT)
661 if snaps:
662 raise rbd.ImageBusy("Image {} has snapshots".format(
663 self.format_image_spec(image_spec)), errno=errno.EBUSY)
664
665 return 0, self.add_task(ioctx,
666 "Removing image {}".format(
667 self.format_image_spec(image_spec)),
668 refs), ''
669
20effc67 670 def queue_trash_remove(self, image_id_spec: str) -> Tuple[int, str, str]:
9f95a23c
TL
671 image_id_spec = self.extract_image_spec(image_id_spec)
672
673 authorize_request(self.module, image_id_spec[0], image_id_spec[1])
674 self.log.info("queue_trash_remove: {}".format(image_id_spec))
675
676 refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
677 TASK_REF_POOL_NAME: image_id_spec[0],
678 TASK_REF_POOL_NAMESPACE: image_id_spec[1],
679 TASK_REF_IMAGE_ID: image_id_spec[2]}
680 task = self.find_task(refs)
681 if task:
682 return 0, task.to_json(), ''
683
684 # verify that image exists in trash
20effc67 685 with self.open_ioctx(image_id_spec[:2]) as ioctx:
9f95a23c
TL
686 rbd.RBD().trash_get(ioctx, image_id_spec[2])
687
688 return 0, self.add_task(ioctx,
689 "Removing image {} from trash".format(
690 self.format_image_spec(image_id_spec)),
691 refs), ''
692
20effc67
TL
693 def get_migration_status(self,
694 ioctx: rados.Ioctx,
695 image_spec: ImageSpecT) -> Optional[MigrationStatusT]:
9f95a23c
TL
696 try:
697 return rbd.RBD().migration_status(ioctx, image_spec[2])
698 except (rbd.InvalidArgument, rbd.ImageNotFound):
699 return None
700
20effc67
TL
701 def validate_image_migrating(self,
702 image_spec: ImageSpecT,
703 migration_status: Optional[MigrationStatusT]) -> None:
9f95a23c
TL
704 if not migration_status:
705 raise rbd.InvalidArgument("Image {} is not migrating".format(
706 self.format_image_spec(image_spec)), errno=errno.EINVAL)
707
20effc67 708 def resolve_pool_name(self, pool_id: str) -> str:
9f95a23c
TL
709 osd_map = self.module.get('osd_map')
710 for pool in osd_map['pools']:
711 if pool['pool'] == pool_id:
712 return pool['pool_name']
713 return '<unknown>'
714
20effc67 715 def queue_migration_execute(self, image_spec: str) -> Tuple[int, str, str]:
9f95a23c
TL
716 image_spec = self.extract_image_spec(image_spec)
717
718 authorize_request(self.module, image_spec[0], image_spec[1])
719 self.log.info("queue_migration_execute: {}".format(image_spec))
720
721 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
722 TASK_REF_POOL_NAME: image_spec[0],
723 TASK_REF_POOL_NAMESPACE: image_spec[1],
724 TASK_REF_IMAGE_NAME: image_spec[2]}
725
20effc67 726 with self.open_ioctx(image_spec[:2]) as ioctx:
9f95a23c
TL
727 status = self.get_migration_status(ioctx, image_spec)
728 if status:
729 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
730
731 task = self.find_task(refs)
732 if task:
733 return 0, task.to_json(), ''
734
735 self.validate_image_migrating(image_spec, status)
20effc67 736 assert status
9f95a23c
TL
737 if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
738 rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
739 raise rbd.InvalidArgument("Image {} is not in ready state".format(
740 self.format_image_spec(image_spec)), errno=errno.EINVAL)
741
742 source_pool = self.resolve_pool_name(status['source_pool_id'])
743 dest_pool = self.resolve_pool_name(status['dest_pool_id'])
744 return 0, self.add_task(ioctx,
745 "Migrating image {} to {}".format(
746 self.format_image_spec((source_pool,
747 status['source_pool_namespace'],
748 status['source_image_name'])),
749 self.format_image_spec((dest_pool,
750 status['dest_pool_namespace'],
751 status['dest_image_name']))),
752 refs), ''
753
20effc67 754 def queue_migration_commit(self, image_spec: str) -> Tuple[int, str, str]:
9f95a23c
TL
755 image_spec = self.extract_image_spec(image_spec)
756
757 authorize_request(self.module, image_spec[0], image_spec[1])
758 self.log.info("queue_migration_commit: {}".format(image_spec))
759
760 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
761 TASK_REF_POOL_NAME: image_spec[0],
762 TASK_REF_POOL_NAMESPACE: image_spec[1],
763 TASK_REF_IMAGE_NAME: image_spec[2]}
764
20effc67 765 with self.open_ioctx(image_spec[:2]) as ioctx:
9f95a23c
TL
766 status = self.get_migration_status(ioctx, image_spec)
767 if status:
768 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
769
770 task = self.find_task(refs)
771 if task:
772 return 0, task.to_json(), ''
773
774 self.validate_image_migrating(image_spec, status)
20effc67 775 assert status
9f95a23c
TL
776 if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
777 raise rbd.InvalidArgument("Image {} has not completed migration".format(
778 self.format_image_spec(image_spec)), errno=errno.EINVAL)
779
780 return 0, self.add_task(ioctx,
781 "Committing image migration for {}".format(
782 self.format_image_spec(image_spec)),
783 refs), ''
784
20effc67 785 def queue_migration_abort(self, image_spec: str) -> Tuple[int, str, str]:
9f95a23c
TL
786 image_spec = self.extract_image_spec(image_spec)
787
788 authorize_request(self.module, image_spec[0], image_spec[1])
789 self.log.info("queue_migration_abort: {}".format(image_spec))
790
791 refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
792 TASK_REF_POOL_NAME: image_spec[0],
793 TASK_REF_POOL_NAMESPACE: image_spec[1],
794 TASK_REF_IMAGE_NAME: image_spec[2]}
795
20effc67 796 with self.open_ioctx(image_spec[:2]) as ioctx:
9f95a23c
TL
797 status = self.get_migration_status(ioctx, image_spec)
798 if status:
799 refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
800
801 task = self.find_task(refs)
802 if task:
803 return 0, task.to_json(), ''
804
805 self.validate_image_migrating(image_spec, status)
806 return 0, self.add_task(ioctx,
807 "Aborting image migration for {}".format(
808 self.format_image_spec(image_spec)),
809 refs), ''
810
20effc67 811 def task_cancel(self, task_id: str) -> Tuple[int, str, str]:
9f95a23c
TL
812 self.log.info("task_cancel: {}".format(task_id))
813
814 task = self.tasks_by_id.get(task_id)
815 if not task or not is_authorized(self.module,
816 task.refs[TASK_REF_POOL_NAME],
817 task.refs[TASK_REF_POOL_NAMESPACE]):
818 return -errno.ENOENT, '', "No such task {}".format(task_id)
819
820 task.cancel()
821
822 remove_in_memory = True
823 if self.in_progress_task and self.in_progress_task.task_id == task_id:
824 self.log.info("Attempting to cancel in-progress task: {}".format(str(self.in_progress_task)))
825 remove_in_memory = False
826
827 # complete any associated event in the progress module
828 self.complete_progress(task)
829
830 # remove from rbd_task omap
831 with self.open_ioctx((task.refs[TASK_REF_POOL_NAME],
832 task.refs[TASK_REF_POOL_NAMESPACE])) as ioctx:
833 self.remove_task(ioctx, task, remove_in_memory)
834
835 return 0, "", ""
836
20effc67 837 def task_list(self, task_id: Optional[str]) -> Tuple[int, str, str]:
9f95a23c
TL
838 self.log.info("task_list: {}".format(task_id))
839
840 if task_id:
841 task = self.tasks_by_id.get(task_id)
842 if not task or not is_authorized(self.module,
843 task.refs[TASK_REF_POOL_NAME],
844 task.refs[TASK_REF_POOL_NAMESPACE]):
845 return -errno.ENOENT, '', "No such task {}".format(task_id)
846
20effc67 847 return 0, json.dumps(task.to_dict(), indent=4, sort_keys=True), ""
9f95a23c 848 else:
20effc67 849 tasks = []
9f95a23c
TL
850 for sequence in sorted(self.tasks_by_sequence.keys()):
851 task = self.tasks_by_sequence[sequence]
852 if is_authorized(self.module,
853 task.refs[TASK_REF_POOL_NAME],
854 task.refs[TASK_REF_POOL_NAMESPACE]):
20effc67 855 tasks.append(task.to_dict())
9f95a23c 856
20effc67 857 return 0, json.dumps(tasks, indent=4, sort_keys=True), ""