# vim: ts=8 et sw=4 sts=4
"""
ceph-mgr DeepSea orchestrator module
"""

# We want orchestrator methods in this to be 1:1 mappings to DeepSea runners;
# we don't want to aggregate multiple salt invocations here, because that means
# this module would need to know too much about how DeepSea works internally.
# Better to expose new runners from DeepSea to match what the orchestrator needs.

import json
import errno
import requests

from threading import Event, Thread, Lock

from mgr_module import MgrModule
import orchestrator


class RequestException(Exception):
    def __init__(self, message, status_code=None):
        super(RequestException, self).__init__(message)
        self.status_code = status_code


class DeepSeaReadCompletion(orchestrator.ReadCompletion):
    def __init__(self, process_result_callback):
        super(DeepSeaReadCompletion, self).__init__()
        self._complete = False
        self._cb = process_result_callback

    def _process_result(self, data):
        self._result = self._cb(data)
        self._complete = True

    @property
    def result(self):
        return self._result

    @property
    def is_complete(self):
        return self._complete


class DeepSeaOrchestrator(MgrModule, orchestrator.Orchestrator):
    MODULE_OPTIONS = [
        {
            'name': 'salt_api_url',
            'default': ''
        },
        {
            'name': 'salt_api_eauth',
            'default': 'sharedsecret'
        },
        {
            'name': 'salt_api_username',
            'default': ''
        },
        {
            'name': 'salt_api_password',
            'default': ''
        }
    ]


    COMMANDS = [
        {
            "cmd": "deepsea config-set name=key,type=CephString "
                   "name=value,type=CephString",
            "desc": "Set a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "deepsea config-show",
            "desc": "Show current configuration",
            "perm": "r"
        }
    ]
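
    # Rough usage sketch (the values shown are placeholders, not defaults):
    #
    #   ceph deepsea config-set salt_api_url http://salt-master.example.com:8000
    #   ceph deepsea config-set salt_api_username admin
    #   ceph deepsea config-set salt_api_password secret
    #   ceph deepsea config-show
    #
    # All four options must be non-empty (salt_api_eauth defaults to
    # 'sharedsecret') before _config_valid() lets the module report itself
    # as available.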


    @property
    def config_keys(self):
        return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)


    def get_module_option(self, key, default=None):
        """
        Overrides the default MgrModule get_module_option() method to pull in defaults
        specific to this module
        """
        return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key])


    def _config_valid(self):
        for key in self.config_keys.keys():
            if not self.get_module_option(key, self.config_keys[key]):
                return False
        return True


    def __init__(self, *args, **kwargs):
        super(DeepSeaOrchestrator, self).__init__(*args, **kwargs)
        self._event = Event()
        self._token = None
        self._event_reader = None
        self._reading_events = False
        self._last_failure_msg = None
        self._all_completions = dict()
        self._completion_lock = Lock()
        self.inventory_cache = orchestrator.OutdatableDict()
        self.service_cache = orchestrator.OutdatableDict()

    def available(self):
        if not self._config_valid():
            return False, "Configuration invalid; try `ceph deepsea config-set [...]`"

        if not self._reading_events and self._last_failure_msg:
            return False, self._last_failure_msg

        return True, ""

    def get_inventory(self, node_filter=None, refresh=False):
        """
        Note that this will raise an exception (e.g. if the salt-api is down,
        or the username/password is incorrect). Same for other methods.
        Callers should expect this and react appropriately. The orchestrator
        cli, for example, just prints the traceback in the console, so the
        user at least sees the error.
        """
        self.inventory_cache.remove_outdated()
        if not self.inventory_cache.any_outdated() and not refresh:
            if node_filter is None:
                return orchestrator.TrivialReadCompletion(
                    orchestrator.InventoryNode.from_nested_items(self.inventory_cache.items()))
            elif node_filter.labels is None:
                try:
                    return orchestrator.TrivialReadCompletion(
                        orchestrator.InventoryNode.from_nested_items(
                            self.inventory_cache.items_filtered(node_filter.nodes)))
                except KeyError:
                    # items_filtered() will raise KeyError if passed a node name that doesn't exist
                    return orchestrator.TrivialReadCompletion([])

        def process_result(event_data):
            result = []
            if event_data['success']:
                for node_name, node_devs in event_data["return"].items():
                    if node_filter is None:
                        # The cache will only be populated when this function is invoked
                        # without a node filter, i.e. if you run it once for the whole
                        # cluster, you can then call it for individual nodes and return
                        # cached data. However, if you only *ever* call it for individual
                        # nodes, the cache will never be populated, and you'll always have
                        # the full round trip to DeepSea.
                        self.inventory_cache[node_name] = orchestrator.OutdatableData(node_devs)
                    devs = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(node_devs)
                    result.append(orchestrator.InventoryNode(node_name, devs))
            else:
                self.log.error(event_data['return'])
            return result

        with self._completion_lock:
            c = DeepSeaReadCompletion(process_result)

            nodes = []
            roles = []
            if node_filter:
                nodes = node_filter.nodes
                roles = node_filter.labels

            resp = self._do_request_with_login("POST", data={
                "client": "runner_async",
                "fun": "mgr_orch.get_inventory",
                "nodes": nodes,
                "roles": roles
            })

            # ['return'][0]['tag'] in the response JSON is what we need to match
            # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
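            # The runner_async response body is assumed (based on how the tag
            # is used here and in _read_sse()) to look roughly like:
            #   {"return": [{"tag": "salt/run/20181018074024331230",
            #                "jid": "20181018074024331230"}]}
            # The corresponding completion event later arrives on the event bus
            # tagged "salt/run/<jid>/ret", hence the "/ret" suffix below.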
            self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c

            return c

    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):

        # Note: describe_service() does *not* support OSDs. This is because
        # DeepSea doesn't really record what OSDs are deployed where; Ceph is
        # considered the canonical source of this information, so having this
        # function query OSD information from DeepSea doesn't make a lot of
        # sense (DeepSea would have to call back into Ceph).

        assert service_type in ("mon", "mgr", "mds", "rgw", "nfs", "iscsi", None), service_type + " unsupported"

        def _deepsea_to_ceph(service):
            if service == "ganesha":
                return "nfs"
            elif service == "igw":
                return "iscsi"
            else:
                return service

        # presently unused
        def _ceph_to_deepsea(service):
            if service == "nfs":
                return "ganesha"
            elif service == "iscsi":
                return "igw"
            else:
                return service

        self.service_cache.remove_outdated()
        if not self.service_cache.any_outdated() and not refresh:
            # Let's hope the services are complete.
            try:
                node_filter = [node_name] if node_name else None
                services_by_node = [d[1].data for d in self.service_cache.items_filtered(node_filter)]
                services = [orchestrator.ServiceDescription.from_json(s)
                            for services in services_by_node for s in services]
                services = [s for s in services if
                            (True if service_type is None else s.service_type == service_type) and
                            (True if service_id is None else s.service_instance == service_id)]
                return orchestrator.TrivialReadCompletion(services)
            except KeyError:
                # items_filtered() will raise KeyError if passed a node name that doesn't exist
                return orchestrator.TrivialReadCompletion([])

        def process_result(event_data):
            result = []
            if event_data['success']:
                for service_node, service_info in event_data["return"].items():
                    node_service_cache = []
                    for this_service_type, service_dict in service_info.items():
                        if isinstance(service_dict, str):
                            # map old form where deepsea only returned service IDs
                            # to new form where it returns a dict
                            service_dict = {'service_instance': service_dict}
                        desc = orchestrator.ServiceDescription(
                            nodename=service_node,
                            service_instance=service_dict['service_instance'],
                            service_type=_deepsea_to_ceph(this_service_type),
                            # the following may or may not be present
                            container_id=service_dict.get('container_id', None),
                            service=service_dict.get('service', None),
                            version=service_dict.get('version', None),
                            rados_config_location=service_dict.get('rados_config_location', None),
                            service_url=service_dict.get('service_url', None),
                            status=service_dict.get('status', None),
                            status_desc=service_dict.get('status_desc', None)
                        )
                        # Always add every service to the cache...
                        node_service_cache.append(desc.to_json())
                        # ...but only return the ones the caller asked for
                        if ((service_type is None or desc.service_type == service_type) and
                                (service_id is None or desc.service_instance == service_id) and
                                (node_name is None or desc.nodename == node_name)):
                            result.append(desc)

                    self.service_cache[service_node] = orchestrator.OutdatableData(node_service_cache)
            else:
                self.log.error(event_data['return'])
            return result

        with self._completion_lock:
            c = DeepSeaReadCompletion(process_result)

            # Always request all services, so we always have all services cached.
            resp = self._do_request_with_login("POST", data={
                "client": "runner_async",
                "fun": "mgr_orch.describe_service"
            })
            self._all_completions["{}/ret".format(resp.json()['return'][0]['tag'])] = c

            return c

    def wait(self, completions):
        incomplete = False

        with self._completion_lock:
            for c in completions:
                if c.is_complete:
                    continue
                if not c.is_complete:
                    # TODO: the job is in the bus, it should reach us eventually
                    # unless something has gone wrong (e.g. salt-api died, etc.),
                    # in which case it's possible the job finished but we never
                    # noticed the salt/run/$id/ret event. Need to add the job ID
                    # (or possibly the full event tag) to the completion object.
                    # That way, if we want to double check on a job that hasn't
                    # been completed yet, we can make a synchronous request to
                    # salt-api to invoke jobs.lookup_jid, and if it's complete we
                    # should be able to pass its return value to _process_result().
                    # Question: do we do this automatically after some timeout?
                    # Or do we add a function so the admin can check and "unstick"
                    # a stuck completion?
                    incomplete = True

        return not incomplete


    def handle_command(self, inbuf, cmd):
        if cmd['prefix'] == 'deepsea config-show':
            return 0, json.dumps(dict([(key, self.get_module_option(key)) for key in self.config_keys.keys()])), ''

        elif cmd['prefix'] == 'deepsea config-set':
            if cmd['key'] not in self.config_keys.keys():
                return (-errno.EINVAL, '',
                        "Unknown configuration option '{0}'".format(cmd['key']))

            self.set_module_option(cmd['key'], cmd['value'])
            self._event.set()
            return 0, "Configuration option '{0}' updated".format(cmd['key']), ''

        return (-errno.EINVAL, '',
                "Command not found '{0}'".format(cmd['prefix']))


    def serve(self):
        self.log.info('DeepSea module starting up')
        self.run = True
        while self.run:
            if not self._config_valid():
                # This will spin until the config is valid, spitting a warning
                # that the config is invalid every 60 seconds. The one oddity
                # is that while setting the various parameters, this log warning
                # will print once for each parameter set until the config is valid.
                self.log.warn("Configuration invalid; try `ceph deepsea config-set [...]`")
                self._event.wait(60)
                self._event.clear()
                continue

            if self._event_reader and not self._reading_events:
                self._event_reader = None

            if not self._event_reader:
                self._last_failure_msg = None
                try:
                    # This spawns a separate thread to read the salt event bus
                    # stream. We can't do it in the serve thread, because reading
                    # from the response blocks, which would prevent the serve
                    # thread from handling anything else.
                    #
                    # TODO: figure out how to restart the _event_reader thread if
                    # config changes, e.g.: a new username or password is set.
                    # This will be difficult, because _read_sse() just blocks waiting
                    # for response lines. The closest I got was setting a read timeout
                    # on the request, but in the general case (where not much is
                    # happening most of the time), this will result in continual
                    # timeouts and reconnects. We really need an asynchronous read
                    # to support this.
                    self._event_response = self._do_request_with_login("GET", "events", stream=True)
                    self._event_reader = Thread(target=self._read_sse)
                    self._reading_events = True
                    self._event_reader.start()
                except Exception as ex:
                    self._set_last_failure_msg("Failure setting up event reader: " + str(ex))
                    # Gives an (arbitrary) 60 second retry if we can't attach to
                    # the salt-api event bus for some reason (e.g.: invalid username
                    # or password, which will be logged as "Request failed with status
                    # code 401"). Note that this 60 second retry will also happen if
                    # salt-api dies.
                    self._event.wait(60)
                    self._event.clear()
                    continue

            # Wait indefinitely for something interesting to happen (e.g.
            # config-set, or shutdown), or for the event reader to fail (which
            # will happen if the salt-api server dies or restarts).
            self._event.wait()
            self._event.clear()


    def shutdown(self):
        self.log.info('DeepSea module shutting down')
        self.run = False
        self._event.set()


    def _set_last_failure_msg(self, msg):
        self._last_failure_msg = msg
        self.log.warn(msg)


    # Reader/parser of SSE events, see:
    # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events
    # - https://www.w3.org/TR/2009/WD-eventsource-20090421/
    # Note: this is pretty braindead and doesn't implement the full eventsource
    # spec, but it *does* implement enough for us to listen to events from salt
    # and potentially do something with them.
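    #
    # Sketch of the wire format this parser expects (an assumption based on the
    # handling below; fields other than "tag", "data" and "retry" would simply
    # be collected and ignored):
    #
    #   tag: salt/run/20181018074024331230/ret
    #   data: {"tag": "salt/run/20181018074024331230/ret", "data": {...}}
    #   <blank line terminates the event>
    #
    # Each "key: value" line is split at the first colon (the value starts two
    # characters later); a blank line ends the event, at which point the tag is
    # looked up in _all_completions.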
    def _read_sse(self):
        event = {}
        try:
            # Just starting the event reader; if we've made it here, we know we're
            # talking to salt-api (_do_request would have raised an exception if the
            # response wasn't ok), so check if there's any completions inflight that
            # need to be dealt with. This handles the case where some command was
            # invoked, then salt-api died somehow, and we reconnected, but missed the
            # completion at the time it actually happened.
            for tag in list(self._all_completions):
                self.log.info("Found event {} inflight".format(tag))
                try:
                    resp = self._do_request_with_login("POST", data={
                        "client": "runner",
                        "fun": "jobs.lookup_jid",
                        "jid": tag.split('/')[2]
                    })
                    # jobs.lookup_jid returns a dict keyed by hostname.
                    return_dict = resp.json()['return'][0]
                    if return_dict:
                        # If the job is complete, there'll be one item in the dict.
                        self.log.info("Event {} complete".format(tag))
                        # The key is the salt master hostname, but we don't care
                        # about that, so just grab the data.
                        data = next(iter(return_dict.items()))[1]
                        self._all_completions[tag]._process_result(data)
                        # TODO: decide whether it's bad to delete the completion
                        # here -- would we ever need to resurrect it?
                        del self._all_completions[tag]
                    else:
                        # If the job is not complete, there'll be nothing in the dict.
                        self.log.info("Event {} still pending".format(tag))
                except Exception as ex:
                    # Log a warning if the request failed, so we can continue
                    # checking any other completions, then get on to reading events.
                    self.log.warn("Error looking up inflight event {}: {}".format(tag, str(ex)))

            for line in self._event_response.iter_lines():
                with self._completion_lock:
                    if line:
                        line = line.decode('utf-8')
                        colon = line.find(':')
                        if colon > 0:
                            k = line[:colon]
                            v = line[colon+2:]
                            if k == "retry":
                                # TODO: find out if we need to obey this reconnection time
                                self.log.warn("Server requested retry {}, ignored".format(v))
                            else:
                                event[k] = v
                    else:
                        # Empty line, terminates an event. Note that event['tag']
                        # is a salt-api extension to SSE to avoid having to decode
                        # json data if you don't care about it. To get to the
                        # interesting stuff, you want event['data'], which is json.
                        # If you want to have some fun, try
                        # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
                        # then `salt '*' test.ping` on the master.
                        self.log.debug("Got event '{}'".format(str(event)))

                        # If we're actually interested in this event (i.e. it's
                        # in our completion dict), fire off that completion's
                        # _process_result() callback and remove it from our list.
                        if event['tag'] in self._all_completions:
                            self.log.info("Event {} complete".format(event['tag']))
                            self._all_completions[event['tag']]._process_result(json.loads(event['data'])['data'])
                            # TODO: decide whether it's bad to delete the completion
                            # here -- would we ever need to resurrect it?
                            del self._all_completions[event['tag']]

                        event = {}
            self._set_last_failure_msg("SSE read terminated")
        except Exception as ex:
            self.log.exception(ex)
            self._set_last_failure_msg("SSE read failed: {}".format(str(ex)))

        self._reading_events = False
        self._event.set()


    # _do_request(), _login() and _do_request_with_login() are an extremely
    # minimalist form of the following, with notably terse error handling:
    # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default
    # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default
    # rationale:
    # - I needed slightly different behaviour than in openATTIC (I want the
    #   caller to read the response, to allow streaming the salt-api event bus)
    # - I didn't want to pull in 400+ lines more code into this presently
    #   experimental module, to save everyone having to review it ;-)
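    #
    # The overall flow implemented below, in short:
    #   1. _login() POSTs eauth/username/password to <salt_api_url>/login and
    #      caches the returned token in self._token.
    #   2. _do_request() sends that token in an X-Auth-Token header with every
    #      GET/POST, and raises RequestException on any non-OK response.
    #   3. _do_request_with_login() logs in lazily and, on a 401/403, drops the
    #      cached token and retries once with a fresh login.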

    def _do_request(self, method, path="", data=None, stream=False):
        """
        Returns the response, which the caller then has to read.
        """
        url = "{0}/{1}".format(self.get_module_option('salt_api_url'), path)
        try:
            if method.lower() == 'get':
                resp = requests.get(url, headers={"X-Auth-Token": self._token},
                                    data=data, stream=stream)
            elif method.lower() == 'post':
                resp = requests.post(url, headers={"X-Auth-Token": self._token},
                                     data=data)
            else:
                raise RequestException("Method '{}' not supported".format(method.upper()))
            if resp.ok:
                return resp
            else:
                msg = "Request failed with status code {}".format(resp.status_code)
                raise RequestException(msg, resp.status_code)
        except requests.exceptions.ConnectionError as ex:
            self.log.exception(str(ex))
            raise RequestException(str(ex))
        except requests.exceptions.InvalidURL as ex:
            self.log.exception(str(ex))
            raise RequestException(str(ex))


    def _login(self):
        resp = self._do_request('POST', 'login', data={
            "eauth": self.get_module_option('salt_api_eauth'),
            "password": self.get_module_option('salt_api_password'),
            "username": self.get_module_option('salt_api_username')
        })
        self._token = resp.json()['return'][0]['token']
        self.log.info("Salt API login successful")


    def _do_request_with_login(self, method, path="", data=None, stream=False):
        retries = 2
        while True:
            try:
                if not self._token:
                    self._login()
                return self._do_request(method, path, data, stream)
            except RequestException as ex:
                retries -= 1
                if ex.status_code not in [401, 403] or retries == 0:
                    raise ex
                self._token = None