+import sys
import time
import logging
import threading
"""
Base class for implementing purge queue strategies.
"""
+
+ # this is "not" configurable and there is no need for it to be
+ # configurable. if a purge thread encounters an exception, we
+ # retry, till it hits this many consecutive exceptions after
+ # which a warning is sent to `ceph status`.
+ MAX_RETRIES_ON_EXCEPTION = 10
+
class PurgeThread(threading.Thread):
- def __init__(self, name, purge_fn):
+ def __init__(self, volume_client, name, purge_fn):
+ self.vc = volume_client
self.purge_fn = purge_fn
# event object to cancel ongoing purge
self.cancel_event = threading.Event()
threading.Thread.__init__(self, name=name)
def run(self):
- try:
- self.purge_fn()
- except Exception as e:
- trace = "".join(traceback.format_exception(None, e, e.__traceback__))
- log.error("purge queue thread encountered fatal error:\n"+trace)
+ retries = 0
+ thread_name = threading.currentThread().getName()
+ while retries < PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION:
+ try:
+ self.purge_fn()
+ retries = 0
+ except Exception:
+ retries += 1
+ log.warning("purge thread [{0}] encountered fatal error: (attempt#" \
+ " {1}/{2})".format(thread_name, retries,
+ PurgeQueueBase.MAX_RETRIES_ON_EXCEPTION))
+ exc_type, exc_value, exc_traceback = sys.exc_info()
+ log.warning("traceback: {0}".format("".join(
+ traceback.format_exception(exc_type, exc_value, exc_traceback))))
+ time.sleep(1)
+ log.error("purge thread [{0}] reached exception limit, bailing out...".format(thread_name))
+ self.vc.cluster_log("purge thread {0} bailing out due to exception".format(thread_name))
def cancel_job(self):
self.cancel_event.set()
self.threads = []
for i in range(tp_size):
self.threads.append(
- PurgeQueueBase.PurgeThread(name="purgejob.{}".format(i), purge_fn=self.run))
+ PurgeQueueBase.PurgeThread(volume_client, name="purgejob.{}".format(i), purge_fn=self.run))
self.threads[-1].start()
def pick_purge_dir_from_volume(self):