]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | import sys |
2 | import time | |
3 | import logging | |
4 | import threading | |
5 | import traceback | |
6 | from collections import deque | |
f67539c2 | 7 | from mgr_util import lock_timeout_log |
92f5a8d4 TL |
8 | |
9 | from .exception import NotImplementedException | |
10 | ||
11 | log = logging.getLogger(__name__) | |
12 | ||
class JobThread(threading.Thread):
    """
    Worker thread owned by an AsyncJobs instance.

    Repeatedly fetches the next (volume, job) pair under the shared lock,
    executes it outside the lock, and unregisters it when done. Bails out
    after MAX_RETRIES_ON_EXCEPTION consecutive unexpected failures, or
    terminates early when the thread pool is being shrunk.
    """

    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        """
        Main worker loop: fetch a job under `async_job.lock`, execute it
        outside the lock, always unregister it in the `finally` clause.
        """
        retries = 0
        # current_thread()/Thread.name replace the camelCase aliases
        # currentThread()/getName(), deprecated since Python 3.10.
        thread_id = threading.current_thread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.name
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # unless the jobs fetching and execution routines are not implemented
                # retry till we hit cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)

    def should_reconfigure_num_threads(self):
        # reconfigure of max_concurrent_clones: terminate surplus threads
        # when the pool is larger than the configured concurrency.
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        # request cancelation; the running job polls should_cancel()
        self.cancel_event.set()

    def should_cancel(self):
        # True when cancelation has been requested for this thread's job
        return self.cancel_event.is_set()

    def reset_cancel(self):
        # clear a pending cancelation request (job finished or was canceled)
        self.cancel_event.clear()
86 | ||
class AsyncJobs(threading.Thread):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by number of concurrent jobs).

    Usability is simple: subclass this and implement the following:

      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation is requested, after which the job
    should return as soon as possible. Cancelation check is provided
    via `should_cancel()` lambda passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation
        self.waiting = False
        self.stopping = threading.Event()
        self.cancel_cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs
        self.name_pfx = name_pfx

        self.threads = []
        for i in range(self.nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
            self.threads[-1].start()
        self.start()

    def run(self):
        """
        Tick thread: periodically reconcile the worker-thread count with
        `nr_concurrent_jobs` until shutdown() is requested.
        """
        log.debug("tick thread {} starting".format(self.name))
        with lock_timeout_log(self.lock):
            while not self.stopping.is_set():
                c = len(self.threads)
                if c > self.nr_concurrent_jobs:
                    # Decrease concurrency: notify threads which are waiting for a job to terminate.
                    log.debug("waking threads to terminate due to job reduction")
                    # notify_all() replaces the notifyAll() alias deprecated since Python 3.10
                    self.cv.notify_all()
                elif c < self.nr_concurrent_jobs:
                    # Increase concurrency: create more threads.
                    log.debug("creating new threads to job increase")
                    for i in range(c, self.nr_concurrent_jobs):
                        self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
                        self.threads[-1].start()
                self.cv.wait(timeout=5)

    def shutdown(self):
        """stop the tick thread, cancel all executing jobs and wake up waiters."""
        self.stopping.set()
        self.cancel_all_jobs()
        with self.lock:
            self.cv.notify_all()
        self.join()

    def reconfigure_max_async_threads(self, nr_concurrent_jobs):
        """
        reconfigure number of cloner threads
        """
        self.nr_concurrent_jobs = nr_concurrent_jobs

    def get_job(self):
        """
        return the next (volname, job) tuple to execute, or None if there is
        nothing to do. called under `self.lock`.
        """
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other threads pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization when for a given volume there are no more
            # jobs and no jobs are in progress. in such cases we remove the volume
            # from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are jobs for a volume, the volume gets added again
            # to the tracking list and the jobs get kickstarted.
            # note that, we do not iterate the volume list fully if there is a
            # jobs to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        """track a job as executing on `thread_id`. called under `self.lock`."""
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        """drop a finished job from tracking and wake cancellation waiters."""
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            # use the module logger (was logging.info(), which logs via the
            # root logger and bypasses this module's logging configuration)
            log.info("waking up cancellation waiters")
            self.cancel_cv.notify_all()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with lock_timeout_log(self.lock):
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are
        executing for the given volume. this would wait until all jobs
        get interrupted and finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel a executing job for a given volume. return True if canceled, False
        otherwise (volume/job not found).
        """
        canceled = False
        log.info("canceling job {0} for volume {1}".format(job, volname))
        try:
            # NOTE: if volname is missing from self.jobs the third test raises
            # KeyError, which the handler below turns into "not found" (False).
            if volname not in self.q and volname not in self.jobs and job not in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        """cancel a single executing job for a volume. returns True if canceled."""
        with lock_timeout_log(self.lock):
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with lock_timeout_log(self.lock):
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with lock_timeout_log(self.lock):
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as (retcode, job) tuple. if no
        jobs are available return (0, None) else return (0, job). on error return
        (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for long
        hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()
299 |