]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/deepsea/module.py
1 # vim: ts=8 et sw=4 sts=4
3 ceph-mgr DeepSea orchestrator module
6 # We want orchestrator methods in this to be 1:1 mappings to DeepSea runners,
7 # we don't want to aggregate multiple salt invocations here, because that means
8 # this module would need to know too much about how DeepSea works internally.
9 # Better to expose new runners from DeepSea to match what the orchestrator needs.
15 from threading
import Event
, Thread
, Lock
17 from mgr_module
import MgrModule
class RequestException(Exception):
    """Raised when a salt-api HTTP request fails.

    Carries an optional HTTP status code alongside the error message so
    callers can react to specific failures (e.g. re-login on 401/403).
    """

    def __init__(self, message, status_code=None):
        # Record the (possibly absent) HTTP status first, then hand the
        # human-readable message to the Exception machinery.
        self.status_code = status_code
        super(RequestException, self).__init__(message)
27 class DeepSeaReadCompletion(orchestrator
.ReadCompletion
):
def __init__(self, process_result_callback):
    """Create a DeepSeaReadCompletion wrapping a result-processing callback.

    :param process_result_callback: callable that will later be invoked
        with the raw salt event data; its return value becomes this
        completion's result (see _process_result()).
    """
    super(DeepSeaReadCompletion, self).__init__()
    # Starts incomplete; flipped once the salt event carrying the result
    # has been fed through the callback.
    self._complete = False
    self._cb = process_result_callback
33 def _process_result(self
, data
):
34 self
._result
= self
._cb
(data
)
42 def is_complete(self
):
46 class DeepSeaOrchestrator(MgrModule
, orchestrator
.Orchestrator
):
49 'name': 'salt_api_url',
53 'name': 'salt_api_eauth',
54 'default': 'sharedsecret'
57 'name': 'salt_api_username',
61 'name': 'salt_api_password',
69 "cmd": "deepsea config-set name=key,type=CephString "
70 "name=value,type=CephString",
71 "desc": "Set a configuration value",
75 "cmd": "deepsea config-show",
76 "desc": "Show current configuration",
def config_keys(self):
    """Return a dict mapping each MODULE_OPTIONS entry's name to its default.

    Options declared without a 'default' entry map to None.
    """
    return {opt['name']: opt.get('default', None)
            for opt in self.MODULE_OPTIONS}
def get_module_option(self, key, default=None):
    """
    Overrides the default MgrModule get_module_option() method to pull in defaults
    specific to this module.

    NOTE(review): the *default* parameter is accepted for signature
    compatibility but is effectively ignored -- the call below always
    substitutes this module's own declared default from config_keys.
    Raises KeyError if *key* is not one of this module's declared options.
    """
    return super(DeepSeaOrchestrator, self).get_module_option(key, default=self.config_keys[key])
95 def _config_valid(self
):
96 for key
in self
.config_keys
.keys():
97 if not self
.get_module_option(key
, self
.config_keys
[key
]):
102 def __init__(self
, *args
, **kwargs
):
103 super(DeepSeaOrchestrator
, self
).__init
__(*args
, **kwargs
)
104 self
._event
= Event()
106 self
._event
_reader
= None
107 self
._reading
_events
= False
108 self
._last
_failure
_msg
= None
109 self
._all
_completions
= dict()
110 self
._completion
_lock
= Lock()
111 self
.inventory_cache
= orchestrator
.OutdatableDict()
112 self
.service_cache
= orchestrator
.OutdatableDict()
115 if not self
._config
_valid
():
116 return False, "Configuration invalid; try `ceph deepsea config-set [...]`"
118 if not self
._reading
_events
and self
._last
_failure
_msg
:
119 return False, self
._last
_failure
_msg
123 def get_inventory(self
, node_filter
=None, refresh
=False):
125 Note that this will raise an exception (e.g. if the salt-api is down,
126 or the username/password is incorrect). Same for other methods.
127 Callers should expect this and react appropriately. The orchestrator
128 cli, for example, just prints the traceback in the console, so the
129 user at least sees the error.
131 self
.inventory_cache
.remove_outdated()
132 if not self
.inventory_cache
.any_outdated() and not refresh
:
133 if node_filter
is None:
134 return orchestrator
.TrivialReadCompletion(
135 orchestrator
.InventoryNode
.from_nested_items(self
.inventory_cache
.items()))
136 elif node_filter
.labels
is None:
138 return orchestrator
.TrivialReadCompletion(
139 orchestrator
.InventoryNode
.from_nested_items(
140 self
.inventory_cache
.items_filtered(node_filter
.nodes
)))
142 # items_filtered() will raise KeyError if passed a node name that doesn't exist
143 return orchestrator
.TrivialReadCompletion([])
145 def process_result(event_data
):
147 if event_data
['success']:
148 for node_name
, node_devs
in event_data
["return"].items():
149 if node_filter
is None:
150 # The cache will only be populated when this function is invoked
151 # without a node filter, i.e. if you run it once for the whole
152 # cluster, you can then call it for individual nodes and return
153 # cached data. However, if you only *ever* call it for individual
154 # nodes, the cache will never be populated, and you'll always have
155 # the full round trip to DeepSea.
156 self
.inventory_cache
[node_name
] = orchestrator
.OutdatableData(node_devs
)
157 devs
= orchestrator
.InventoryDevice
.from_ceph_volume_inventory_list(node_devs
)
158 result
.append(orchestrator
.InventoryNode(node_name
, devs
))
160 self
.log
.error(event_data
['return'])
163 with self
._completion
_lock
:
164 c
= DeepSeaReadCompletion(process_result
)
169 nodes
= node_filter
.nodes
170 roles
= node_filter
.labels
172 resp
= self
._do
_request
_with
_login
("POST", data
= {
173 "client": "runner_async",
174 "fun": "mgr_orch.get_inventory",
179 # ['return'][0]['tag'] in the response JSON is what we need to match
180 # on when looking for the result event (e.g.: "salt/run/20181018074024331230")
181 self
._all
_completions
["{}/ret".format(resp
.json()['return'][0]['tag'])] = c
185 def describe_service(self
, service_type
=None, service_id
=None, node_name
=None, refresh
=False):
187 # Note: describe_service() does *not* support OSDs. This is because
188 # DeepSea doesn't really record what OSDs are deployed where; Ceph is
189 # considered the canonical source of this information, so having this
190 # function query OSD information from DeepSea doesn't make a lot of
191 # sense (DeepSea would have to call back into Ceph).
193 assert service_type
in ("mon", "mgr", "mds", "rgw", "nfs", "iscsi", None), service_type
+ " unsupported"
195 def _deepsea_to_ceph(service
):
196 if service
== "ganesha":
198 elif service
== "igw":
204 def _ceph_to_deepsea(service
):
207 elif service
== "iscsi":
212 self
.service_cache
.remove_outdated()
213 if not self
.service_cache
.any_outdated() and not refresh
:
214 # Let's hope the services are complete.
216 node_filter
= [node_name
] if node_name
else None
217 services_by_node
= [d
[1].data
for d
in self
.service_cache
.items_filtered(node_filter
)]
218 services
= [orchestrator
.ServiceDescription
.from_json(s
) for services
in services_by_node
for s
in services
]
219 services
= [s
for s
in services
if
220 (True if service_type
is None else s
.service_type
== service_type
) and
221 (True if service_id
is None else s
.service_instance
== service_id
)]
222 return orchestrator
.TrivialReadCompletion(services
)
224 # items_filtered() will raise KeyError if passed a node name that doesn't exist
225 return orchestrator
.TrivialReadCompletion([])
227 def process_result(event_data
):
229 if event_data
['success']:
230 for service_node
, service_info
in event_data
["return"].items():
231 node_service_cache
= []
232 for this_service_type
, service_dict
in service_info
.items():
233 if isinstance(service_dict
, str):
234 # map old form where deepsea only returned service IDs
235 # to new form where it returns a dict
236 service_dict
= { 'service_instance': service_dict
}
237 desc
= orchestrator
.ServiceDescription(nodename
=service_node
,
238 service_instance
=service_dict
['service_instance'],
239 service_type
=_deepsea_to_ceph(this_service_type
),
240 # the following may or may not be present
241 container_id
=service_dict
.get('container_id', None),
242 service
=service_dict
.get('service', None),
243 version
=service_dict
.get('version', None),
244 rados_config_location
=service_dict
.get('rados_config_location', None),
245 service_url
= service_dict
.get('service_url', None),
246 status
=service_dict
.get('status', None),
247 status_desc
=service_dict
.get('status_desc', None)
249 # Always add every service to the cache...
250 node_service_cache
.append(desc
.to_json())
251 # ...but only return the ones the caller asked for
252 if ((service_type
is None or desc
.service_type
== service_type
) and
253 (service_id
is None or desc
.service_instance
== service_id
) and
254 (node_name
is None or desc
.nodename
== node_name
)):
257 self
.service_cache
[service_node
] = orchestrator
.OutdatableData(node_service_cache
)
259 self
.log
.error(event_data
['return'])
262 with self
._completion
_lock
:
263 c
= DeepSeaReadCompletion(process_result
)
265 # Always request all services, so we always have all services cached.
266 resp
= self
._do
_request
_with
_login
("POST", data
= {
267 "client": "runner_async",
268 "fun": "mgr_orch.describe_service"
270 self
._all
_completions
["{}/ret".format(resp
.json()['return'][0]['tag'])] = c
274 def wait(self
, completions
):
277 with self
._completion
_lock
:
278 for c
in completions
:
281 if not c
.is_complete
:
282 # TODO: the job is in the bus, it should reach us eventually
283 # unless something has gone wrong (e.g. salt-api died, etc.),
284 # in which case it's possible the job finished but we never
285 # noticed the salt/run/$id/ret event. Need to add the job ID
286 # (or possibly the full event tag) to the completion object.
287 # That way, if we want to double check on a job that hasn't
288 # been completed yet, we can make a synchronous request to
289 # salt-api to invoke jobs.lookup_jid, and if it's complete we
290 # should be able to pass its return value to _process_result()
291 # Question: do we do this automatically after some timeout?
292 # Or do we add a function so the admin can check and "unstick"
293 # a stuck completion?
296 return not incomplete
299 def handle_command(self
, inbuf
, cmd
):
300 if cmd
['prefix'] == 'deepsea config-show':
301 return 0, json
.dumps(dict([(key
, self
.get_module_option(key
)) for key
in self
.config_keys
.keys()])), ''
303 elif cmd
['prefix'] == 'deepsea config-set':
304 if cmd
['key'] not in self
.config_keys
.keys():
305 return (-errno
.EINVAL
, '',
306 "Unknown configuration option '{0}'".format(cmd
['key']))
308 self
.set_module_option(cmd
['key'], cmd
['value'])
310 return 0, "Configuration option '{0}' updated".format(cmd
['key']), ''
312 return (-errno
.EINVAL
, '',
313 "Command not found '{0}'".format(cmd
['prefix']))
317 self
.log
.info('DeepSea module starting up')
320 if not self
._config
_valid
():
321 # This will spin until the config is valid, spitting a warning
322 # that the config is invalid every 60 seconds. The one oddity
323 # is that while setting the various parameters, this log warning
324 # will print once for each parameter set until the config is valid.
325 self
.log
.warn("Configuration invalid; try `ceph deepsea config-set [...]`")
330 if self
._event
_reader
and not self
._reading
_events
:
331 self
._event
_reader
= None
333 if not self
._event
_reader
:
334 self
._last
_failure
_msg
= None
336 # This spawns a separate thread to read the salt event bus
337 # stream. We can't do it in the serve thread, because reading
338 # from the response blocks, which would prevent the serve
339 # thread from handling anything else.
341 # TODO: figure out how to restart the _event_reader thread if
342 # config changes, e.g.: a new username or password is set.
343 # This will be difficult, because _read_sse() just blocks waiting
344 # for response lines. The closest I got was setting a read timeout
345 # on the request, but in the general case (where not much is
346 # happening most of the time), this will result in continual
347 # timeouts and reconnects. We really need an asynchronous read
349 self
._event
_response
= self
._do
_request
_with
_login
("GET", "events", stream
=True)
350 self
._event
_reader
= Thread(target
=self
._read
_sse
)
351 self
._reading
_events
= True
352 self
._event
_reader
.start()
353 except Exception as ex
:
354 self
._set
_last
_failure
_msg
("Failure setting up event reader: " + str(ex
))
355 # gives an (arbitrary) 60 second retry if we can't attach to
356 # the salt-api event bus for some reason (e.g.: invalid username,
357 # or password, which will be logged as "Request failed with status
358 # code 401"). Note that this 60 second retry will also happen if
364 # Wait indefinitely for something interesting to happen (e.g.
365 # config-set, or shutdown), or the event reader to fail, which
366 # will happen if the salt-api server dies or restarts).
372 self
.log
.info('DeepSea module shutting down')
377 def _set_last_failure_msg(self
, msg
):
378 self
._last
_failure
_msg
= msg
382 # Reader/parser of SSE events, see:
383 # - https://docs.saltstack.com/en/latest/ref/netapi/all/salt.netapi.rest_cherrypy.html#events)
384 # - https://www.w3.org/TR/2009/WD-eventsource-20090421/
385 # Note: this is pretty braindead and doesn't implement the full eventsource
386 # spec, but it *does* implement enough for us to listen to events from salt
387 # and potentially do something with them.
391 # Just starting the event reader; if we've made it here, we know we're
392 # talking to salt-api (_do_request would have raised an exception if the
393 # response wasn't ok), so check if there's any completions inflight that
394 # need to be dealt with. This handles the case where some command was
395 # invoked, then salt-api died somehow, and we reconnected, but missed the
396 # completion at the time it actually happened.
397 for tag
in list(self
._all
_completions
):
398 self
.log
.info("Found event {} inflight".format(tag
))
400 resp
= self
._do
_request
_with
_login
("POST", data
= {
402 "fun": "jobs.lookup_jid",
403 "jid": tag
.split('/')[2]
405 # jobs.lookup_jid returns a dict keyed by hostname.
406 return_dict
= resp
.json()['return'][0]
408 # If the job is complete, there'll be one item in the dict.
409 self
.log
.info("Event {} complete".format(tag
))
410 # The key is the salt master hostname, but we don't care
411 # about that, so just grab the data.
412 data
= next(iter(return_dict
.items()))[1]
413 self
._all
_completions
[tag
]._process
_result
(data
)
414 # TODO: decide whether it's bad to delete the completion
415 # here -- would we ever need to resurrect it?
416 del self
._all
_completions
[tag
]
418 # if the job is not complete, there'll be nothing in the dict
419 self
.log
.info("Event {} still pending".format(tag
))
420 except Exception as ex
:
421 # Logging a warning if the request failed, so we can continue
422 # checking any other completions, then get onto reading events
423 self
.log
.warn("Error looking up inflight event {}: {}".format(tag
, str(ex
)))
425 for line
in self
._event
_response
.iter_lines():
426 with self
._completion
_lock
:
428 line
= line
.decode('utf-8')
429 colon
= line
.find(':')
434 # TODO: find out if we need to obey this reconnection time
435 self
.log
.warn("Server requested retry {}, ignored".format(v
))
439 # Empty line, terminates an event. Note that event['tag']
440 # is a salt-api extension to SSE to avoid having to decode
441 # json data if you don't care about it. To get to the
442 # interesting stuff, you want event['data'], which is json.
443 # If you want to have some fun, try
444 # `ceph daemon mgr.$(hostname) config set debug_mgr 20`
445 # then `salt '*' test.ping` on the master
446 self
.log
.debug("Got event '{}'".format(str(event
)))
448 # If we're actually interested in this event (i.e. it's
449 # in our completion dict), fire off that completion's
450 # _process_result() callback and remove it from our list.
451 if event
['tag'] in self
._all
_completions
:
452 self
.log
.info("Event {} complete".format(event
['tag']))
453 self
._all
_completions
[event
['tag']]._process
_result
(json
.loads(event
['data'])['data'])
454 # TODO: decide whether it's bad to delete the completion
455 # here -- would we ever need to resurrect it?
456 del self
._all
_completions
[event
['tag']]
459 self
._set
_last
_failure
_msg
("SSE read terminated")
460 except Exception as ex
:
461 self
.log
.exception(ex
)
462 self
._set
_last
_failure
_msg
("SSE read failed: {}".format(str(ex
)))
464 self
._reading
_events
= False
468 # _do_request(), _login() and _do_request_with_login() are an extremely
469 # minimalist form of the following, with notably terse error handling:
470 # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/rest_client.py?at=master&fileviewer=file-view-default
471 # https://bitbucket.org/openattic/openattic/src/ce4543d4cbedadc21b484a098102a16efec234f9/backend/deepsea.py?at=master&fileviewer=file-view-default
473 # - I needed slightly different behaviour than in openATTIC (I want the
474 # caller to read the response, to allow streaming the salt-api event bus)
475 # - I didn't want to pull in 400+ lines more code into this presently
476 # experimental module, to save everyone having to review it ;-)
478 def _do_request(self
, method
, path
="", data
=None, stream
=False):
480 returns the response, which the caller then has to read
482 url
= "{0}/{1}".format(self
.get_module_option('salt_api_url'), path
)
484 if method
.lower() == 'get':
485 resp
= requests
.get(url
, headers
= { "X-Auth-Token": self
._token
},
486 data
=data
, stream
=stream
)
487 elif method
.lower() == 'post':
488 resp
= requests
.post(url
, headers
= { "X-Auth-Token": self
._token
},
492 raise RequestException("Method '{}' not supported".format(method
.upper()))
496 msg
= "Request failed with status code {}".format(resp
.status_code
)
497 raise RequestException(msg
, resp
.status_code
)
498 except requests
.exceptions
.ConnectionError
as ex
:
499 self
.log
.exception(str(ex
))
500 raise RequestException(str(ex
))
501 except requests
.exceptions
.InvalidURL
as ex
:
502 self
.log
.exception(str(ex
))
503 raise RequestException(str(ex
))
507 resp
= self
._do
_request
('POST', 'login', data
= {
508 "eauth": self
.get_module_option('salt_api_eauth'),
509 "password": self
.get_module_option('salt_api_password'),
510 "username": self
.get_module_option('salt_api_username')
512 self
._token
= resp
.json()['return'][0]['token']
513 self
.log
.info("Salt API login successful")
516 def _do_request_with_login(self
, method
, path
="", data
=None, stream
=False):
522 return self
._do
_request
(method
, path
, data
, stream
)
523 except RequestException
as ex
:
525 if ex
.status_code
not in [401, 403] or retries
== 0: