]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/rest/app/manager/request_collection.py
2 from threading
import RLock
3 from rest
.app
.manager
.user_request
import UserRequest
4 from rest
.logger
import logger
7 from rest
.module
import global_instance
as rest_plugin
# Re-bind the module logger to its "request_collection" child so records from
# this module carry that name.
# NOTE(review): ``log`` must already be bound above this line (presumably a
# ``logging.Logger``, e.g. from ``logger()``); that binding is not visible in
# this view -- confirm.
log = log.getChild("request_collection")
class RequestCollection(object):
    """
    Manage a collection of UserRequests, indexed by request ID.

    This class contains a lock (an RLock), which is held by every entry
    point that may progress a UserRequest (submit, on_map, on_completion).
    Progressing a request might involve I/O, so holding the lock ensures
    those entry points don't go to sleep and wake up in a different world.
    """

    def __init__(self):
        super(RequestCollection, self).__init__()

        # request.id -> UserRequest
        self._by_request_id = {}
        # Guards the entry points that progress requests (see class docstring).
        self._lock = RLock()

    def get_by_id(self, request_id):
        """
        Return the UserRequest registered under ``request_id``.

        :raises KeyError: if no request with that ID has been submitted.
        """
        return self._by_request_id[request_id]

    def get_all(self, state=None):
        """
        Return a list of the tracked UserRequests.

        :param state: if given, only requests whose ``state`` equals it are
                      returned; if None, every tracked request is returned.
        """
        # Snapshot into a list so both branches return the same type and the
        # result stays valid if more requests are submitted while the caller
        # iterates it.
        requests = list(self._by_request_id.values())
        if state is None:
            return requests
        return [r for r in requests if r.state == state]

    # For walltime-based monitoring of running requests. Long-running requests
    # get a periodic call to saltutil.running to verify that things really
    # are still happening.
    #
    # NOTE(review): the commented-out code below is disabled legacy salt
    # handling. It refers to _by_tag, _remote and _update_index, none of
    # which this class defines.
    #
    # if not self._by_tag:
    # log.debug("RequestCollection.tick: %s JIDs underway" % len(self._by_tag))
    # # Identify JIDs who haven't had a saltutil.running response for too long.
    # # Kill requests in a separate phase because request:JID is not 1:1
    # for request in self._by_tag.values():
    # if _now - request.alive_at > datetime.timedelta(seconds=TICK_PERIOD * 3):
    # log.error("Request %s JID %s stale: now=%s, alive_at=%s" % (
    # request.id, request.jid, _now, request.alive_at
    # stale_jobs.add(request)
    # # Any identified stale jobs are errored out.
    # for request in stale_jobs:
    # with self._update_index(request):
    # request.set_error("Lost contact")
    # # Identify minions associated with JIDs in flight
    # query_minions = set()
    # for jid, request in self._by_tag.items():
    # query_minions.add(request.minion_id)
    # # Attempt to emit a saltutil.running to ping jobs, next tick we
    # # will see if we got updates to the alive_at attribute to indicate non-staleness
    # log.info("RequestCollection.tick: sending get_running for {0}".format(query_minions))
    # self._remote.get_running(list(query_minions))
    # def on_tick_response(self, minion_id, jobs):
    # Update the alive_at parameter of requests to record that they
    # are still running remotely.
    # :param jobs: The response from a saltutil.running
    # log.debug("RequestCollection.on_tick_response: %s from %s" % (len(jobs), minion_id))
    # request = self._by_tag[job['jid']]
    # # Not one of mine, ignore it
    # request.alive_at = now()
    # def cancel(self, request_id):
    # Immediately mark a request as cancelled, and in the background
    # try and cancel any outstanding JID for it.
    # request = self._by_request_id[request_id]
    # # Idempotent behaviour: no-op if already cancelled
    # if request.state == request.COMPLETE:
    # with self._update_index(request):
    # # I will take over cancelling the JID from the request
    # cancel_jid = request.jid
    # # Request is now done, no further calls
    # request.set_error("Cancelled")
    # # In the background, try to cancel the request's JID on a best-effort basis
    # self._remote.cancel(request.minion_id, cancel_jid)
    # # We don't check for completion or errors, it's a best-effort thing. If we're
    # # cancelling something we will do our best to kill any subprocess but can't
    # # make any guarantees because running nodes may be out of touch with the calamari server.
    # def fail_all(self, failed_minion):
    # For use when we lose contact with the minion that was in use for running
    # requests: assume all these requests are never going to return now.
    # for request in self.get_all(UserRequest.SUBMITTED):
    # with self._update_index(request):
    # request.set_error("Lost contact with server %s" % failed_minion)
    # log.error("Giving up on JID %s" % request.jid)

    def submit(self, request):
        """
        Submit a request and store it.  Do this in one operation
        to hold the lock over both operations, otherwise a response
        to a job could arrive before the request was filed here.
        """
        with self._lock:
            log.info("RequestCollection.submit: {0} {1}".format(
                request.id, request.headline))
            # File the request before submitting it so that a completion
            # arriving straight away can already find it by ID.
            self._by_request_id[request.id] = request
            request.submit()

    def on_map(self, sync_type, sync_object):
        """
        Callback for when a new cluster map is available, in which
        we notify any interested ongoing UserRequests of the new map
        so that they can progress if they were waiting for it.
        """
        with self._lock:
            log.info("RequestCollection.on_map: {0}".format(sync_type))
            for request in self.get_all(state=UserRequest.SUBMITTED):
                try:
                    # Only requests waiting on this map type are interested.
                    # A membership test (instead of iterating the keys) stays
                    # valid even if request.on_map updates awaiting_versions.
                    if sync_type in request.awaiting_versions:
                        request.on_map(sync_type, sync_object)
                except Exception as e:
                    # One misbehaving request must not stop the others from
                    # being notified; fail it instead of leaving a zombie.
                    log.exception("Request %s threw exception in on_map",
                                  request.id)
                    request.set_error("Internal error %s" % e)

    # def _on_rados_completion(self, request, result):
    # Handle JID completion from a ceph.rados_commands operation
    # if request.state != UserRequest.SUBMITTED:
    # # Unexpected, ignore.
    # log.error("Received completion for request %s/%s in state %s" % (
    # request.id, request.jid, request.state
    # if result['error']:
    # # This indicates a failure within ceph.rados_commands which was caught
    # # by our code, like one of our Ceph commands returned an error code.
    # # NB in future there may be UserRequest subclasses which want to receive
    # # and handle these errors themselves, so this branch would be refactored
    # log.error("Request %s experienced an error: %s" % (request.id, result['error_status']))
    # request.set_error(result['error_status'])
    # request.complete_jid()
    # # After a jid completes, requests may start waiting for cluster
    # # map updates, we ask ClusterMonitor to hurry up and get them
    # # on behalf of the request.
    # if request.awaiting_versions:
    # # The request may be waiting for an epoch that we already
    # # have, if so give it to the request right away
    # for sync_type, want_version in request.awaiting_versions.items():
    # data = ceph_state.get(sync_type)
    # if want_version and sync_type.cmp(data['epoch'],
    # want_version) >= 0:
    # "Awaited %s %s is immediately available" % (
    # sync_type, want_version))
    # request.on_map(sync_type, data)
    # except Exception as e:
    # # Ensure that a misbehaving piece of code in a UserRequest subclass
    # # results in a terminated job, not a zombie job
    # log.exception("Calling complete_jid for %s/%s" % (request.id, request.jid))
    # request.set_error("Internal error %s" % e)

    def on_completion(self, tag):
        """
        Callback for when a step of a request's commands has completed.

        ``tag`` is the ID the request was filed under by submit().  The
        request's ``rados_commands`` are advanced.  Once they are complete,
        one of two things happens:

        * On return code 0, the request is completed via complete_jid() and
          handed any awaited maps that are already available.
        * Otherwise the request is errored with the commands' output.

        Unknown tags are logged and ignored.
        """
        with self._lock:
            log.info("RequestCollection.on_completion: {0}".format(tag))

            try:
                request = self.get_by_id(tag)
            except KeyError:
                # str.format, not %: the message uses a {0} placeholder and
                # "%" here would raise TypeError instead of logging.
                log.warning("on_completion: unknown tag {0}".format(tag))
                return

            request.rados_commands.advance()
            if not request.rados_commands.is_complete():
                return

            if request.rados_commands.r != 0:
                request.set_error(request.rados_commands.outs)
                return

            try:
                request.complete_jid()

                # The request may be waiting for an epoch that we already
                # have, if so give it to the request right away.  Iterate a
                # snapshot in case request.on_map updates awaiting_versions.
                for sync_type, want_version in list(
                        request.awaiting_versions.items()):
                    sync_object = rest_plugin().get_sync_object(sync_type)
                    if want_version and sync_type.cmp(
                            sync_object.version, want_version) >= 0:
                        log.info("Awaited %s %s is immediately available" % (
                            sync_type, want_version))
                        request.on_map(sync_type, sync_object)
            except Exception as e:
                # Ensure that a misbehaving piece of code in a UserRequest
                # subclass results in a terminated request, not a zombie.
                log.exception("Request %s threw exception in on_completion",
                              request.id)
                request.set_error("Internal error %s" % e)