]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/volumes/fs/async_job.py
Import ceph 15.2.8
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / async_job.py
index 3bdedb723b9ce4a0c7b60fd4663b31ae0ea048bc..fb7051f47c24282b07ec51f9149fde6f10243763 100644 (file)
@@ -27,6 +27,7 @@ class JobThread(threading.Thread):
         thread_id = threading.currentThread()
         assert isinstance(thread_id, JobThread)
         thread_name = thread_id.getName()
+        log.debug("thread [{0}] starting".format(thread_name))
 
         while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
             vol_job = None
@@ -34,6 +35,10 @@ class JobThread(threading.Thread):
                 # fetch next job to execute
                 with self.async_job.lock:
                     while True:
+                        if self.should_reconfigure_num_threads():
+                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+                            self.async_job.threads.remove(self)
+                            return
                         vol_job = self.async_job.get_job()
                         if vol_job:
                             break
@@ -62,6 +67,12 @@ class JobThread(threading.Thread):
                 time.sleep(1)
         log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
         self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+        with self.async_job.lock:
+            self.async_job.threads.remove(self)
+
+    def should_reconfigure_num_threads(self):
+        # reconfigure of max_concurrent_clones
+        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
 
     def cancel_job(self):
         self.cancel_event.set()
@@ -103,12 +114,28 @@ class AsyncJobs(object):
         # cv for job cancelation
         self.waiting = False
         self.cancel_cv = threading.Condition(self.lock)
+        self.nr_concurrent_jobs = nr_concurrent_jobs
 
         self.threads = []
         for i in range(nr_concurrent_jobs):
             self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
             self.threads[-1].start()
 
+    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+        """
+        reconfigure number of cloner threads
+        """
+        with self.lock:
+            self.nr_concurrent_jobs = nr_concurrent_jobs
+            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+            if len(self.threads) > nr_concurrent_jobs:
+                self.cv.notifyAll()
+            # Increase in concurrency
+            if len(self.threads) < nr_concurrent_jobs:
+                for i in range(len(self.threads), nr_concurrent_jobs):
+                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+                    self.threads[-1].start()
+
     def get_job(self):
         log.debug("processing {0} volume entries".format(len(self.q)))
         nr_vols = len(self.q)