]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/volumes/fs/purge_queue.py
import ceph 14.2.5
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / purge_queue.py
index 8a7429aa6497dcdf19e141665930f27a7d692c61..a76065cbab953da9f724c6ce538faf2df547f170 100644 (file)
@@ -1,3 +1,4 @@
+import sys
 import time
 import logging
 import threading
@@ -10,19 +11,39 @@ class PurgeQueueBase(object):
     """
     Base class for implementing purge queue strategies.
     """
+
+    # this is "not" configurable and there is no need for it to be
+    # configurable. if a purge thread encounters an exception, we
+    # retry, till it hits this many consecutive exceptions after
+    # which a warning is sent to `ceph status`.
+    MAX_RETRIES_ON_EXCEPTION = 10
+
     class PurgeThread(threading.Thread):
-        def __init__(self, name, purge_fn):
+        def __init__(self, volume_client, name, purge_fn):
+            self.vc = volume_client
             self.purge_fn = purge_fn
             # event object to cancel ongoing purge
             self.cancel_event = threading.Event()
             threading.Thread.__init__(self, name=name)
 
         def run(self):
-            try:
-                self.purge_fn()
-            except Exception as e:
-                trace = "".join(traceback.format_exception(None, e, e.__traceback__))
-                log.error("purge queue thread encountered fatal error:\n"+trace)
+            retries = 0
+            thread_name = threading.currentThread().getName()
+            while retries < PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION:
+                try:
+                    self.purge_fn()
+                    retries = 0
+                except Exception:
+                    retries += 1
+                    log.warning("purge thread [{0}] encountered fatal error: (attempt#" \
+                                " {1}/{2})".format(thread_name, retries,
+                                                   PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION))
+                    exc_type, exc_value, exc_traceback = sys.exc_info()
+                    log.warning("traceback: {0}".format("".join(
+                        traceback.format_exception(exc_type, exc_value, exc_traceback))))
+                    time.sleep(1)
+            log.error("purge thread [{0}] reached exception limit, bailing out...".format(thread_name))
+            self.vc.cluster_log("purge thread {0} bailing out due to exception".format(thread_name))
 
         def cancel_job(self):
             self.cancel_event.set()
@@ -135,7 +156,7 @@ class ThreadPoolPurgeQueueMixin(PurgeQueueBase):
         self.threads = []
         for i in range(tp_size):
             self.threads.append(
-                PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
+                PurgeQueueBase.PurgeThread(volume_client, name="purgejob.{}".format(i), purge_fn=self.run))
             self.threads[-1].start()
 
     def pick_purge_dir_from_volume(self):