]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/deepsea/module.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / pybind / mgr / deepsea / module.py
1 # vim: ts=8 et sw=4 sts=4
2 """
3 ceph-mgr DeepSea orchestrator module
4 """
5
6 # We want orchestrator methods in this to be 1:1 mappings to DeepSea runners,
7 # we don't want to aggregate multiple salt invocations here, because that means
8 # this module would need to know too much about how DeepSea works internally.
9 # Better to expose new runners from DeepSea to match what the orchestrator needs.
10
11 import json
12 import errno
13 import requests
14
15 from threading import Event, Thread, Lock
16
17 from mgr_module import MgrModule
18 import orchestrator
19
20
class RequestException(Exception):
    """
    Error raised by the salt-api request helpers in this module.

    ``status_code`` holds the HTTP status of the failed response, or None
    when no HTTP status is associated with the failure.
    """

    def __init__(self, message, status_code=None):
        self.status_code = status_code
        super(RequestException, self).__init__(message)
25
26
class DeepSeaReadCompletion(orchestrator.ReadCompletion):
    """
    Read completion that is resolved asynchronously, once the result of the
    corresponding salt runner job is handed to _process_result().
    """

    def __init__(self, process_result_callback):
        """
        :param process_result_callback: callable that converts the raw salt
            job data into the value exposed as ``result``.
        """
        super(DeepSeaReadCompletion, self).__init__()
        self._complete = False
        # Start with no result so that reading .result before the job has
        # finished yields None instead of raising AttributeError.
        self._result = None
        self._cb = process_result_callback

    def _process_result(self, data):
        """
        Convert ``data`` with the callback, store it and mark this completion
        complete. If the callback raises, the completion stays incomplete and
        the exception propagates to the caller.
        """
        self._result = self._cb(data)
        self._complete = True

    @property
    def result(self):
        """The processed result, or None while the job has not completed."""
        return self._result

    @property
    def is_complete(self):
        """True once _process_result() has succeeded."""
        return self._complete
44
45
class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
    # Settings used to reach and authenticate against salt-api. The
    # configuration only counts as valid when every one of these has a
    # non-empty value (see _config_valid()).
    MODULE_OPTIONS = [
        {'name': 'salt_api_url', 'default': ''},
        {'name': 'salt_api_eauth', 'default': 'sharedsecret'},
        {'name': 'salt_api_username', 'default': ''},
        {'name': 'salt_api_password', 'default': ''},
    ]

    # CLI commands dispatched by handle_command().
    COMMANDS = [
        {
            "cmd": "deepsea config-set name=key,type=CephString name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw",
        },
        {
            "cmd": "deepsea config-show",
            "desc": "Show current configuration",
            "perm": "r",
        },
    ]
80
81
82 @property
83 def config_keys(self):
84 return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)
85
86
87 def get_module_option(self, key, default=None):
88 """
89 Overrides the default MgrModule get_module_option() method to pull in defaults
90 specific to this module
91 """
92 return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key])
93
94
95 def _config_valid(self):
96 for key in self.config_keys.keys():
97 if not self.get_module_option(key, self.config_keys[key]):
98 return False
99 return True
100
101
102 def __init__(self, *args, **kwargs):
103 super(DeepSeaOrchestrator, self).__init__(*args, **kwargs)
104 self._event = Event()
105 self._token = None
106 self._event_reader = None
107 self._reading_events = False
108 self._last_failure_msg = None
109 self._all_completions = dict()
110 self._completion_lock = Lock()
111
112
113 def available(self):
114 if not self._config_valid():
115 return False, "Configuration invalid; try `ceph deepsea config-set [...]`"
116
117 if not self._reading_events and self._last_failure_msg:
118 return False, self._last_failure_msg
119
120 return True, ""
121
122
123 def get_inventory(self, node_filter=None, refresh=False):
124 """
125 Note that this will raise an exception (e.g. if the salt-api is down,
126 or the username/password is incorret). Same for other methods.
127 Callers should expect this and react appropriately. The orchestrator
128 cli, for example, just prints the traceback in the console, so the
129 user at least sees the error.
130 """
131
132 def process_result(event_data):
133 result = []
134 if event_data['success']:
135 for node_name, node_devs in event_data["return"].items():
136 devs = list(map(lambda di:
137 orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
138 node_devs))
139 result.append(orchestrator.InventoryNode(node_name, devs))
140 else:
141 self.log.error(event_data['return'])
142 return result
143
144 with self._completion_lock:
145 c = DeepSeaReadCompletion(process_result)
146
147 nodes = []
148 roles = []
149 if node_filter:
150 nodes = node_filter.nodes
151 roles = node_filter.labels
152
153 resp = self._do_request_with_login("POST", data = {
154 "client": "runner_async",
155 "fun": "mgr_orch.get_inventory",
156 "nodes": nodes,
157 "roles": roles
158 })
159
160 # ['return'][0]['tag'] in the resonse JSON is what we need to match
161 # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
162 self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c
163
164 return c
165
166
167 def describe_service(self, service_type, service_id, node_name):
168
169 # Note: describe_service() does *not* support OSDs. This is because
170 # DeepSea doesn't really record what OSDs are deployed where; Ceph is
171 # considered the canonical source of this information, so having this
172 # function query OSD information from DeepSea doesn't make a lot of
173 # sense (DeepSea would have to call back into Ceph).
174
175 assert service_type in ("mon", "mgr", "mds", "rgw", None), service_type + " unsupported"
176
177 def process_result(event_data):
178 result = []
179 if event_data['success']:
180 for node_name, service_info in event_data["return"].items():
181 for service_type, service_instance in service_info.items():
182 desc = orchestrator.ServiceDescription()
183 desc.nodename = node_name
184 desc.service_instance = service_instance
185 desc.service_type = service_type
186 result.append(desc)
187 else:
188 self.log.error(event_data['return'])
189 return result
190
191 with self._completion_lock:
192 c = DeepSeaReadCompletion(process_result)
193
194 resp = self._do_request_with_login("POST", data = {
195 "client": "runner_async",
196 "fun": "mgr_orch.describe_service",
197 "role": service_type,
198 "service_id": service_id,
199 "node": node_name
200 })
201 self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c
202
203 return c
204
205
206 def wait(self, completions):
207 incomplete = False
208
209 with self._completion_lock:
210 for c in completions:
211 if c.is_complete:
212 continue
213 if not c.is_complete:
214 # TODO: the job is in the bus, it should reach us eventually
215 # unless something has gone wrong (e.g. salt-api died, etc.),
216 # in which case it's possible the job finished but we never
217 # noticed the salt/run/$id/ret event. Need to add the job ID
218 # (or possibly the full event tag) to the completion object.
219 # That way, if we want to double check on a job that hasn't
220 # been completed yet, we can make a synchronous request to
221 # salt-api to invoke jobs.lookup_jid, and if it's complete we
222 # should be able to pass its return value to _process_result()
223 # Question: do we do this automatically after some timeout?
224 # Or do we add a function so the admin can check and "unstick"
225 # a stuck completion?
226 incomplete = True
227
228 return not incomplete
229
230
231 def handle_command(self, inbuf, cmd):
232 if cmd['prefix'] == 'deepsea config-show':
233 return 0, json.dumps(dict([(key, self.get_module_option(key)) for key in self.config_keys.keys()])), ''
234
235 elif cmd['prefix'] == 'deepsea config-set':
236 if cmd['key'] not in self.config_keys.keys():
237 return (-errno.EINVAL, '',
238 "Unknown configuration option '{0}'".format(cmd['key']))
239
240 self.set_module_option(cmd['key'], cmd['value'])
241 self._event.set();
242 return 0, "Configuration option '{0}' updated".format(cmd['key']), ''
243
244 return (-errno.EINVAL, '',
245 "Command not found '{0}'".format(cmd['prefix']))
246
247
248 def serve(self):
249 self.log.info('DeepSea module starting up')
250 self.run = True
251 while self.run:
252 if not self._config_valid():
253 # This will spin until the config is valid, spitting a warning
254 # that the config is invalid every 60 seconds. The one oddity
255 # is that while setting the various parameters, this log warning
256 # will print once for each parameter set until the config is valid.
257 self.log.warn("Configuration invalid; try `ceph deepsea config-set [...]`")
258 self._event.wait(60)
259 self._event.clear()
260 continue
261
262 if self._event_reader and not self._reading_events:
263 self._event_reader = None
264
265 if not self._event_reader:
266 self._last_failure_msg = None
267 try:
268 # This spawns a separate thread to read the salt event bus
269 # stream. We can't do it in the serve thead, because reading
270 # from the response blocks, which would prevent the serve
271 # thread from handling anything else.
272 #
273 # TODO: figure out how to restart the _event_reader thread if
274 # config changes, e.g.: a new username or password is set.
275 # This will be difficult, because _read_sse() just blocks waiting
276 # for response lines. The closest I got was setting a read timeout
277 # on the request, but in the general case (where not much is
278 # happening most of the time), this will result in continual
279 # timeouts and reconnects. We really need an asynchronous read
280 # to support this.
281 self._event_response = self._do_request_with_login("GET", "events", stream=True)
282 self._event_reader = Thread(target=self._read_sse)
283 self._reading_events = True
284 self._event_reader.start()
285 except Exception as ex:
286 self._set_last_failure_msg("Failure setting up event reader: " + str(ex))
287 # gives an (arbitrary) 60 second retry if we can't attach to
288 # the salt-api event bus for some reason (e.g.: invalid username,
289 # or password, which will be logged as "Request failed with status
290 # code 401"). Note that this 60 second retry will also happen if
291 # salt-api dies.
292 self._event.wait(60)
293 self._event.clear()
294 continue
295
296 # Wait indefinitely for something interesting to happen (e.g.
297 # config-set, or shutdown), or the event reader to fail, which
298 # will happen if the salt-api server dies or restarts).
299 self._event.wait()
300 self._event.clear()
301
302
303 def shutdown(self):
304 self.log.info('DeepSea module shutting down')
305 self.run = False
306 self._event.set()
307
308
309 def _set_last_failure_msg(self, msg):
310 self._last_failure_msg = msg
311 self.log.warn(msg)
312
313
314 # Reader/parser of SSE events, see:
315 # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events)
316 # - https://www.w3.org/TR/2009/WD-eventsource-20090421/
317 # Note: this is pretty braindead and doesn't implement the full eventsource
318 # spec, but it *does* implement enough for us to listen to events from salt
319 # and potentially do something with them.
320 def _read_sse(self):
321 event = {}
322 try:
323 # Just starting the event reader; if we've made it here, we know we're
324 # talking to salt-api (_do_request would have raised an exception if the
325 # response wasn't ok), so check if there's any completions inflight that
326 # need to be dealt with. This handles the case where some command was
327 # invoked, then salt-api died somehow, and we reconneced, but missed the
328 # completion at the time it actually happened.
329 for tag in list(self._all_completions):
330 self.log.info("Found event {} inflight".format(tag))
331 try:
332 resp = self._do_request_with_login("POST", data = {
333 "client": "runner",
334 "fun": "jobs.lookup_jid",
335 "jid": tag.split('/')[2]
336 })
337 # jobs.lookup_jid returns a dict keyed by hostname.
338 return_dict = resp.json()['return'][0]
339 if return_dict:
340 # If the job is complete, there'll be one item in the dict.
341 self.log.info("Event {} complete".format(tag))
342 # The key is the salt master hostname, but we don't care
343 # about that, so just grab the data.
344 data = next(iter(return_dict.items()))[1]
345 self._all_completions[tag]._process_result(data)
346 # TODO: decide whether it's bad to delete the completion
347 # here -- would we ever need to resurrect it?
348 del self._all_completions[tag]
349 else:
350 # if the job is not complete, there'll be nothing in the dict
351 self.log.info("Event {} still pending".format(tag))
352 except Exception as ex:
353 # Logging a warning if the request failed, so we can continue
354 # checking any other completions, then get onto reading events
355 self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))
356
357 for line in self._event_response.iter_lines():
358 with self._completion_lock:
359 if line:
360 line = line.decode('utf-8')
361 colon = line.find(':')
362 if colon > 0:
363 k = line[:colon]
364 v = line[colon+2:]
365 if k == "retry":
366 # TODO: find out if we need to obey this reconnection time
367 self.log.warn("Server requested retry {}, ignored".format(v))
368 else:
369 event[k] = v
370 else:
371 # Empty line, terminates an event. Note that event['tag']
372 # is a salt-api extension to SSE to avoid having to decode
373 # json data if you don't care about it. To get to the
374 # interesting stuff, you want event['data'], which is json.
375 # If you want to have some fun, try
376 # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
377 # then `salt '*' test.ping` on the master
378 self.log.debug("Got event '{}'".format(str(event)))
379
380 # If we're actually interested in this event (i.e. it's
381 # in our completion dict), fire off that completion's
382 # _process_result() callback and remove it from our list.
383 if event['tag'] in self._all_completions:
384 self.log.info("Event {} complete".format(event['tag']))
385 self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
386 # TODO: decide whether it's bad to delete the completion
387 # here -- would we ever need to resurrect it?
388 del self._all_completions[event['tag']]
389
390 event = {}
391 self._set_last_failure_msg("SSE read terminated")
392 except Exception as ex:
393 self.log.exception(ex)
394 self._set_last_failure_msg("SSE read failed: {}".format(str(ex)))
395
396 self._reading_events = False
397 self._event.set()
398
399
400 # _do_request(), _login() and _do_request_with_login() are an extremely
401 # minimalist form of the following, with notably terse error handling:
402 # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default
403 # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default
404 # rationale:
405 # - I needed slightly different behaviour than in openATTIC (I want the
406 # caller to read the response, to allow streaming the salt-api event bus)
407 # - I didn't want to pull in 400+ lines more code into this presently
408 # experimental module, to save everyone having to review it ;-)
409
410 def _do_request(self, method, path="", data=None, stream=False):
411 """
412 returns the response, which the caller then has to read
413 """
414 url = "{0}/{1}".format(self.get_module_option('salt_api_url'), path)
415 try:
416 if method.lower() == 'get':
417 resp = requests.get(url, headers = { "X-Auth-Token": self._token },
418 data=data, stream=stream)
419 elif method.lower() == 'post':
420 resp = requests.post(url, headers = { "X-Auth-Token": self._token },
421 data=data)
422
423 else:
424 raise RequestException("Method '{}' not supported".format(method.upper()))
425 if resp.ok:
426 return resp
427 else:
428 msg = "Request failed with status code {}".format(resp.status_code)
429 raise RequestException(msg, resp.status_code)
430 except requests.exceptions.ConnectionError as ex:
431 self.log.exception(str(ex))
432 raise RequestException(str(ex))
433 except requests.exceptions.InvalidURL as ex:
434 self.log.exception(str(ex))
435 raise RequestException(str(ex))
436
437
438 def _login(self):
439 resp = self._do_request('POST', 'login', data = {
440 "eauth": self.get_module_option('salt_api_eauth'),
441 "password": self.get_module_option('salt_api_password'),
442 "username": self.get_module_option('salt_api_username')
443 })
444 self._token = resp.json()['return'][0]['token']
445 self.log.info("Salt API login successful")
446
447
448 def _do_request_with_login(self, method, path="", data=None, stream=False):
449 retries = 2
450 while True:
451 try:
452 if not self._token:
453 self._login()
454 return self._do_request(method, path, data, stream)
455 except RequestException as ex:
456 retries -= 1
457 if ex.status_code not in [401, 403] or retries == 0:
458 raise ex
459 self._token = None