git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/rbd_support/task.py
import 15.2.9
[ceph.git] / ceph / src / pybind / mgr / rbd_support / task.py
index c17ffa102f7d9757785c9380d933dc82e3a549ac..87d43eca15a6e97bd103d088df2a793614e6d18b 100644 (file)
@@ -21,7 +21,9 @@ TASK_SEQUENCE = "sequence"
 TASK_ID = "id"
 TASK_REFS = "refs"
 TASK_MESSAGE = "message"
+TASK_RETRY_ATTEMPTS = "retry_attempts"
 TASK_RETRY_TIME = "retry_time"
+TASK_RETRY_MESSAGE = "retry_message"
 TASK_IN_PROGRESS = "in_progress"
 TASK_PROGRESS = "progress"
 TASK_CANCELED = "canceled"
@@ -47,6 +49,7 @@ VALID_TASK_ACTIONS = [TASK_REF_ACTION_FLATTEN,
                       TASK_REF_ACTION_MIGRATION_ABORT]
 
 TASK_RETRY_INTERVAL = timedelta(seconds=30)
+TASK_MAX_RETRY_INTERVAL = timedelta(seconds=300)
 MAX_COMPLETED_TASKS = 50
 
 
@@ -71,11 +74,14 @@ class Task:
         self.task_id = task_id
         self.message = message
         self.refs = refs
+        self.retry_message = None
+        self.retry_attempts = 0
         self.retry_time = None
         self.in_progress = False
         self.progress = 0.0
         self.canceled = False
         self.failed = False
+        self.progress_posted = False
 
     def __str__(self):
         return self.to_json()
@@ -98,6 +104,10 @@ class Task:
              TASK_MESSAGE: self.message,
              TASK_REFS: self.refs
              }
+        if self.retry_message:
+            d[TASK_RETRY_MESSAGE] = self.retry_message
+        if self.retry_attempts:
+            d[TASK_RETRY_ATTEMPTS] = self.retry_attempts
         if self.retry_time:
             d[TASK_RETRY_TIME] = self.retry_time.isoformat()
         if self.in_progress:
@@ -364,7 +374,6 @@ class TaskHandler:
                 else:
                     task.in_progress = True
                     self.in_progress_task = task
-                    self.update_progress(task, 0)
 
                     self.lock.release()
                     try:
@@ -386,6 +395,7 @@ class TaskHandler:
         except rados.ObjectNotFound as e:
             self.log.error("execute_task: {}".format(e))
             if pool_valid:
+                task.retry_message = "{}".format(e)
                 self.update_progress(task, 0)
             else:
                 # pool DNE -- remove the task
@@ -394,11 +404,15 @@ class TaskHandler:
 
         except (rados.Error, rbd.Error) as e:
             self.log.error("execute_task: {}".format(e))
+            task.retry_message = "{}".format(e)
             self.update_progress(task, 0)
 
         finally:
             task.in_progress = False
-            task.retry_time = datetime.now() + TASK_RETRY_INTERVAL
+            task.retry_attempts += 1
+            task.retry_time = datetime.now() + min(
+                TASK_RETRY_INTERVAL * task.retry_attempts,
+                TASK_MAX_RETRY_INTERVAL)
 
     def progress_callback(self, task, current, total):
         progress = float(current) / float(total)
@@ -416,7 +430,12 @@ class TaskHandler:
         finally:
             self.lock.release()
 
-        self.throttled_update_progress(task, progress)
+        if not task.progress_posted:
+            # delayed creation of progress event until first callback
+            self.post_progress(task, progress)
+        else:
+            self.throttled_update_progress(task, progress)
+
         return 0
 
     def execute_flatten(self, ioctx, task):
@@ -492,6 +511,10 @@ class TaskHandler:
             self.log.info("{}: task={}".format(task.failure_message, str(task)))
 
     def complete_progress(self, task):
+        if not task.progress_posted:
+            # ensure progress event exists before we complete/fail it
+            self.post_progress(task, 0)
+
         self.log.debug("complete_progress: task={}".format(str(task)))
         try:
             if task.failed:
@@ -503,7 +526,7 @@ class TaskHandler:
             # progress module is disabled
             pass
 
-    def update_progress(self, task, progress):
+    def _update_progress(self, task, progress):
         self.log.debug("update_progress: task={}, progress={}".format(str(task), progress))
         try:
             refs = {"origin": "rbd_support"}
@@ -515,6 +538,14 @@ class TaskHandler:
             # progress module is disabled
             pass
 
+    def post_progress(self, task, progress):  # publish unconditionally via _update_progress, then flag the task as posted
+        self._update_progress(task, progress)
+        task.progress_posted = True  # update_progress() only forwards updates once this flag is set
+
+    def update_progress(self, task, progress):  # forward to _update_progress only for tasks already marked progress_posted
+        if task.progress_posted:  # otherwise the update is dropped; post_progress() is what sets the flag
+            self._update_progress(task, progress)
+
     @Throttle(timedelta(seconds=1))
     def throttled_update_progress(self, task, progress):
         self.update_progress(task, progress)