]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/deepsea/module.py
1 # vim: ts=8 et sw=4 sts=4
3 ceph-mgr DeepSea orchestrator module
6 # We want orchestrator methods in this to be 1:1 mappings to DeepSea runners,
7 # we don't want to aggregate multiple salt invocations here, because that means
8 # this module would need to know too much about how DeepSea works internally.
9 # Better to expose new runners from DeepSea to match what the orchestrator needs.
15 from threading
import Event
, Thread
, Lock
17 from mgr_module
import MgrModule
class RequestException(Exception):
    """Error raised when a salt-api request cannot be completed.

    ``status_code`` holds the HTTP status of the failed response when the
    server answered with a non-OK status; it stays ``None`` when no response
    status is available (e.g. connection errors or an invalid URL).
    """

    def __init__(self, message, status_code=None):
        # Record the status first; the base Exception initializer only
        # stores ``message`` in ``args`` and never touches this attribute.
        self.status_code = status_code
        super(RequestException, self).__init__(message)
27 class DeepSeaReadCompletion(orchestrator
.ReadCompletion
):
28 def __init__(self
, process_result_callback
):
29 super(DeepSeaReadCompletion
, self
).__init
__()
30 self
._complete
= False
31 self
._cb
= process_result_callback
33 def _process_result(self
, data
):
34 self
._result
= self
._cb
(data
)
42 def is_complete(self
):
46 class DeepSeaOrchestrator(MgrModule
, orchestrator
.Orchestrator
):
49 'name': 'salt_api_url',
53 'name': 'salt_api_eauth',
54 'default': 'sharedsecret'
57 'name': 'salt_api_username',
61 'name': 'salt_api_password',
69 "cmd": "deepsea config-set name=key,type=CephString "
70 "name=value,type=CephString",
71 "desc": "Set a configuration value",
75 "cmd": "deepsea config-show",
76 "desc": "Show current configuration",
83 def config_keys(self
):
84 return dict((o
['name'], o
.get('default', None)) for o
in self
.MODULE_OPTIONS
)
87 def get_module_option(self
, key
, default
=None):
89 Overrides the default MgrModule get_module_option() method to pull in defaults
90 specific to this module
92 return super(DeepSeaOrchestrator
, self
).get_module_option(key
, default
=self
.config_keys
[key
])
95 def _config_valid(self
):
96 for key
in self
.config_keys
.keys():
97 if not self
.get_module_option(key
, self
.config_keys
[key
]):
102 def __init__(self
, *args
, **kwargs
):
103 super(DeepSeaOrchestrator
, self
).__init
__(*args
, **kwargs
)
104 self
._event
= Event()
106 self
._event
_reader
= None
107 self
._reading
_events
= False
108 self
._last
_failure
_msg
= None
109 self
._all
_completions
= dict()
110 self
._completion
_lock
= Lock()
114 if not self
._config
_valid
():
115 return False, "Configuration invalid; try `ceph deepsea config-set [...]`"
117 if not self
._reading
_events
and self
._last
_failure
_msg
:
118 return False, self
._last
_failure
_msg
123 def get_inventory(self
, node_filter
=None, refresh
=False):
125 Note that this will raise an exception (e.g. if the salt-api is down,
126 or the username/password is incorret). Same for other methods.
127 Callers should expect this and react appropriately. The orchestrator
128 cli, for example, just prints the traceback in the console, so the
129 user at least sees the error.
132 def process_result(event_data
):
134 if event_data
['success']:
135 for node_name
, node_devs
in event_data
["return"].items():
136 devs
= list(map(lambda di
:
137 orchestrator
.InventoryDevice
.from_ceph_volume_inventory(di
),
139 result
.append(orchestrator
.InventoryNode(node_name
, devs
))
141 self
.log
.error(event_data
['return'])
144 with self
._completion
_lock
:
145 c
= DeepSeaReadCompletion(process_result
)
150 nodes
= node_filter
.nodes
151 roles
= node_filter
.labels
153 resp
= self
._do
_request
_with
_login
("POST", data
= {
154 "client": "runner_async",
155 "fun": "mgr_orch.get_inventory",
160 # ['return'][0]['tag'] in the resonse JSON is what we need to match
161 # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
162 self
._all
_completions
["{}/ret".format(resp
.json()['return'][0]['tag'])] = c
167 def describe_service(self
, service_type
, service_id
, node_name
):
169 # Note: describe_service() does *not* support OSDs. This is because
170 # DeepSea doesn't really record what OSDs are deployed where; Ceph is
171 # considered the canonical source of this information, so having this
172 # function query OSD information from DeepSea doesn't make a lot of
173 # sense (DeepSea would have to call back into Ceph).
175 assert service_type
in ("mon", "mgr", "mds", "rgw", None), service_type
+ " unsupported"
177 def process_result(event_data
):
179 if event_data
['success']:
180 for node_name
, service_info
in event_data
["return"].items():
181 for service_type
, service_instance
in service_info
.items():
182 desc
= orchestrator
.ServiceDescription()
183 desc
.nodename
= node_name
184 desc
.service_instance
= service_instance
185 desc
.service_type
= service_type
188 self
.log
.error(event_data
['return'])
191 with self
._completion
_lock
:
192 c
= DeepSeaReadCompletion(process_result
)
194 resp
= self
._do
_request
_with
_login
("POST", data
= {
195 "client": "runner_async",
196 "fun": "mgr_orch.describe_service",
197 "role": service_type
,
198 "service_id": service_id
,
201 self
._all
_completions
["{}/ret".format(resp
.json()['return'][0]['tag'])] = c
206 def wait(self
, completions
):
209 with self
._completion
_lock
:
210 for c
in completions
:
213 if not c
.is_complete
:
214 # TODO: the job is in the bus, it should reach us eventually
215 # unless something has gone wrong (e.g. salt-api died, etc.),
216 # in which case it's possible the job finished but we never
217 # noticed the salt/run/$id/ret event. Need to add the job ID
218 # (or possibly the full event tag) to the completion object.
219 # That way, if we want to double check on a job that hasn't
220 # been completed yet, we can make a synchronous request to
221 # salt-api to invoke jobs.lookup_jid, and if it's complete we
222 # should be able to pass its return value to _process_result()
223 # Question: do we do this automatically after some timeout?
224 # Or do we add a function so the admin can check and "unstick"
225 # a stuck completion?
228 return not incomplete
231 def handle_command(self
, inbuf
, cmd
):
232 if cmd
['prefix'] == 'deepsea config-show':
233 return 0, json
.dumps(dict([(key
, self
.get_module_option(key
)) for key
in self
.config_keys
.keys()])), ''
235 elif cmd
['prefix'] == 'deepsea config-set':
236 if cmd
['key'] not in self
.config_keys
.keys():
237 return (-errno
.EINVAL
, '',
238 "Unknown configuration option '{0}'".format(cmd
['key']))
240 self
.set_module_option(cmd
['key'], cmd
['value'])
242 return 0, "Configuration option '{0}' updated".format(cmd
['key']), ''
244 return (-errno
.EINVAL
, '',
245 "Command not found '{0}'".format(cmd
['prefix']))
249 self
.log
.info('DeepSea module starting up')
252 if not self
._config
_valid
():
253 # This will spin until the config is valid, spitting a warning
254 # that the config is invalid every 60 seconds. The one oddity
255 # is that while setting the various parameters, this log warning
256 # will print once for each parameter set until the config is valid.
257 self
.log
.warn("Configuration invalid; try `ceph deepsea config-set [...]`")
262 if self
._event
_reader
and not self
._reading
_events
:
263 self
._event
_reader
= None
265 if not self
._event
_reader
:
266 self
._last
_failure
_msg
= None
268 # This spawns a separate thread to read the salt event bus
269 # stream. We can't do it in the serve thead, because reading
270 # from the response blocks, which would prevent the serve
271 # thread from handling anything else.
273 # TODO: figure out how to restart the _event_reader thread if
274 # config changes, e.g.: a new username or password is set.
275 # This will be difficult, because _read_sse() just blocks waiting
276 # for response lines. The closest I got was setting a read timeout
277 # on the request, but in the general case (where not much is
278 # happening most of the time), this will result in continual
279 # timeouts and reconnects. We really need an asynchronous read
281 self
._event
_response
= self
._do
_request
_with
_login
("GET", "events", stream
=True)
282 self
._event
_reader
= Thread(target
=self
._read
_sse
)
283 self
._reading
_events
= True
284 self
._event
_reader
.start()
285 except Exception as ex
:
286 self
._set
_last
_failure
_msg
("Failure setting up event reader: " + str(ex
))
287 # gives an (arbitrary) 60 second retry if we can't attach to
288 # the salt-api event bus for some reason (e.g.: invalid username,
289 # or password, which will be logged as "Request failed with status
290 # code 401"). Note that this 60 second retry will also happen if
296 # Wait indefinitely for something interesting to happen (e.g.
297 # config-set, or shutdown), or the event reader to fail, which
298 # will happen if the salt-api server dies or restarts).
304 self
.log
.info('DeepSea module shutting down')
309 def _set_last_failure_msg(self
, msg
):
310 self
._last
_failure
_msg
= msg
314 # Reader/parser of SSE events, see:
315 # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events)
316 # - https://www.w3.org/TR/2009/WD-eventsource-20090421/
317 # Note: this is pretty braindead and doesn't implement the full eventsource
318 # spec, but it *does* implement enough for us to listen to events from salt
319 # and potentially do something with them.
323 # Just starting the event reader; if we've made it here, we know we're
324 # talking to salt-api (_do_request would have raised an exception if the
325 # response wasn't ok), so check if there's any completions inflight that
326 # need to be dealt with. This handles the case where some command was
327 # invoked, then salt-api died somehow, and we reconneced, but missed the
328 # completion at the time it actually happened.
329 for tag
in list(self
._all
_completions
):
330 self
.log
.info("Found event {} inflight".format(tag
))
332 resp
= self
._do
_request
_with
_login
("POST", data
= {
334 "fun": "jobs.lookup_jid",
335 "jid": tag
.split('/')[2]
337 # jobs.lookup_jid returns a dict keyed by hostname.
338 return_dict
= resp
.json()['return'][0]
340 # If the job is complete, there'll be one item in the dict.
341 self
.log
.info("Event {} complete".format(tag
))
342 # The key is the salt master hostname, but we don't care
343 # about that, so just grab the data.
344 data
= next(iter(return_dict
.items()))[1]
345 self
._all
_completions
[tag
]._process
_result
(data
)
346 # TODO: decide whether it's bad to delete the completion
347 # here -- would we ever need to resurrect it?
348 del self
._all
_completions
[tag
]
350 # if the job is not complete, there'll be nothing in the dict
351 self
.log
.info("Event {} still pending".format(tag
))
352 except Exception as ex
:
353 # Logging a warning if the request failed, so we can continue
354 # checking any other completions, then get onto reading events
355 self
.log
.warn("Error looking up inflight event {}: {}".format(tag
, str(ex
)))
357 for line
in self
._event
_response
.iter_lines():
358 with self
._completion
_lock
:
360 line
= line
.decode('utf-8')
361 colon
= line
.find(':')
366 # TODO: find out if we need to obey this reconnection time
367 self
.log
.warn("Server requested retry {}, ignored".format(v
))
371 # Empty line, terminates an event. Note that event['tag']
372 # is a salt-api extension to SSE to avoid having to decode
373 # json data if you don't care about it. To get to the
374 # interesting stuff, you want event['data'], which is json.
375 # If you want to have some fun, try
376 # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
377 # then `salt '*' test.ping` on the master
378 self
.log
.debug("Got event '{}'".format(str(event
)))
380 # If we're actually interested in this event (i.e. it's
381 # in our completion dict), fire off that completion's
382 # _process_result() callback and remove it from our list.
383 if event
['tag'] in self
._all
_completions
:
384 self
.log
.info("Event {} complete".format(event
['tag']))
385 self
._all
_completions
[event
['tag']]._process
_result
(json
.loads(event
['data'])['data'])
386 # TODO: decide whether it's bad to delete the completion
387 # here -- would we ever need to resurrect it?
388 del self
._all
_completions
[event
['tag']]
391 self
._set
_last
_failure
_msg
("SSE read terminated")
392 except Exception as ex
:
393 self
.log
.exception(ex
)
394 self
._set
_last
_failure
_msg
("SSE read failed: {}".format(str(ex
)))
396 self
._reading
_events
= False
400 # _do_request(), _login() and _do_request_with_login() are an extremely
401 # minimalist form of the following, with notably terse error handling:
402 # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default
403 # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default
405 # - I needed slightly different behaviour than in openATTIC (I want the
406 # caller to read the response, to allow streaming the salt-api event bus)
407 # - I didn't want to pull in 400+ lines more code into this presently
408 # experimental module, to save everyone having to review it ;-)
410 def _do_request(self
, method
, path
="", data
=None, stream
=False):
412 returns the response, which the caller then has to read
414 url
= "{0}/{1}".format(self
.get_module_option('salt_api_url'), path
)
416 if method
.lower() == 'get':
417 resp
= requests
.get(url
, headers
= { "X-Auth-Token": self
._token
},
418 data
=data
, stream
=stream
)
419 elif method
.lower() == 'post':
420 resp
= requests
.post(url
, headers
= { "X-Auth-Token": self
._token
},
424 raise RequestException("Method '{}' not supported".format(method
.upper()))
428 msg
= "Request failed with status code {}".format(resp
.status_code
)
429 raise RequestException(msg
, resp
.status_code
)
430 except requests
.exceptions
.ConnectionError
as ex
:
431 self
.log
.exception(str(ex
))
432 raise RequestException(str(ex
))
433 except requests
.exceptions
.InvalidURL
as ex
:
434 self
.log
.exception(str(ex
))
435 raise RequestException(str(ex
))
439 resp
= self
._do
_request
('POST', 'login', data
= {
440 "eauth": self
.get_module_option('salt_api_eauth'),
441 "password": self
.get_module_option('salt_api_password'),
442 "username": self
.get_module_option('salt_api_username')
444 self
._token
= resp
.json()['return'][0]['token']
445 self
.log
.info("Salt API login successful")
448 def _do_request_with_login(self
, method
, path
="", data
=None, stream
=False):
454 return self
._do
_request
(method
, path
, data
, stream
)
455 except RequestException
as ex
:
457 if ex
.status_code
not in [401, 403] or retries
== 0: