TASK_ID = "id"
TASK_REFS = "refs"
TASK_MESSAGE = "message"
+TASK_RETRY_ATTEMPTS = "retry_attempts"
TASK_RETRY_TIME = "retry_time"
+TASK_RETRY_MESSAGE = "retry_message"
TASK_IN_PROGRESS = "in_progress"
TASK_PROGRESS = "progress"
TASK_CANCELED = "canceled"
TASK_REF_ACTION_MIGRATION_ABORT]
TASK_RETRY_INTERVAL = timedelta(seconds=30)
+TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
MAX_COMPLETED_TASKS = 50
self.task_id = task_id
self.message = message
self.refs = refs
+ self.retry_message = None
+ self.retry_attempts = 0
self.retry_time = None
self.in_progress = False
self.progress = 0.0
self.canceled = False
self.failed = False
+ self.progress_posted = False
def __str__(self):
return self.to_json()
TASK_MESSAGE: self.message,
TASK_REFS: self.refs
}
+ if self.retry_message:
+ d[TASK_RETRY_MESSAGE] = self.retry_message
+ if self.retry_attempts:
+ d[TASK_RETRY_ATTEMPTS] = self.retry_attempts
if self.retry_time:
d[TASK_RETRY_TIME] = self.retry_time.isoformat()
if self.in_progress:
else:
task.in_progress = True
self.in_progress_task = task
- self.update_progress(task, 0)
self.lock.release()
try:
except rados.ObjectNotFound as e:
self.log.error("execute_task: {}".format(e))
if pool_valid:
+ task.retry_message = "{}".format(e)
self.update_progress(task, 0)
else:
# pool DNE -- remove the task
except (rados.Error, rbd.Error) as e:
self.log.error("execute_task: {}".format(e))
+ task.retry_message = "{}".format(e)
self.update_progress(task, 0)
finally:
task.in_progress = False
- task.retry_time = datetime.now() + TASK_RETRY_INTERVAL
+ task.retry_attempts += 1
+ task.retry_time = datetime.now() + min(
+ TASK_RETRY_INTERVAL * task.retry_attempts,
+ TASK_MAX_RETRY_INTERVAL)
def progress_callback(self, task, current, total):
progress = float(current) / float(total)
finally:
self.lock.release()
- self.throttled_update_progress(task, progress)
+ if not task.progress_posted:
+ # delayed creation of progress event until first callback
+ self.post_progress(task, progress)
+ else:
+ self.throttled_update_progress(task, progress)
+
return 0
def execute_flatten(self, ioctx, task):
self.log.info("{}: task={}".format(task.failure_message, str(task)))
def complete_progress(self, task):
+ if not task.progress_posted:
+ # ensure progress event exists before we complete/fail it
+ self.post_progress(task, 0)
+
self.log.debug("complete_progress: task={}".format(str(task)))
try:
if task.failed:
# progress module is disabled
pass
- def update_progress(self, task, progress):
+ def _update_progress(self, task, progress):
self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
try:
refs = {"origin": "rbd_support"}
# progress module is disabled
pass
+ def post_progress(self, task, progress):
+ self._update_progress(task, progress)
+ task.progress_posted = True
+
+ def update_progress(self, task, progress):
+ if task.progress_posted:
+ self._update_progress(task, progress)
+
@Throttle(timedelta(seconds=1))
def throttled_update_progress(self, task, progress):
self.update_progress(task, progress)