# ceph/src/pybind/mgr/volumes/fs/async_job.py
import sys
import time
import logging
import threading
import traceback

from collections import deque

from mgr_util import lock_timeout_log, CephfsClient

from .exception import NotImplementedException

log = logging.getLogger(__name__)

class JobThread(threading.Thread):
    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)
    def run(self):
        retries = 0
        thread_id = threading.currentThread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.getName()
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1], should_cancel=lambda: thread_id.should_cancel())
                retries = 0
            except NotImplementedException:
                raise
            except Exception:
                # unless the job fetching and execution routines are not implemented,
                # retry till we hit the cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                    vol_job = None

        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)
    def should_reconfigure_num_threads(self):
        # reconfigure of max_concurrent_clones
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        self.cancel_event.set()

    def should_cancel(self):
        return self.cancel_event.is_set()

    def reset_cancel(self):
        self.cancel_event.clear()

class AsyncJobs(threading.Thread):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by the number of concurrent jobs).

    Usage is simple: subclass this and implement the following:

      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke the base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation is requested, after which the job
    should return as soon as possible. The cancelation check is provided
    via the `should_cancel()` lambda passed to `execute_job()`. A minimal
    subclass sketch appears at the end of this file.
    """
    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        threading.Thread.__init__(self, name="{0}.tick".format(name_pfx))
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation
        self.stopping = threading.Event()
        self.cancel_cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs
        self.name_pfx = name_pfx
        # each async job group uses its own libcephfs connection (pool)
        self.fs_client = CephfsClient(self.vc.mgr)

        self.threads = []
        for i in range(self.nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(self.name_pfx, i)))
            self.threads[-1].start()
        self.start()
    def run(self):
        log.debug("tick thread {} starting".format(self.name))
        with lock_timeout_log(self.lock):
            while not self.stopping.is_set():
                c = len(self.threads)
                if c > self.nr_concurrent_jobs:
                    # Decrease concurrency: notify threads which are waiting for a job to terminate.
                    log.debug("waking threads to terminate due to job reduction")
                    self.cv.notifyAll()
                elif c < self.nr_concurrent_jobs:
                    # Increase concurrency: create more threads.
                    log.debug("creating new threads due to job increase")
                    for i in range(c, self.nr_concurrent_jobs):
                        self.threads.append(JobThread(self, self.vc,
                                                      name="{0}.{1}.{2}".format(self.name_pfx, time.time(), i)))
                        self.threads[-1].start()
                self.cv.wait(timeout=5)

    def shutdown(self):
        self.stopping.set()
        self.cancel_all_jobs()
        with self.lock:
            self.cv.notifyAll()
        self.join()
    def reconfigure_max_async_threads(self, nr_concurrent_jobs):
        """
        reconfigure number of cloner threads
        """
        self.nr_concurrent_jobs = nr_concurrent_jobs
    def get_job(self):
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that other threads pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization when for a given volume there are no more
            # jobs and no jobs are in progress. in such cases we remove the volume
            # from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are jobs for a volume, the volume gets added again
            # to the tracking list and the jobs get kickstarted.
            # note that we do not iterate the volume list fully if there are
            # jobs to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job
    def register_async_job(self, volname, job, thread_id):
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))
    def unregister_async_job(self, volname, job, thread_id):
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            log.info("waking up cancellation waiters")
            self.cancel_cv.notifyAll()
    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with lock_timeout_log(self.lock):
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notifyAll()
    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are
        executing for the given volume. this would wait until all jobs
        get interrupted and finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass
    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if canceled,
        False otherwise (volume/job not found).
        """
        canceled = False
        log.info("canceling job {0} for volume {1}".format(job, volname))
        try:
            vol_jobs = [j[0] for j in self.jobs.get(volname, [])]
            if volname not in self.q and job not in vol_jobs:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled
    def cancel_job(self, volname, job):
        with lock_timeout_log(self.lock):
            return self._cancel_job(volname, job)
    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with lock_timeout_log(self.lock):
            self._cancel_jobs(volname)
    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with lock_timeout_log(self.lock):
            for volname in list(self.q):
                self._cancel_jobs(volname)
    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as a (retcode, job) tuple. if no
        jobs are available return (0, None) else return (0, job). on error return
        (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()
    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for long
        hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()
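
# --- Example (illustrative only) -------------------------------------------
# A minimal sketch of how this class is meant to be subclassed. `DummyJobs`
# and its in-memory `pending` job source are hypothetical and not part of
# this module; real users (e.g. the clone machinery) supply their own job
# source and executor. The volume_client passed in is assumed to expose
# `.mgr`, since AsyncJobs.__init__ hands it to CephfsClient.

class DummyJobs(AsyncJobs):
    def __init__(self, volume_client):
        # set up state before the base constructor starts the worker threads
        self.pending = {}  # volname -> list of job identifiers (hypothetical)
        super().__init__(volume_client, "dummy", nr_concurrent_jobs=2)

    def get_next_job(self, volname, running_jobs):
        # called under `self.lock`: return (0, job) if a job is available
        # for this volume, (0, None) when there is nothing left to do
        for job in self.pending.get(volname, []):
            if job not in running_jobs:
                return 0, job
        return 0, None

    def execute_job(self, volname, job, should_cancel):
        # called outside `self.lock`: do bounded chunks of work and poll
        # should_cancel() so volume-wide cancelation can interrupt the job
        while not should_cancel():
            # ... one chunk of real work would go here ...
            break

# Producers call queue_job(volname) to wake the workers for a volume, and
# cancel_jobs(volname) / cancel_all_jobs() to interrupt running jobs.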