return task_json
def remove_task(self,
- ioctx: rados.Ioctx,
+ ioctx: Optional[rados.Ioctx],
task: Task,
remove_in_memory: bool = True) -> None:
self.log.info("remove_task: task={}".format(str(task)))
- omap_keys = (task.sequence_key, )
- try:
- with rados.WriteOpCtx() as write_op:
- ioctx.remove_omap_keys(write_op, omap_keys)
- ioctx.operate_write_op(write_op, RBD_TASK_OID)
- except rados.ObjectNotFound:
- pass
+ if ioctx:
+ try:
+ with rados.WriteOpCtx() as write_op:
+ omap_keys = (task.sequence_key, )
+ ioctx.remove_omap_keys(write_op, omap_keys)
+ ioctx.operate_write_op(write_op, RBD_TASK_OID)
+ except rados.ObjectNotFound:
+ pass
if remove_in_memory:
try:
task.retry_message = "{}".format(e)
self.update_progress(task, 0)
else:
- # pool DNE -- remove the task
+ # pool DNE -- remove in-memory task
self.complete_progress(task)
- self.remove_task(ioctx, task)
+ self.remove_task(None, task)
except (rados.Error, rbd.Error) as e:
self.log.error("execute_task: {}".format(e))