]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | import sys |
2 | import time | |
3 | import logging | |
4 | import threading | |
5 | import traceback | |
6 | from collections import deque | |
7 | ||
8 | from .exception import NotImplementedException | |
9 | ||
# module-level logger used by the job thread and job queue classes in this file
log = logging.getLogger(__name__)
11 | ||
class JobThread(threading.Thread):
    """
    Worker thread owned by an `AsyncJobs` instance.

    The thread loops fetching (volname, job) tuples from its owner and
    executing them. Each thread carries its own cancel event, which the owner
    sets (via `cancel_job()`) to ask the job currently running on this thread
    to stop; the job polls it through the `should_cancel` callable passed to
    `execute_job()`.
    """

    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        """
        :param async_job: owning `AsyncJobs` instance; provides the lock, the
                          condition variable and the job fetch/execute hooks
        :param volume_client: object whose `cluster_log()` is called when this
                              thread gives up after too many errors
        :param name: thread name
        """
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        """
        Fetch and execute jobs until MAX_RETRIES_ON_EXCEPTION consecutive
        attempts have failed.

        NotImplementedException (job hooks not implemented by the subclass)
        is re-raised out of run(). Any other exception is logged and counts
        as one failed attempt; a successfully executed job resets the counter.
        """
        retries = 0
        # current_thread() / .name replace the camelCase aliases
        # currentThread() / getName(), deprecated since Python 3.10.
        thread_id = threading.current_thread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.name

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute: sleep on the condition variable
                # until get_job() hands out a (volname, job) tuple, then
                # register it while still holding the lock.
                with self.async_job.lock:
                    while True:
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1],
                                           should_cancel=thread_id.should_cancel)
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # unless the jobs fetching and execution routines are not implemented
                # retry till we hit cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                # pause between iterations; this also throttles retries when
                # fetching a job fails before vol_job is set.
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))

    def cancel_job(self):
        """Request cancellation of the job currently running on this thread."""
        self.cancel_event.set()

    def should_cancel(self):
        """Return True if cancellation has been requested and not yet reset."""
        return self.cancel_event.is_set()

    def reset_cancel(self):
        """Clear a pending cancellation request."""
        self.cancel_event.clear()
74 | ||
class AsyncJobs(object):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by number of concurrent jobs).

    Usability is simple: subclass this and implement the following:
      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke base class constructor.

    Jobs can be canceled for a volume as a whole (`cancel_jobs()`), one at a
    time (`cancel_job()`), or for all volumes (`cancel_all_jobs()`).
    Cancelation is poll based -- jobs need to periodically check if
    cancelation is requested, after which the job should return as soon as
    possible. Cancelation check is provided via the `should_cancel` callable
    passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        """
        :param volume_client: handed to every worker thread
        :param name_pfx: worker threads are named "<name_pfx>.<index>"
        :param nr_concurrent_jobs: number of worker threads to start
        """
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => list of (job, JobThread) tuples currently executing
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation (shares self.lock with self.cv)
        self.waiting = False
        self.cancel_cv = threading.Condition(self.lock)

        self.threads = []
        for i in range(nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
            self.threads[-1].start()

    def get_job(self):
        """
        Return the next (volname, job) tuple to execute, or None if no tracked
        volume currently has a job to hand out. Called with `self.lock` held.

        Each queued volume is visited at most once per call. Volumes for which
        `get_next_job()` reports (0, None) while nothing is running are
        dropped from tracking.
        """
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other thread pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization when for a given volume there are no more
            # jobs and no jobs are in progress. in such cases we remove the volume
            # from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are jobs for a volume, the volume gets added again
            # to the tracking list and the jobs get kickstarted.
            # note that, we do not iterate the volume list fully if there is a
            # jobs to process (that will take place eventually).
            if ret == 0 and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        """Record that `job` for `volname` is running on `thread_id`. Caller holds `self.lock`."""
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        """
        Drop the (job, thread_id) record for `volname` and clear the thread's
        cancel flag. If the job had been asked to cancel, wake up threads
        waiting in the cancel paths. Caller holds `self.lock`.
        """
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            # use the module logger like every other method here (was the
            # root logger via logging.info)
            log.info("waking up cancellation waiters")
            self.cancel_cv.notify_all()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with self.lock:
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are executing
        for the given volume. this waits until all jobs get interrupted and
        finish execution. called with `self.lock` held.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            # NOTE(review): cancel_cv.wait() releases self.lock, so a concurrent
            # queue_job() can re-queue `volname` and reset self.jobs[volname]
            # while we wait here; that interleaving does not look safe -- confirm.
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if canceled, False
        otherwise (volume/job not found). called with `self.lock` held.
        """
        log.info("canceling job {0} for volume {1}".format(job, volname))
        # self.jobs[volname] holds (job, thread) tuples; an unknown volume
        # simply has nothing to cancel.
        for entry in self.jobs.get(volname, []):
            if entry[0] == job:
                entry[1].cancel_job()
                # be safe against _cancel_jobs() running concurrently
                while entry in self.jobs.get(volname, []):
                    self.cancel_cv.wait()
                return True
        return False

    def cancel_job(self, volname, job):
        """
        cancel a single executing job for a given volume and wait for it to
        finish. return True if the job was found and canceled.
        """
        with self.lock:
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with self.lock:
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with self.lock:
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as (retcode, job) tuple. if no
        jobs are available return (0, None) else return (0, job). on error return
        (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for long
        hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()
252 |