thread_id = threading.currentThread()
assert isinstance(thread_id, JobThread)
thread_name = thread_id.getName()
+ log.debug("thread [{0}] starting".format(thread_name))
while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
vol_job = None
# fetch next job to execute
with self.async_job.lock:
while True:
+ if self.should_reconfigure_num_threads():
+ log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+ self.async_job.threads.remove(self)
+ return
vol_job = self.async_job.get_job()
if vol_job:
break
time.sleep(1)
log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+ with self.async_job.lock:
+ self.async_job.threads.remove(self)
+
+ def should_reconfigure_num_threads(self):
+ # reconfigure of max_concurrent_clones
+ return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
def cancel_job(self):
self.cancel_event.set()
# cv for job cancelation
self.waiting = False
self.cancel_cv = threading.Condition(self.lock)
+ self.nr_concurrent_jobs = nr_concurrent_jobs
self.threads = []
for i in range(nr_concurrent_jobs):
self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
self.threads[-1].start()
+ def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+ """
+ reconfigure number of cloner threads
+ """
+ with self.lock:
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+ # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+ if len(self.threads) > nr_concurrent_jobs:
+ self.cv.notifyAll()
+ # Increase in concurrency
+ if len(self.threads) < nr_concurrent_jobs:
+ for i in range(len(self.threads), nr_concurrent_jobs):
+ self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+ self.threads[-1].start()
+
def get_job(self):
log.debug("processing {0} volume entries".format(len(self.q)))
nr_vols = len(self.q)