]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/volumes/fs/async_job.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / async_job.py
CommitLineData
92f5a8d4
TL
1import sys
2import time
3import logging
4import threading
5import traceback
6from collections import deque
7
8from .exception import NotImplementedException
9
10log = logging.getLogger(__name__)
11
class JobThread(threading.Thread):
    """
    Worker thread that repeatedly pulls (volume, job) tuples from an
    AsyncJobs instance and executes them.

    A thread tolerates up to MAX_RETRIES_ON_EXCEPTION *consecutive*
    failures (the counter resets to zero after any successful job);
    after that it logs an error, reports via the volume client's
    cluster log, and exits.
    """

    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        # volume client, used only to emit a cluster log message on bail-out
        self.vc = volume_client
        # owning AsyncJobs instance (provides lock, cv, job registry)
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        """
        Main worker loop: fetch a job under the AsyncJobs lock, execute it
        outside the lock, and always unregister it when done.
        """
        retries = 0
        thread_id = threading.currentThread()
        # register/unregister key the job on the JobThread instance itself
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.getName()

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        # no job available -- sleep until queue_job() notifies
                        self.async_job.cv.wait()
                    # track (volname, job) -> thread while still holding the lock
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                # successful execution resets the consecutive-failure counter
                retries = 0
            except NotImplementedException:
                # subclass forgot to implement get_next_job()/execute_job();
                # retrying cannot help, so propagate
                raise
            except Exception:
                # unless the jobs fetching and execution routines are not implemented
                # retry till we hit cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#" \
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                # brief pause between jobs; also throttles the retry loop
                time.sleep(1)
        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))

    def cancel_job(self):
        # request cancellation of the job this thread is currently running;
        # the job observes it by polling should_cancel()
        self.cancel_event.set()

    def should_cancel(self):
        # polled by the executing job (via the should_cancel lambda)
        return self.cancel_event.is_set()

    def reset_cancel(self):
        # clear the cancel flag so the thread can pick up the next job
        self.cancel_event.clear()
74
class AsyncJobs(object):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by number of concurrent jobs).

    Usability is simple: subclass this and implement the following:
      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation is requested, after which the job
    should return as soon as possible. Cancelation check is provided
    via `should_cancel()` lambda passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking: volname -> list of (job, JobThread) tuples
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation (shares self.lock with self.cv)
        self.waiting = False
        self.cancel_cv = threading.Condition(self.lock)

        self.threads = []
        for i in range(nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
            self.threads[-1].start()

    def get_job(self):
        """
        Return the next (volname, job) tuple to execute, or None if no
        volume currently has a pending job. Called under `self.lock`.

        Also prunes volumes that have neither pending nor running jobs,
        so idle filesystem connections can be released.
        """
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other thread pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization when for a given volume there are no more
            # jobs and no jobs are in progress. in such cases we remove the volume
            # from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are jobs for a volume, the volume gets added again
            # to the tracking list and the jobs get kickstarted.
            # note that, we do not iterate the volume list fully if there is a
            # jobs to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        """Track `job` as running on `thread_id`. Called under `self.lock`."""
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        """
        Stop tracking a finished (or canceled) job and wake up any
        cancellation waiters. Called under `self.lock`.
        """
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            # fix: use the module logger, not the root logger (`logging.info`)
            log.info("waking up cancellation waiters")
            self.cancel_cv.notify_all()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with self.lock:
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing is the no jobs are
        executing for the given volume. this would wait until all jobs
        get interrupted and finish execution. called under `self.lock`.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
                          "cancel".format(len(self.jobs[volname]), volname))
                # lock is released while waiting; unregister_async_job() notifies
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            # volume vanished concurrently -- nothing left to cancel
            pass

    def _cancel_job(self, volname, job):
        """
        cancel a executing job for a given volume. return True if canceled, False
        otherwise (volume/job not found). called under `self.lock`.
        """
        canceled = False
        log.info("canceling job {0} for volume {1}".format(job, volname))
        try:
            # NOTE(review): self.jobs[volname] holds (job, thread) tuples, so
            # `job in self.jobs[volname]` can never be true; the early return
            # therefore only triggers when the volume is fully untracked (a
            # KeyError from the indexing is swallowed below). Preserved as-is.
            if volname not in self.q and volname not in self.jobs and job not in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        """Cancel a single executing job; returns True if it was canceled."""
        with self.lock:
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with self.lock:
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        call all executing jobs for all volumes.
        """
        with self.lock:
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as (retcode, job) tuple. if no
        jobs are available return (0, None) else return (0, job). on error return
        (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for long
        hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()
252