# ceph/src/pybind/mgr/volumes/fs/async_job.py
import logging
import sys
import threading
import time
import traceback
from collections import deque

from .exception import NotImplementedException

log = logging.getLogger(__name__)
class JobThread(threading.Thread):
    """
    Worker thread that repeatedly fetches jobs from an `AsyncJobs` instance
    and executes them outside the queue lock. A thread terminates either when
    the pool is reconfigured to fewer threads, or after hitting the
    consecutive-exception retry cap.
    """

    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        super().__init__(name=name)

    def run(self):
        retries = 0
        # current_thread()/.name replace the deprecated camelCase aliases
        # currentThread()/getName()
        thread_id = threading.current_thread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.name
        log.debug("thread [{0}] starting".format(thread_name))

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        if self.should_reconfigure_num_threads():
                            log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
                            self.async_job.threads.remove(self)
                            return
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        # no job available -- sleep until queue_job() notifies
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1],
                                           should_cancel=lambda: thread_id.should_cancel())
                # successful execution resets the consecutive-failure counter
                retries = 0
            except NotImplementedException:
                # subclass failed to implement the job hooks -- this is a
                # programming error, so do not retry.
                raise
            except Exception:
                # unless the jobs fetching and execution routines are not implemented
                # retry till we hit cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#" \
                            " {1}/{2})".format(thread_name, retries, JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)
                    vol_job = None

        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
        with self.async_job.lock:
            self.async_job.threads.remove(self)

    def should_reconfigure_num_threads(self):
        # reconfigure of max_concurrent_clones: surplus threads (beyond the
        # configured concurrency) must terminate
        return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs

    def cancel_job(self):
        """Request cancellation of the job this thread is executing (poll based)."""
        self.cancel_event.set()

    def should_cancel(self):
        """Return True if cancellation of the running job has been requested."""
        return self.cancel_event.is_set()

    def reset_cancel(self):
        """Clear a pending cancellation request."""
        self.cancel_event.clear()
class AsyncJobs(object):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by number of concurrent jobs).

    Usability is simple: subclass this and implement the following:

      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation is requested, after which the job
    should return as soon as possible. Cancelation check is provided
    via `should_cancel()` lambda passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking: volname -> list of (job, JobThread) tuples
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation (shares `self.lock`)
        self.cancel_cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs

        # worker threads, named "<name_pfx>.<index>"
        self.threads = []
        for i in range(nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
            self.threads[-1].start()

    def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
        """
        reconfigure number of cloner threads
        """
        with self.lock:
            self.nr_concurrent_jobs = nr_concurrent_jobs
            # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
            if len(self.threads) > nr_concurrent_jobs:
                self.cv.notify_all()
            # Increase in concurrency
            if len(self.threads) < nr_concurrent_jobs:
                for i in range(len(self.threads), nr_concurrent_jobs):
                    # time.time() in the name keeps thread names unique across
                    # repeated reconfigurations
                    self.threads.append(JobThread(self, self.vc,
                                                  name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
                    self.threads[-1].start()

    def get_job(self):
        """
        fetch the next (volname, job) tuple to execute, cycling through the
        tracked volumes; return None if no volume currently has a job.
        called under `self.lock`.
        """
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other thread pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization when for a given volume there are no more
            # jobs and no jobs are in progress. in such cases we remove the volume
            # from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are jobs for a volume, the volume gets added again
            # to the tracking list and the jobs get kickstarted.
            # note that, we do not iterate the volume list fully if there is a
            # jobs to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        """track a job as executing on `thread_id`. called under `self.lock`."""
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        """drop a finished job from tracking and wake cancellation waiters.
        called under `self.lock`."""
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            # fixed: was `logging.info(...)`, which bypassed the module
            # logger `log` and logged via the root logger instead
            log.info("waking up cancellation waiters")
            self.cancel_cv.notify_all()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with self.lock:
            if volname not in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            self.cv.notify_all()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing if no jobs are
        executing for the given volume. this would wait until all jobs
        get interrupted and finish execution. called under `self.lock`.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if volname not in self.q and volname not in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to " \
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            pass

    def _cancel_job(self, volname, job):
        """
        cancel an executing job for a given volume. return True if canceled, False
        otherwise (volume/job not found). called under `self.lock`.
        """
        log.info("canceling job {0} for volume {1}".format(job, volname))
        canceled = False
        try:
            # NOTE(review): with `and`-chaining, `self.jobs[volname]` still gets
            # evaluated when volname is untracked; the resulting KeyError is
            # swallowed below, so behavior is "return False" -- confirm this is
            # intentional before tightening the condition.
            if volname not in self.q and volname not in self.jobs and job not in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        """cancel a single executing job for a given volume."""
        with self.lock:
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with self.lock:
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        cancel all executing jobs for all volumes.
        """
        with self.lock:
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as (retcode, job) tuple. if no
        jobs are available return (0, None) else return (0, job). on error return
        (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for long
        hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()