# ceph/src/pybind/mgr/rest/app/manager/request_collection.py

from threading import RLock

from rest.app.manager.user_request import UserRequest
from rest.logger import logger
from rest.module import global_instance as rest_plugin

TICK_PERIOD = 20

log = logger().getChild("request_collection")


class RequestCollection(object):
    """
    Manage a collection of UserRequests, indexed by request ID.

    Unlike most of cthulhu, this class contains a lock, which
    is used in all entry points that may sleep (anything that
    progresses a UserRequest might involve I/O to create jobs
    in the salt master), so that they don't go to sleep and
    wake up in a different world.
    """

    def __init__(self):
        super(RequestCollection, self).__init__()

        self._by_request_id = {}
        self._lock = RLock()
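
    # Illustrative sketch, not part of the original module: the pattern the
    # class docstring describes is that any entry point which might sleep
    # takes self._lock around its whole body, as submit()/on_map()/
    # on_completion() below do.  A hypothetical mutator would look like:
    #
    #     def forget(self, request_id):
    #         with self._lock:
    #             del self._by_request_id[request_id]
    #
    # Because self._lock is an RLock, a thread that already holds it can call
    # another locked entry point without deadlocking against itself.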

    def get_by_id(self, request_id):
        return self._by_request_id[request_id]

    def get_all(self, state=None):
        if not state:
            return self._by_request_id.values()
        else:
            return [r for r in self._by_request_id.values() if r.state == state]

    # def tick(self):
    #     """
    #     For walltime-based monitoring of running requests. Long-running requests
    #     get a periodic call to saltutil.running to verify that things really
    #     are still happening.
    #     """
    #
    #     if not self._by_tag:
    #         return
    #     else:
    #         log.debug("RequestCollection.tick: %s JIDs underway" % len(self._by_tag))
    #
    #     # Identify JIDs that haven't had a saltutil.running response for too long.
    #     # Kill requests in a separate phase because request:JID is not 1:1
    #     stale_jobs = set()
    #     _now = now()
    #     for request in self._by_tag.values():
    #         if _now - request.alive_at > datetime.timedelta(seconds=TICK_PERIOD * 3):
    #             log.error("Request %s JID %s stale: now=%s, alive_at=%s" % (
    #                 request.id, request.jid, _now, request.alive_at
    #             ))
    #             stale_jobs.add(request)
    #
    #     # Any identified stale jobs are errored out.
    #     for request in stale_jobs:
    #         with self._update_index(request):
    #             request.set_error("Lost contact")
    #             request.jid = None
    #             request.complete()
    #
    #     # Identify minions associated with JIDs in flight
    #     query_minions = set()
    #     for jid, request in self._by_tag.items():
    #         query_minions.add(request.minion_id)
    #
    #     # Attempt to emit a saltutil.running to ping jobs, next tick we
    #     # will see if we got updates to the alive_at attribute to indicate non-staleness
    #     if query_minions:
    #         log.info("RequestCollection.tick: sending get_running for {0}".format(query_minions))
    #         self._remote.get_running(list(query_minions))

    # def on_tick_response(self, minion_id, jobs):
    #     """
    #     Update the alive_at parameter of requests to record that they
    #     are still running remotely.
    #
    #     :param jobs: The response from a saltutil.running
    #     """
    #     log.debug("RequestCollection.on_tick_response: %s from %s" % (len(jobs), minion_id))
    #     for job in jobs:
    #         try:
    #             request = self._by_tag[job['jid']]
    #         except KeyError:
    #             # Not one of mine, ignore it
    #             pass
    #         else:
    #             request.alive_at = now()

    # def cancel(self, request_id):
    #     """
    #     Immediately mark a request as cancelled, and in the background
    #     try and cancel any outstanding JID for it.
    #     """
    #     request = self._by_request_id[request_id]
    #
    #     # Idempotent behaviour: no-op if already cancelled
    #     if request.state == request.COMPLETE:
    #         return
    #
    #     with self._update_index(request):
    #         # I will take over cancelling the JID from the request
    #         cancel_jid = request.jid
    #         request.jid = None
    #
    #         # Request is now done, no further calls
    #         request.set_error("Cancelled")
    #         request.complete()
    #
    #         # In the background, try to cancel the request's JID on a best-effort basis
    #         if cancel_jid:
    #             self._remote.cancel(request.minion_id, cancel_jid)
    #             # We don't check for completion or errors; it's a best-effort thing.  If we're
    #             # cancelling something we will do our best to kill any subprocess, but we can't
    #             # make any guarantees because running nodes may be out of touch with the calamari server.
    #
    # @nosleep
    # def fail_all(self, failed_minion):
    #     """
    #     For use when we lose contact with the minion that was in use for running
    #     requests: assume all these requests are never going to return now.
    #     """
    #     for request in self.get_all(UserRequest.SUBMITTED):
    #         with self._update_index(request):
    #             request.set_error("Lost contact with server %s" % failed_minion)
    #             if request.jid:
    #                 log.error("Giving up on JID %s" % request.jid)
    #                 request.jid = None
    #             request.complete()

    def submit(self, request):
        """
        Submit a request and store it.  Both steps happen under the lock
        so that a response to the job cannot arrive before the request
        has been filed here.
        """
        with self._lock:
            log.info("RequestCollection.submit: {0} {1}".format(
                request.id, request.headline))
            self._by_request_id[request.id] = request
            request.submit()

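
    # Hedged usage sketch, not taken from this file: the caller (assumed to be
    # the request factory in the REST API layer) constructs a UserRequest
    # subclass and hands it straight to submit(), which files and starts it in
    # one step under the lock:
    #
    #     collection = RequestCollection()
    #     request = SomeUserRequest(...)     # hypothetical UserRequest subclass
    #     collection.submit(request)
    #     assert collection.get_by_id(request.id) is request
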
    def on_map(self, sync_type, sync_object):
        """
        Callback for when a new cluster map is available, in which
        we notify any interested ongoing UserRequests of the new map
        so that they can progress if they were waiting for it.
        """
        with self._lock:
            log.info("RequestCollection.on_map: {0}".format(sync_type))
            requests = self.get_all(state=UserRequest.SUBMITTED)
            for request in requests:
                try:
                    # If this is one of the types that this request
                    # is waiting for, invoke on_map.
                    for awaited_type in request.awaiting_versions.keys():
                        if awaited_type == sync_type:
                            request.on_map(sync_type, sync_object)
                except Exception as e:
                    log.error("e.__class__ = {0}".format(e.__class__))
                    log.exception("Request %s threw exception in on_map", request.id)
                    request.set_error("Internal error %s" % e)
                    request.complete()
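
    # Hedged sketch; the surrounding wiring is an assumption, not shown in this
    # file.  When the owning mgr module learns that a cluster map has changed,
    # it is expected to look up the corresponding sync object and forward it
    # here, roughly:
    #
    #     def _notify_map_change(module, sync_type):      # hypothetical hook
    #         module.requests.on_map(sync_type, module.get_sync_object(sync_type))
    #
    # Requests that are not awaiting sync_type simply ignore the call, and an
    # exception raised by a request's own on_map marks that request as failed
    # rather than propagating to the caller.
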
    #
    # def _on_rados_completion(self, request, result):
    #     """
    #     Handle JID completion from a ceph.rados_commands operation
    #     """
    #     if request.state != UserRequest.SUBMITTED:
    #         # Unexpected, ignore.
    #         log.error("Received completion for request %s/%s in state %s" % (
    #             request.id, request.jid, request.state
    #         ))
    #         return
    #
    #     if result['error']:
    #         # This indicates a failure within ceph.rados_commands which was caught
    #         # by our code, like one of our Ceph commands returned an error code.
    #         # NB in future there may be UserRequest subclasses which want to receive
    #         # and handle these errors themselves, so this branch would be refactored
    #         # to allow that.
    #         log.error("Request %s experienced an error: %s" % (request.id, result['error_status']))
    #         request.jid = None
    #         request.set_error(result['error_status'])
    #         request.complete()
    #         return
    #
    #     try:
    #         request.complete_jid()
    #
    #         # After a jid completes, requests may start waiting for cluster
    #         # map updates, we ask ClusterMonitor to hurry up and get them
    #         # on behalf of the request.
    #         if request.awaiting_versions:
    #             # The request may be waiting for an epoch that we already
    #             # have, if so give it to the request right away
    #             for sync_type, want_version in request.awaiting_versions.items():
    #                 data = ceph_state.get(sync_type)
    #
    #                 if want_version and sync_type.cmp(data['epoch'],
    #                                                   want_version) >= 0:
    #                     log.info(
    #                         "Awaited %s %s is immediately available" % (
    #                             sync_type, want_version))
    #                     request.on_map(sync_type, data)
    #
    #     except Exception as e:
    #         # Ensure that a misbehaving piece of code in a UserRequest subclass
    #         # results in a terminated job, not a zombie job
    #         log.exception("Calling complete_jid for %s/%s" % (request.id, request.jid))
    #         request.jid = None
    #         request.set_error("Internal error %s" % e)
    #         request.complete()

    def on_completion(self, tag):
        """
        Callback for when a salt/job/<jid>/ret event is received, in which
        we find the UserRequest that created the job, and inform it of
        completion so that it can progress.
        """
        with self._lock:
            log.info("RequestCollection.on_completion: {0}".format(tag))

            try:
                request = self.get_by_id(tag)
            except KeyError:
                log.warning("on_completion: unknown tag {0}".format(tag))
                return

            request.rados_commands.advance()
            if request.rados_commands.is_complete():
                if request.rados_commands.r == 0:
                    try:
                        request.complete_jid()
                    except Exception as e:
                        log.exception("Request %s threw exception in complete_jid", request.id)
                        request.set_error("Internal error %s" % e)
                        request.complete()

                    # The request may be waiting for an epoch that we already have, if so
                    # give it to the request right away
                    for sync_type, want_version in request.awaiting_versions.items():
                        sync_object = rest_plugin().get_sync_object(sync_type)
                        if want_version and sync_type.cmp(sync_object.version, want_version) >= 0:
                            log.info("Awaited %s %s is immediately available" % (sync_type, want_version))
                            request.on_map(sync_type, sync_object)
                else:
                    request.set_error(request.rados_commands.outs)
                    request.complete()
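
    # Hedged sketch; the event plumbing is an assumption, not part of this
    # file.  on_completion is keyed by the same id that submit() filed the
    # request under, so the handler that receives a salt/job/<jid>/ret event
    # is expected to translate it into the request id and forward it here:
    #
    #     def _on_job_return(module, request_id):         # hypothetical handler
    #         module.requests.on_completion(request_id)
    #
    # Unknown tags are logged and ignored rather than raising, so stray or
    # duplicate return events cannot crash the caller's event loop.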