from threading import RLock

from rest.app.manager.user_request import UserRequest
from rest.logger import logger
log = logger()

from rest.module import global_instance as rest_plugin

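# TICK_PERIOD is (judging from the commented-out tick() below) the period,
# in seconds, of the walltime liveness checks: a request with no
# saltutil.running response for TICK_PERIOD * 3 seconds would be treated
# as stale.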
TICK_PERIOD = 20


log = log.getChild("request_collection")


class RequestCollection(object):
    """
    Manage a collection of UserRequests, indexed by
    salt JID and request ID.

    Unlike most of cthulhu, this class contains a lock, which
    is used in all entry points which may sleep (anything which
    progresses a UserRequest might involve I/O to create jobs
    in the salt master), so that they don't go to sleep and
    wake up in a different world.
    """

    def __init__(self):
        super(RequestCollection, self).__init__()

        self._by_request_id = {}
        self._lock = RLock()

    def get_by_id(self, request_id):
        """Return the request with this ID; raises KeyError if it is unknown."""
        return self._by_request_id[request_id]

    def get_all(self, state=None):
        """Return all requests, or only those in ``state`` if one is given."""
        if not state:
            return self._by_request_id.values()
        else:
            return [r for r in self._by_request_id.values() if r.state == state]

    # def tick(self):
    #     """
    #     For walltime-based monitoring of running requests. Long-running requests
    #     get a periodic call to saltutil.running to verify that things really
    #     are still happening.
    #     """
    #
    #     if not self._by_tag:
    #         return
    #     else:
    #         log.debug("RequestCollection.tick: %s JIDs underway" % len(self._by_tag))
    #
    #     # Identify JIDs which haven't had a saltutil.running response for too long.
    #     # Kill requests in a separate phase because request:JID is not 1:1
    #     stale_jobs = set()
    #     _now = now()
    #     for request in self._by_tag.values():
    #         if _now - request.alive_at > datetime.timedelta(seconds=TICK_PERIOD * 3):
    #             log.error("Request %s JID %s stale: now=%s, alive_at=%s" % (
    #                 request.id, request.jid, _now, request.alive_at
    #             ))
    #             stale_jobs.add(request)
    #
    #     # Any identified stale jobs are errored out.
    #     for request in stale_jobs:
    #         with self._update_index(request):
    #             request.set_error("Lost contact")
    #             request.jid = None
    #             request.complete()
    #
    #     # Identify minions associated with JIDs in flight
    #     query_minions = set()
    #     for jid, request in self._by_tag.items():
    #         query_minions.add(request.minion_id)
    #
    #     # Attempt to emit a saltutil.running to ping jobs; next tick we will
    #     # see whether alive_at was updated, indicating non-staleness
    #     if query_minions:
    #         log.info("RequestCollection.tick: sending get_running for {0}".format(query_minions))
    #         self._remote.get_running(list(query_minions))

    # def on_tick_response(self, minion_id, jobs):
    #     """
    #     Update the alive_at parameter of requests to record that they
    #     are still running remotely.
    #
    #     :param jobs: The response from a saltutil.running
    #     """
    #     log.debug("RequestCollection.on_tick_response: %s from %s" % (len(jobs), minion_id))
    #     for job in jobs:
    #         try:
    #             request = self._by_tag[job['jid']]
    #         except KeyError:
    #             # Not one of mine, ignore it
    #             pass
    #         else:
    #             request.alive_at = now()

    # def cancel(self, request_id):
    #     """
    #     Immediately mark a request as cancelled, and in the background
    #     try to cancel any outstanding JID for it.
    #     """
    #     request = self._by_request_id[request_id]
    #
    #     # Idempotent behaviour: no-op if already cancelled
    #     if request.state == request.COMPLETE:
    #         return
    #
    #     with self._update_index(request):
    #         # I will take over cancelling the JID from the request
    #         cancel_jid = request.jid
    #         request.jid = None
    #
    #         # Request is now done, no further calls
    #         request.set_error("Cancelled")
    #         request.complete()
    #
    #         # In the background, try to cancel the request's JID on a best-effort basis
    #         if cancel_jid:
    #             self._remote.cancel(request.minion_id, cancel_jid)
    #             # We don't check for completion or errors; it's a best-effort thing. If we're
    #             # cancelling something we will do our best to kill any subprocess, but we can't
    #             # make any guarantees because running nodes may be out of touch with the
    #             # calamari server.
    #
    # @nosleep
    # def fail_all(self, failed_minion):
    #     """
    #     For use when we lose contact with the minion that was in use for running
    #     requests: assume all these requests are never going to return now.
    #     """
    #     for request in self.get_all(UserRequest.SUBMITTED):
    #         with self._update_index(request):
    #             request.set_error("Lost contact with server %s" % failed_minion)
    #             if request.jid:
    #                 log.error("Giving up on JID %s" % request.jid)
    #                 request.jid = None
    #             request.complete()

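    # NB: the commented-out methods above reference names (_by_tag, _remote,
    # _update_index, now(), nosleep) that this module no longer defines or
    # imports; they appear to be retained from calamari's cthulhu as a
    # reference for reintroducing walltime monitoring and cancellation.
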
    def submit(self, request):
        """
        Submit a request and store it. Do this as a single locked
        operation; otherwise a response to a job could arrive before
        the request was filed here.
        """
        with self._lock:
            log.info("RequestCollection.submit: {0} {1}".format(
                request.id, request.headline))
            self._by_request_id[request.id] = request
            request.submit()

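    # All three entry points (submit, on_map, on_completion) share _lock, so
    # a salt/job/<jid>/ret event that races a submit() blocks until the
    # request has been filed in _by_request_id. Illustratively:
    #
    #   thread A: submit(request)      # holds lock: file request, start job
    #   thread B: on_completion(tag)   # waits on the lock until A releases it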
    def on_map(self, sync_type, sync_object):
        """
        Callback for when a new cluster map is available, in which
        we notify any interested ongoing UserRequests of the new map
        so that they can progress if they were waiting for it.
        """
        with self._lock:
            log.info("RequestCollection.on_map: {0}".format(sync_type))
            requests = self.get_all(state=UserRequest.SUBMITTED)
            for request in requests:
                try:
                    # If this is one of the types that this request
                    # is waiting for, invoke on_map.
                    if sync_type in request.awaiting_versions:
                        request.on_map(sync_type, sync_object)
                except Exception as e:
                    log.error("e.__class__ = {0}".format(e.__class__))
                    log.exception("Request %s threw exception in on_map", request.id)
                    request.set_error("Internal error %s" % e)
                    request.complete()
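
    # ``awaiting_versions`` on a UserRequest evidently maps a sync type to the
    # map version the request is blocked on: on_map() above wakes SUBMITTED
    # requests when a type they await arrives, and on_completion() below
    # short-circuits waits for versions we already hold.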
    #
    # def _on_rados_completion(self, request, result):
    #     """
    #     Handle JID completion from a ceph.rados_commands operation
    #     """
    #     if request.state != UserRequest.SUBMITTED:
    #         # Unexpected, ignore.
    #         log.error("Received completion for request %s/%s in state %s" % (
    #             request.id, request.jid, request.state
    #         ))
    #         return
    #
    #     if result['error']:
    #         # This indicates a failure within ceph.rados_commands which was caught
    #         # by our code, like one of our Ceph commands returning an error code.
    #         # NB in future there may be UserRequest subclasses which want to receive
    #         # and handle these errors themselves, so this branch would be refactored
    #         # to allow that.
    #         log.error("Request %s experienced an error: %s" % (request.id, result['error_status']))
    #         request.jid = None
    #         request.set_error(result['error_status'])
    #         request.complete()
    #         return
    #
    #     try:
    #         request.complete_jid()
    #
    #         # After a JID completes, requests may start waiting for cluster
    #         # map updates; we ask ClusterMonitor to hurry up and get them
    #         # on behalf of the request.
    #         if request.awaiting_versions:
    #             # The request may be waiting for an epoch that we already
    #             # have; if so, give it to the request right away
    #             for sync_type, want_version in request.awaiting_versions.items():
    #                 data = ceph_state.get(sync_type)
    #
    #                 if want_version and sync_type.cmp(data['epoch'],
    #                                                   want_version) >= 0:
    #                     log.info(
    #                         "Awaited %s %s is immediately available" % (
    #                             sync_type, want_version))
    #                     request.on_map(sync_type, data)
    #
    #     except Exception as e:
    #         # Ensure that a misbehaving piece of code in a UserRequest subclass
    #         # results in a terminated job, not a zombie job
    #         log.exception("Calling complete_jid for %s/%s" % (request.id, request.jid))
    #         request.jid = None
    #         request.set_error("Internal error %s" % e)
    #         request.complete()

    def on_completion(self, tag):
        """
        Callback for when a salt/job/<jid>/ret event is received, in which
        we find the UserRequest that created the job, and inform it of
        completion so that it can progress.
        """
        with self._lock:
            log.info("RequestCollection.on_completion: {0}".format(tag))

            try:
                request = self.get_by_id(tag)
            except KeyError:
                log.warning("on_completion: unknown tag {0}".format(tag))
                return

            request.rados_commands.advance()
            if request.rados_commands.is_complete():
                if request.rados_commands.r == 0:
                    try:
                        request.complete_jid()
                    except Exception as e:
                        log.exception("Request %s threw exception in complete_jid", request.id)
                        request.set_error("Internal error %s" % e)
                        request.complete()

                    # The request may be waiting for an epoch that we already have;
                    # if so, give it to the request right away
                    for sync_type, want_version in request.awaiting_versions.items():
                        sync_object = rest_plugin().get_sync_object(sync_type)
                        if want_version and sync_type.cmp(sync_object.version, want_version) >= 0:
                            log.info("Awaited %s %s is immediately available" % (sync_type, want_version))
                            request.on_map(sync_type, sync_object)
                else:
                    request.set_error(request.rados_commands.outs)
                    request.complete()