import sys
import time
import logging
import threading
import traceback
from collections import deque
from mgr_util import lock_timeout_log, CephfsClient

from .exception import NotImplementedException

log = logging.getLogger(__name__)

class JobThread(threading.Thread):
    # this is deliberately not configurable, and there is no need for it
    # to be: if a thread encounters an exception, we retry until it hits
    # this many consecutive exceptions, then bail out.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        retries = 0
        thread_id = threading.current_thread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.name
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # unless the job fetching and execution routines are
                # unimplemented (handled above), retry until we hit the cap.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)

    def should_reconfigure_num_threads(self):
        # true when the thread pool exceeds the configured concurrency,
        # i.e., max_concurrent_clones was reconfigured to a lower value
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        self.cancel_event.set()

    def should_cancel(self):
        return self.cancel_event.is_set()

    def reset_cancel(self):
        self.cancel_event.clear()

class AsyncJobs(threading.Thread):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N `jobs`
    executing concurrently (capped by the number of concurrent jobs).

    Usage is simple: subclass this and implement the following:
    - get_next_job(volname, running_jobs)
    - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke the base class constructor. A minimal
    subclass sketch is provided at the end of this module.

    Job cancellation is for a volume as a whole, i.e., all executing jobs
    for a volume are cancelled. Cancellation is poll based -- jobs need to
    periodically check if cancellation has been requested, after which the
    job should return as soon as possible. The cancellation check is
    provided via the `should_cancel()` callable passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancellation
        self.waiting = False
        self.stopping = threading.Event()
        self.cancel_cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs
        self.name_pfx = name_pfx
        # each async job group uses its own libcephfs connection (pool)
        self.fs_client = CephfsClient(self.vc.mgr)

        self.threads = []
        for i in range(self.nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
            self.threads[-1].start()
        self.start()

    def run(self):
        log.debug("tick thread {} starting".format(self.name))
        with lock_timeout_log(self.lock):
            while not self.stopping.is_set():
                c = len(self.threads)
                if c > self.nr_concurrent_jobs:
                    # Decrease concurrency: notify threads waiting for a job so they can terminate.
                    log.debug("waking threads to terminate due to job reduction")
                    self.cv.notify_all()
                elif c < self.nr_concurrent_jobs:
                    # Increase concurrency: create more threads.
                    log.debug("creating new threads due to job increase")
                    for i in range(c, self.nr_concurrent_jobs):
                        self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
                        self.threads[-1].start()
                self.cv.wait(timeout=5)

    def shutdown(self):
        self.stopping.set()
        self.cancel_all_jobs()
        with self.lock:
            self.cv.notify_all()
        self.join()

    def reconfigure_max_async_threads(self, nr_concurrent_jobs):
        """
        reconfigure the number of cloner threads
        """
        self.nr_concurrent_jobs = nr_concurrent_jobs

    def get_job(self):
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that other threads pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization: when a given volume has no more jobs
            # and none are in progress, remove the volume from the tracking
            # list so as to:
            #
            #   a. not query the filesystem for jobs over and over again
            #   b. keep the filesystem connection idle so that it can be
            #      freed from the connection pool
            #
            # if jobs show up for a volume later, the volume gets added back
            # to the tracking list and the jobs get kickstarted.
            # note that we do not iterate the volume list fully if there is
            # a job to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            log.info("waking up cancellation waiters")
            self.cancel_cv.notify_all()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with lock_timeout_log(self.lock):
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are
        executing for the given volume. this waits until all executing
        jobs get interrupted and finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operations and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if canceled,
        False otherwise (volume/job not found).
        """
        canceled = False
        log.info("canceling job {0} for volume {1}".format(job, volname))
        try:
            if volname not in self.q and volname not in self.jobs and job not in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        with lock_timeout_log(self.lock):
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with lock_timeout_log(self.lock):
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with lock_timeout_log(self.lock):
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as a (retcode, job) tuple.
        if no jobs are available return (0, None), else return (0, job). on
        error return (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep
        for long hours and do all kinds of synchronous work. called outside
        `self.lock`.
        """
        raise NotImplementedException()
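
# What follows is a minimal subclass sketch (illustrative only, not part of
# this module's API) showing the contract documented on AsyncJobs:
# get_next_job() is called under `self.lock` and returns a (retcode, job)
# tuple, while execute_job() runs outside the lock and must poll
# should_cancel(). The in-memory per-volume job list and the job's `done`/
# `work()` attributes are hypothetical stand-ins; real subclasses (e.g. the
# clone machinery) fetch their jobs from the filesystem.
#
#   class ExampleJobs(AsyncJobs):
#       def __init__(self, volume_client):
#           super().__init__(volume_client, "example", nr_concurrent_jobs=4)
#           self.pending = {}  # volname -> list of jobs (hypothetical)
#
#       def add_job(self, volname, job):
#           with self.lock:
#               self.pending.setdefault(volname, []).append(job)
#           self.queue_job(volname)  # track the volume and wake a worker
#
#       def get_next_job(self, volname, running_jobs):
#           # called under self.lock: hand out a pending job that is not
#           # already running; (0, None) means nothing to do right now
#           for job in self.pending.get(volname, []):
#               if job not in running_jobs:
#                   self.pending[volname].remove(job)
#                   return 0, job
#           return 0, None
#
#       def execute_job(self, volname, job, should_cancel):
#           # called outside self.lock: do the synchronous work in small
#           # steps, polling should_cancel() so that cancel_jobs() and
#           # shutdown() can interrupt the job promptly
#           while not job.done and not should_cancel():
#               job.work()  # one bounded unit of work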