import sys
import time
import logging
import threading
import traceback
from collections import deque
from mgr_util import lock_timeout_log

from .exception import NotImplementedException

log = logging.getLogger(__name__)

class JobThread(threading.Thread):
    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        retries = 0
        thread_id = threading.currentThread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.getName()
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # the job fetching/execution routines raised an unexpected
                # exception -- retry until we hit the cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)

    def should_reconfigure_num_threads(self):
        # true when max_concurrent_clones has been reconfigured to a lower
        # value and there are now more threads than the configured limit
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        self.cancel_event.set()

    def should_cancel(self):
        return self.cancel_event.is_set()

    def reset_cancel(self):
        self.cancel_event.clear()

class AsyncJobs(threading.Thread):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by the number of concurrent jobs).

    Usage is simple: subclass this and implement the following:
      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke the base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation is requested, after which the job
    should return as soon as possible. The cancelation check is provided
    via the `should_cancel()` lambda passed to `execute_job()`.
    """
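
    # A minimal sketch (illustrative only, not part of this module) of what a
    # subclass looks like. `PurgeJobs`, `fetch_pending_purges()`, `job_pieces()`
    # and `process()` are hypothetical names, used only to show the expected
    # get_next_job()/execute_job() contract and the poll-based cancelation check:
    #
    #   class PurgeJobs(AsyncJobs):
    #       def __init__(self, volume_client):
    #           super().__init__(volume_client, "purgejob", 4)
    #
    #       def get_next_job(self, volname, running_jobs):
    #           # called under self.lock -- return (0, None) when nothing is
    #           # pending, (0, job) otherwise
    #           pending = [j for j in fetch_pending_purges(volname)
    #                      if j not in running_jobs]
    #           return (0, pending[0] if pending else None)
    #
    #       def execute_job(self, volname, job, should_cancel):
    #           # called outside self.lock -- poll should_cancel() periodically
    #           for piece in job_pieces(job):
    #               if should_cancel():
    #                   return
    #               process(piece)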

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation
        self.waiting = False
        self.stopping = threading.Event()
        self.cancel_cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs
        self.name_pfx = name_pfx

        self.threads = []
        for i in range(self.nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
            self.threads[-1].start()
        self.start()

    def run(self):
        log.debug("tick thread {} starting".format(self.name))
        with lock_timeout_log(self.lock):
            while not self.stopping.is_set():
                c = len(self.threads)
                if c > self.nr_concurrent_jobs:
                    # Decrease concurrency: notify threads which are waiting for a job to terminate.
                    log.debug("waking threads to terminate due to job reduction")
                    self.cv.notifyAll()
                elif c < self.nr_concurrent_jobs:
                    # Increase concurrency: create more threads.
                    log.debug("creating new threads due to job increase")
                    for i in range(c, self.nr_concurrent_jobs):
                        self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
                        self.threads[-1].start()
                self.cv.wait(timeout=5)

    def shutdown(self):
        self.stopping.set()
        self.cancel_all_jobs()
        with self.lock:
            self.cv.notifyAll()
        self.join()

    def reconfigure_max_async_threads(self, nr_concurrent_jobs):
        """
        reconfigure the number of cloner threads
        """
        self.nr_concurrent_jobs = nr_concurrent_jobs
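        # Illustrative note (an assumption about the caller, not part of this
        # module): a config-change handler would typically just do, e.g.,
        #
        #   async_jobs.reconfigure_max_async_threads(new_max_concurrent_clones)
        #
        # and the tick loop in run(), which wakes at least every 5 seconds via
        # cv.wait(timeout=5), grows or shrinks the worker pool on its next pass.
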
    def get_job(self):
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that other threads pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization for the case when a given volume has no
            # more jobs and no jobs are in progress. in such cases we remove
            # the volume from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if jobs show up for the volume again, it gets added back to the
            # tracking list and the jobs get kickstarted.
            # note that we do not iterate the volume list fully if there is a
            # job to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            log.info("waking up cancellation waiters")
            self.cancel_cv.notifyAll()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with lock_timeout_log(self.lock):
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notifyAll()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are executing
        for the given volume. this would wait until all jobs get interrupted
        and finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if canceled,
        False otherwise (volume/job not found).
        """
        canceled = False
        log.info("canceling job {0} for volume {1}".format(job, volname))
        try:
            if volname not in self.q and volname not in self.jobs and job not in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        with lock_timeout_log(self.lock):
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with lock_timeout_log(self.lock):
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with lock_timeout_log(self.lock):
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as a (retcode, job) tuple.
        if no jobs are available return (0, None), else return (0, job). on
        error return (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep
        for long hours and do all kinds of synchronous work. called outside
        `self.lock`.
        """
        raise NotImplementedException()
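

# Illustrative usage sketch (assumptions, not part of this module): `PurgeJobs`
# is the hypothetical subclass sketched near the AsyncJobs docstring, `vc` is
# the owning volume client instance, and `some_job` is a job identifier
# previously returned by get_next_job().
#
#   jobs = PurgeJobs(vc)             # starts worker threads and the tick thread
#   jobs.queue_job("vol_a")          # start tracking the volume and wake a worker
#   ok = jobs.cancel_job("vol_a", some_job)   # cancel one job; True if it was found
#   jobs.cancel_jobs("vol_a")        # cancel and wait for all of the volume's jobs
#   jobs.shutdown()                  # cancel everything and stop the tick thread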