# ceph/src/pybind/mgr/volumes/fs/async_job.py (ceph v15.2.8)
import sys
import time
import logging
import threading
import traceback
from collections import deque

from .exception import NotImplementedException

log = logging.getLogger(__name__)

class JobThread(threading.Thread):
    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        retries = 0
        thread_id = threading.currentThread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.getName()
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # unless the job fetching and execution routines are not
                # implemented (handled above), retry until we hit the cap.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)

    def should_reconfigure_num_threads(self):
        # true when max_concurrent_clones was reduced below the current
        # thread count -- excess threads should terminate.
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        self.cancel_event.set()

    def should_cancel(self):
        return self.cancel_event.is_set()

    def reset_cancel(self):
        self.cancel_event.clear()

class AsyncJobs(object):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have multiple `jobs`
    executing concurrently (capped by the number of concurrent jobs).

    Usage is simple: subclass this and implement the following:
    - get_next_job(volname, running_jobs)
    - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke the base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation has been requested, after which the
    job should return as soon as possible. The cancelation check is provided
    via the `should_cancel()` lambda passed to `execute_job()`.
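
    A minimal subclass looks roughly like this (an illustrative sketch only --
    `MyJobs` and its constructor arguments are hypothetical; a fuller example
    sketch appears at the end of this file):

        class MyJobs(AsyncJobs):
            def __init__(self, volume_client):
                super(MyJobs, self).__init__(volume_client, "my-jobs", nr_concurrent_jobs=4)

            def get_next_job(self, volname, running_jobs):
                # return (0, job) when there is work to do, (0, None) when idle
                return 0, None

            def execute_job(self, volname, job, should_cancel):
                # do the work, polling should_cancel() periodically
                pass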
103 """
104
105 def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
106 self.vc = volume_client
107 # queue of volumes for starting async jobs
108 self.q = deque() # type: deque
109 # volume => job tracking
110 self.jobs = {}
111 # lock, cv for kickstarting jobs
112 self.lock = threading.Lock()
113 self.cv = threading.Condition(self.lock)
114 # cv for job cancelation
115 self.waiting = False
116 self.cancel_cv = threading.Condition(self.lock)
117 self.nr_concurrent_jobs = nr_concurrent_jobs
118
119 self.threads = []
120 for i in range(nr_concurrent_jobs):
121 self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
122 self.threads[-1].start()
123
    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
        """
        reconfigure number of cloner threads
        """
        with self.lock:
            self.nr_concurrent_jobs = nr_concurrent_jobs
            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
            if len(self.threads) > nr_concurrent_jobs:
                self.cv.notifyAll()
            # Increase in concurrency
            if len(self.threads) < nr_concurrent_jobs:
                for i in range(len(self.threads), nr_concurrent_jobs):
                    self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
                    self.threads[-1].start()

    def get_job(self):
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # rotate now so that other threads can pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization: when a given volume has no more jobs
            # and no jobs are in progress, we remove the volume from the
            # tracking list so as to:
            #
            #   a. not query the filesystem for jobs over and over again
            #   b. keep the filesystem connection idle so that it can be freed
            #      from the connection pool
            #
            # if jobs show up again for a volume, the volume gets added back
            # to the tracking list and the jobs get kickstarted.
            # note that we do not iterate the volume list fully if there is a
            # job to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            log.info("waking up cancellation waiters")
            self.cancel_cv.notifyAll()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with self.lock:
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notifyAll()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are executing
        for the given volume. this waits until all jobs get interrupted and
        finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operations and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if canceled,
        False otherwise (volume/job not found).
        """
        canceled = False
        log.info("canceling job {0} for volume {1}".format(job, volname))
        try:
            if volname not in self.q and volname not in self.jobs and job not in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        with self.lock:
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with self.lock:
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with self.lock:
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as a (retcode, job) tuple. if
        no jobs are available return (0, None), else return (0, job). on error
        return (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for
        long hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()
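

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): a minimal AsyncJobs
# subclass backed by an in-memory queue, showing the contract documented in
# the class docstring above. `_ExampleJobs`, `add_job()` and the "work" done
# in execute_job() are hypothetical; real users (e.g. the clone machinery)
# fetch their jobs from the filesystem instead.
# ---------------------------------------------------------------------------
class _ExampleJobs(AsyncJobs):
    def __init__(self, volume_client, nr_concurrent_jobs=2):
        # hypothetical in-memory job source: volname -> deque of opaque payloads
        self.pending = {}
        super(_ExampleJobs, self).__init__(volume_client, "example", nr_concurrent_jobs)

    def add_job(self, volname, job):
        # remember the payload (under the same lock the workers use), then
        # wake the worker threads via queue_job()
        with self.lock:
            self.pending.setdefault(volname, deque()).append(job)
        self.queue_job(volname)

    def get_next_job(self, volname, running_jobs):
        # called under self.lock: hand out the next pending payload, or
        # (0, None) when there is nothing to do. `running_jobs` lists payloads
        # already executing, useful when a job source could return duplicates.
        pending = self.pending.get(volname, deque())
        if pending:
            return 0, pending.popleft()
        return 0, None

    def execute_job(self, volname, job, should_cancel):
        # called outside self.lock: do the (hypothetical) work in small steps,
        # polling should_cancel() so cancel_job()/cancel_jobs() can interrupt us
        for _ in range(10):
            if should_cancel():
                log.info("job {0}.{1} canceled".format(volname, job))
                return
            time.sleep(0.1)

# Driver-side usage would then look roughly like:
#
#   jobs = _ExampleJobs(volume_client, nr_concurrent_jobs=2)
#   jobs.add_job("vol1", "job-a")                         # picked up asynchronously by a worker
#   jobs.reconfigure_max_concurrent_clones("example", 4)  # grow/shrink the worker pool
#   jobs.cancel_jobs("vol1")                              # blocks until in-flight jobs stop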
279