# ceph/src/pybind/mgr/volumes/fs/async_job.py
import logging
import sys
import threading
import traceback
from collections import deque

from .exception import NotImplementedException

log = logging.getLogger(__name__)
class JobThread(threading.Thread):
    """
    Worker thread that repeatedly pulls jobs from an AsyncJobs instance and
    executes them, retrying on unexpected exceptions up to a fixed cap.
    """

    # this is "not" configurable and there is no need for it to be
    # configurable. if a thread encounters an exception, we retry
    # until it hits this many consecutive exceptions.
    MAX_RETRIES_ON_EXCEPTION = 10

    def __init__(self, async_job, volume_client, name):
        """
        :param async_job: owning AsyncJobs instance (provides lock, cv and
                          the job fetch/execute/track callbacks)
        :param volume_client: client used only for cluster_log() on bail-out
        :param name: thread name passed to threading.Thread
        """
        self.vc = volume_client
        self.async_job = async_job
        # event object to cancel jobs
        self.cancel_event = threading.Event()
        threading.Thread.__init__(self, name=name)

    def run(self):
        # NOTE(review): loop scaffolding (retries init/increment, the inner
        # fetch loop and the try/except/finally shape) was lost in extraction
        # and is reconstructed here -- confirm against upstream ceph source.
        retries = 0
        thread_id = threading.currentThread()
        assert isinstance(thread_id, JobThread)
        thread_name = thread_id.getName()

        while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
            vol_job = None
            try:
                # fetch next job to execute
                with self.async_job.lock:
                    while True:
                        vol_job = self.async_job.get_job()
                        if vol_job:
                            break
                        # nothing queued yet -- wait for queue_job() to signal
                        self.async_job.cv.wait()
                    self.async_job.register_async_job(vol_job[0], vol_job[1], thread_id)

                # execute the job (outside lock)
                self.async_job.execute_job(vol_job[0], vol_job[1],
                                           should_cancel=lambda: thread_id.should_cancel())
                # a successful execution resets the consecutive-failure count
                retries = 0
            except NotImplementedException:
                # subclass forgot to implement the hooks -- fatal, do not retry
                raise
            except Exception:
                # unless the jobs fetching and execution routines are not implemented
                # retry till we hit cap limit.
                retries += 1
                log.warning("thread [{0}] encountered fatal error: (attempt#"
                            " {1}/{2})".format(thread_name, retries,
                                               JobThread.MAX_RETRIES_ON_EXCEPTION))
                exc_type, exc_value, exc_traceback = sys.exc_info()
                log.warning("traceback: {0}".format("".join(
                    traceback.format_exception(exc_type, exc_value, exc_traceback))))
            finally:
                # when done, unregister the job
                if vol_job:
                    with self.async_job.lock:
                        self.async_job.unregister_async_job(vol_job[0], vol_job[1], thread_id)

        log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
        self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))

    def cancel_job(self):
        # request cancellation of whatever job this thread is running;
        # jobs observe this via the should_cancel() poll
        self.cancel_event.set()

    def should_cancel(self):
        # True once cancel_job() has been called and not yet reset
        return self.cancel_event.is_set()

    def reset_cancel(self):
        # clear a pending cancellation request (called after a job finishes)
        self.cancel_event.clear()
class AsyncJobs(object):
    """
    Class providing asynchronous execution of jobs via worker threads.
    `jobs` are grouped by `volume`, so a `volume` can have N number of
    `jobs` executing concurrently (capped by number of concurrent jobs).

    Usability is simple: subclass this and implement the following:
      - get_next_job(volname, running_jobs)
      - execute_job(volname, job, should_cancel)

    ... and do not forget to invoke base class constructor.

    Job cancelation is for a volume as a whole, i.e., all executing jobs
    for a volume are canceled. Cancelation is poll based -- jobs need to
    periodically check if cancelation is requested, after which the job
    should return as soon as possible. Cancelation check is provided
    via `should_cancel()` lambda passed to `execute_job()`.
    """

    def __init__(self, volume_client, name_pfx, nr_concurrent_jobs):
        self.vc = volume_client
        # queue of volumes for starting async jobs
        self.q = deque()  # type: deque
        # volume => job tracking; maps volname -> list of (job, thread) tuples
        self.jobs = {}
        # lock, cv for kickstarting jobs
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        # cv for job cancelation (shares self.lock with self.cv)
        self.cancel_cv = threading.Condition(self.lock)

        # spawn the worker threads, named "<name_pfx>.<i>"
        self.threads = []
        for i in range(nr_concurrent_jobs):
            self.threads.append(JobThread(self, volume_client,
                                          name="{0}.{1}".format(name_pfx, i)))
            self.threads[-1].start()

    def get_job(self):
        """
        Pick the next (volname, job) tuple to run, or None when no volume has
        pending work. Called by worker threads under `self.lock`.
        """
        # NOTE(review): the scan loop (queue rotation, break-on-job, counter
        # decrement) was lost in extraction and is reconstructed -- confirm
        # against upstream ceph source.
        log.debug("processing {0} volume entries".format(len(self.q)))
        nr_vols = len(self.q)
        to_remove = []
        next_job = None
        while nr_vols > 0:
            volname = self.q[0]
            # do this now so that the other thread pick up jobs for other volumes
            self.q.rotate(1)
            running_jobs = [j[0] for j in self.jobs[volname]]
            (ret, job) = self.get_next_job(volname, running_jobs)
            if job:
                next_job = (volname, job)
                break
            # this is an optimization when for a given volume there are no more
            # jobs and no jobs are in progress. in such cases we remove the volume
            # from the tracking list so as to:
            #
            # a. not query the filesystem for jobs over and over again
            # b. keep the filesystem connection idle so that it can be freed
            #    from the connection pool
            #
            # if at all there are jobs for a volume, the volume gets added again
            # to the tracking list and the jobs get kickstarted.
            # note that, we do not iterate the volume list fully if there is a
            # jobs to process (that will take place eventually).
            if ret == 0 and not job and not running_jobs:
                to_remove.append(volname)
            nr_vols -= 1
        for vol in to_remove:
            log.debug("auto removing volume '{0}' from tracked volumes".format(vol))
            self.q.remove(vol)
            self.jobs.pop(vol)
        return next_job

    def register_async_job(self, volname, job, thread_id):
        """Track `job` for `volname` as running on `thread_id`. Called under `self.lock`."""
        log.debug("registering async job {0}.{1} with thread {2}".format(volname, job, thread_id))
        self.jobs[volname].append((job, thread_id))

    def unregister_async_job(self, volname, job, thread_id):
        """Drop a finished job from tracking; wake cancellation waiters if it was canceled."""
        log.debug("unregistering async job {0}.{1} from thread {2}".format(volname, job, thread_id))
        self.jobs[volname].remove((job, thread_id))

        cancelled = thread_id.should_cancel()
        thread_id.reset_cancel()

        # wake up cancellation waiters if needed
        if cancelled:
            # fix: use the module logger (`log`), not the root logger via
            # `logging.info`, consistent with every other log call here
            log.info("waking up cancellation waiters")
            self.cancel_cv.notifyAll()

    def queue_job(self, volname):
        """
        queue a volume for asynchronous job execution.
        """
        log.info("queuing job for volume '{0}'".format(volname))
        with self.lock:
            if not volname in self.q:
                self.q.append(volname)
                self.jobs[volname] = []
            # wake up a worker waiting in get_job()
            self.cv.notifyAll()

    def _cancel_jobs(self, volname):
        """
        cancel all jobs for the volume. do nothing is the no jobs are
        executing for the given volume. this would wait until all jobs
        get interrupted and finish execution.
        """
        log.info("cancelling jobs for volume '{0}'".format(volname))
        try:
            if not volname in self.q and not volname in self.jobs:
                return
            self.q.remove(volname)
            # cancel in-progress operation and wait until complete
            for j in self.jobs[volname]:
                j[1].cancel_job()
            # wait for cancellation to complete
            while self.jobs[volname]:
                log.debug("waiting for {0} in-progress jobs for volume '{1}' to "
                          "cancel".format(len(self.jobs[volname]), volname))
                self.cancel_cv.wait()
            self.jobs.pop(volname)
        except (KeyError, ValueError):
            # volume/job vanished concurrently -- nothing left to cancel
            pass

    def _cancel_job(self, volname, job):
        """
        cancel a executing job for a given volume. return True if canceled, False
        otherwise (volume/job not found).
        """
        log.info("canceling job {0} for volume {1}".format(job, volname))
        canceled = False
        try:
            if not volname in self.q and not volname in self.jobs and \
                    not job in self.jobs[volname]:
                return canceled
            for j in self.jobs[volname]:
                if j[0] == job:
                    j[1].cancel_job()
                    # be safe against _cancel_jobs() running concurrently
                    while j in self.jobs.get(volname, []):
                        self.cancel_cv.wait()
                    canceled = True
                    break
        except (KeyError, ValueError):
            pass
        return canceled

    def cancel_job(self, volname, job):
        """Cancel one executing job for a volume; returns True if canceled."""
        with self.lock:
            return self._cancel_job(volname, job)

    def cancel_jobs(self, volname):
        """
        cancel all executing jobs for a given volume.
        """
        with self.lock:
            self._cancel_jobs(volname)

    def cancel_all_jobs(self):
        """
        call all executing jobs for all volumes.
        """
        with self.lock:
            # snapshot the queue since _cancel_jobs() mutates it
            for volname in list(self.q):
                self._cancel_jobs(volname)

    def get_next_job(self, volname, running_jobs):
        """
        get the next job for asynchronous execution as (retcode, job) tuple. if no
        jobs are available return (0, None) else return (0, job). on error return
        (-ret, None). called under `self.lock`.
        """
        raise NotImplementedException()

    def execute_job(self, volname, job, should_cancel):
        """
        execute a job for a volume. the job can block on I/O operations, sleep for long
        hours and do all kinds of synchronous work. called outside `self.lock`.
        """
        raise NotImplementedException()