import sys
import time
import logging
import threading
import traceback
from collections import deque
from mgr_util import lock_timeout_log, CephfsClient

from .exception import NotImplementedException

log = logging.getLogger(__name__)


class JobThread(threading.Thread):
    # this is deliberately not configurable, and there is no need for it
    # to be: if a thread keeps failing, we retry until it hits this many
    # consecutive exceptions, then bail out.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        retries = 0
        thread_id = threading.current_thread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.name
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # any error other than unimplemented fetch/execute routines
                # (re-raised above) is retried until we hit the cap.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)

    def should_reconfigure_num_threads(self):
        # true when max_concurrent_clones was reconfigured below the
        # current thread count: surplus threads should terminate
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        self.cancel_event.set()

    def should_cancel(self):
        return self.cancel_event.is_set()

    def reset_cancel(self):
        self.cancel_event.clear()


class AsyncJobs(threading.Thread):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have multiple `jobs`
    executing concurrently (capped by the configured number of concurrent
    jobs).

    Usage is simple: subclass this and implement the following:
      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke the base class constructor. (An
    illustrative subclass sketch is provided at the bottom of this module.)

    Job cancellation applies to a volume as a whole, i.e., all executing
    jobs for a volume are cancelled. Cancellation is poll based -- jobs
    need to periodically check if cancellation is requested, after which
    the job should return as soon as possible. The cancellation check is
    provided via the `should_cancel()` lambda passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancellation
        self.waiting = False
        self.stopping = threading.Event()
        self.cancel_cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs
        self.name_pfx = name_pfx
        # each async job group uses its own libcephfs connection (pool)
        self.fs_client = CephfsClient(self.vc.mgr)

        self.threads = []
        for i in range(self.nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
            self.threads[-1].start()
        self.start()

    def run(self):
        log.debug("tick thread {} starting".format(self.name))
        with lock_timeout_log(self.lock):
            while not self.stopping.is_set():
                c = len(self.threads)
                if c > self.nr_concurrent_jobs:
                    # Decrease concurrency: wake threads that are waiting
                    # for a job so that surplus ones terminate.
                    log.debug("waking threads to terminate due to job reduction")
                    self.cv.notify_all()
                elif c < self.nr_concurrent_jobs:
                    # Increase concurrency: create more threads.
                    log.debug("creating new threads due to job increase")
                    for i in range(c, self.nr_concurrent_jobs):
                        self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
                        self.threads[-1].start()
                self.cv.wait(timeout=5)

    def shutdown(self):
        self.stopping.set()
        self.cancel_all_jobs()
        with self.lock:
            self.cv.notify_all()
        self.join()

    def reconfigure_max_async_threads(self, nr_concurrent_jobs):
        """
        reconfigure the number of worker (cloner) threads
        """
        self.nr_concurrent_jobs = nr_concurrent_jobs

    def get_job(self):
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # rotate now so that other threads pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization: when a given volume has no more jobs
            # and none are in progress, remove the volume from the tracking
            # list so as to:
            #
            #   a. not query the filesystem for jobs over and over again
            #   b. keep the filesystem connection idle so that it can be
            #      freed from the connection pool
            #
            # if jobs show up for a volume again, the volume gets re-added
            # to the tracking list and the jobs get kickstarted.
            # note that we do not iterate the volume list fully if there is
            # a job to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            log.info("waking up cancellation waiters")
            self.cancel_cv.notify_all()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with lock_timeout_log(self.lock):
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are executing
        for the given volume. this waits until all jobs get interrupted and
        finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operations and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if
        cancelled, False otherwise (volume/job not found).
        """
        cancelled = False
        log.info("cancelling job {0} for volume {1}".format(job, volname))
        try:
            vol_jobs = [j[0] for j in self.jobs.get(volname, [])]
            if volname not in self.q and job not in vol_jobs:
                return cancelled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    cancelled = True
                    break
        except (KeyError, ValueError):
            pass
        return cancelled

    def cancel_job(self, volname, job):
        with lock_timeout_log(self.lock):
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with lock_timeout_log(self.lock):
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with lock_timeout_log(self.lock):
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as a (retcode, job)
        tuple. if no jobs are available return (0, None), else return
        (0, job). on error return (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations,
        sleep for long hours and do all kinds of synchronous work. called
        outside `self.lock`.
        """
        raise NotImplementedException()
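

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): a minimal AsyncJobs
# subclass wiring up the get_next_job()/execute_job() contract and the
# poll-based cancellation described in the class docstring. `DummyJobs`, its
# in-memory `pending` map and `add_job()` are hypothetical stand-ins -- real
# subclasses fetch their jobs from cephfs metadata instead.
# ---------------------------------------------------------------------------
class DummyJobs(AsyncJobs):
    def __init__(self, volume_client):
        # hypothetical in-memory queue: volname -> deque of opaque job tokens
        self.pending = {}
        super().__init__(volume_client, "dummy", nr_concurrent_jobs=2)

    def add_job(self, volname, job):
        with self.lock:
            self.pending.setdefault(volname, deque()).append(job)
        # wake the workers; queue_job() takes self.lock itself, hence it is
        # called outside the `with` block above (the lock is not reentrant)
        self.queue_job(volname)

    def get_next_job(self, volname, running_jobs):
        # called under self.lock -- hand out each queued token exactly once
        q = self.pending.get(volname)
        if q:
            return 0, q.popleft()
        return 0, None

    def execute_job(self, volname, job, should_cancel):
        # called outside self.lock -- poll should_cancel() between work
        # steps and return early when cancellation is requested
        for _ in range(10):
            if should_cancel():
                break
            time.sleep(1)  # stand-in for real synchronous work

# Typical driving sequence (hypothetical):
#   jobs = DummyJobs(volume_client)
#   jobs.add_job("vol1", "job-token-1")
#   jobs.cancel_jobs("vol1")  # blocks until the job sees should_cancel()
#   jobs.shutdown()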