]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/restful/module.py
4 from __future__
import absolute_import
20 from uuid
import uuid4
21 from pecan
import jsonify
, make_app
22 from OpenSSL
import crypto
23 from pecan
.rest
import RestController
24 from werkzeug
.serving
import make_server
, make_ssl_devcert
26 from .hooks
import ErrorHook
27 from mgr_module
import MgrModule
, CommandResult
28 from mgr_util
import build_url
31 class CannotServe(Exception):
35 class CommandsRequest(object):
37 This class handles parallel as well as sequential execution of
38 commands. The class accept a list of iterables that should be
39 executed sequentially. Each iterable can contain several commands
40 that can be executed in parallel.
44 - run c1 and c2 in parallel
45 - wait for them to finish
46 - run c3 and c4 in parallel
47 - wait for them to finish
51 def __init__(self
, commands_arrays
):
52 self
.id = str(id(self
))
54 # Filter out empty sub-requests
55 commands_arrays
= [x
for x
in commands_arrays
59 self
.waiting
= commands_arrays
[1:]
63 self
.lock
= threading
.RLock()
64 if not len(commands_arrays
):
68 # Process first iteration of commands_arrays in parallel
69 results
= self
.run(commands_arrays
[0])
71 self
.running
.extend(results
)
74 def run(self
, commands
):
76 A static method that will execute the given list of commands in
77 parallel and will return the list of command results.
80 # Gather the results (in parallel)
82 for index
, command
in enumerate(commands
):
83 tag
= '%s:%s:%d' % (__name__
, self
.id, index
)
86 result
= CommandResult(tag
)
87 result
.command
= common
.humanify_command(command
)
88 results
.append(result
)
91 context
.instance
.send_command(result
, 'mon', '', json
.dumps(command
), tag
)
102 # Run a next iteration of commands
103 commands
= self
.waiting
[0]
104 self
.waiting
= self
.waiting
[1:]
106 self
.running
.extend(self
.run(commands
))
109 def finish(self
, tag
):
111 for index
in range(len(self
.running
)):
112 if self
.running
[index
].tag
== tag
:
113 if self
.running
[index
].r
== 0:
114 self
.finished
.append(self
.running
.pop(index
))
116 self
.failed
.append(self
.running
.pop(index
))
123 def is_running(self
, tag
):
124 for result
in self
.running
:
125 if result
.tag
== tag
:
132 return not self
.running
and self
.waiting
def is_waiting(self):
    """Tell whether any command batches are still queued on this request.

    Returns:
        bool: True when ``self.waiting`` is non-empty, False otherwise.
    """
    return bool(self.waiting)
139 def is_finished(self
):
141 return not self
.running
and not self
.waiting
def has_failed(self):
    """Tell whether at least one command result has been recorded as failed.

    Returns:
        bool: True when ``self.failed`` is non-empty, False otherwise.
    """
    return bool(self.failed)
150 if not self
.is_finished():
153 if self
.has_failed():
164 'command': x
.command
,
167 } for x
in self
.running
171 'command': x
.command
,
174 } for x
in self
.finished
177 [common
.humanify_command(y
) for y
in x
]
178 for x
in self
.waiting
182 'command': x
.command
,
185 } for x
in self
.failed
187 'is_waiting': self
.is_waiting(),
188 'is_finished': self
.is_finished(),
189 'has_failed': self
.has_failed(),
190 'state': self
.get_state(),
195 class Module(MgrModule
):
197 {'name': 'server_addr'},
198 {'name': 'server_port'},
199 {'name': 'key_file'},
200 {'name': 'enable_auth', 'type': 'bool', 'default': True},
205 "cmd": "restful create-key name=key_name,type=CephString",
206 "desc": "Create an API key with this name",
210 "cmd": "restful delete-key name=key_name,type=CephString",
211 "desc": "Delete an API key with this name",
215 "cmd": "restful list-keys",
216 "desc": "List all API keys",
220 "cmd": "restful create-self-signed-cert",
221 "desc": "Create localized self signed certificate",
225 "cmd": "restful restart",
226 "desc": "Restart API server",
231 def __init__(self
, *args
, **kwargs
):
232 super(Module
, self
).__init
__(*args
, **kwargs
)
233 context
.instance
= self
236 self
.requests_lock
= threading
.RLock()
239 self
.enable_auth
= True
243 self
.stop_server
= False
244 self
.serve_event
= threading
.Event()
248 self
.log
.debug('serve enter')
249 while not self
.stop_server
:
252 self
.server
.socket
.close()
253 except CannotServe
as cs
:
254 self
.log
.warning("server not running: %s", cs
)
256 self
.log
.error(str(traceback
.format_exc()))
258 # Wait and clear the threading event
259 self
.serve_event
.wait()
260 self
.serve_event
.clear()
261 self
.log
.debug('serve exit')
263 def refresh_keys(self
):
265 rawkeys
= self
.get_store_prefix('keys/') or {}
266 for k
, v
in rawkeys
.items():
267 self
.keys
[k
[5:]] = v
# strip of keys/ prefix
270 # Load stored authentication keys
273 jsonify
._instance
= jsonify
.GenericJSON(
276 separators
=(',', ': '),
279 server_addr
= self
.get_localized_module_option('server_addr', '::')
280 if server_addr
is None:
281 raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')
283 server_port
= int(self
.get_localized_module_option('server_port', '8003'))
284 self
.log
.info('server_addr: %s server_port: %d',
285 server_addr
, server_port
)
287 cert
= self
.get_localized_store("crt")
289 cert_tmp
= tempfile
.NamedTemporaryFile()
290 cert_tmp
.write(cert
.encode('utf-8'))
292 cert_fname
= cert_tmp
.name
294 cert_fname
= self
.get_localized_store('crt_file')
296 pkey
= self
.get_localized_store("key")
298 pkey_tmp
= tempfile
.NamedTemporaryFile()
299 pkey_tmp
.write(pkey
.encode('utf-8'))
301 pkey_fname
= pkey_tmp
.name
303 pkey_fname
= self
.get_localized_module_option('key_file')
305 self
.enable_auth
= self
.get_localized_module_option('enable_auth', True)
307 if not cert_fname
or not pkey_fname
:
308 raise CannotServe('no certificate configured')
309 if not os
.path
.isfile(cert_fname
):
310 raise CannotServe('certificate %s does not exist' % cert_fname
)
311 if not os
.path
.isfile(pkey_fname
):
312 raise CannotServe('private key %s does not exist' % pkey_fname
)
314 # Publish the URI that others may use to access the service we're
315 # about to start serving
316 addr
= self
.get_mgr_ip() if server_addr
== "::" else server_addr
317 self
.set_uri(build_url(scheme
='https', host
=addr
, port
=server_port
, path
='/'))
319 # Create the HTTPS werkzeug server serving pecan app
320 self
.server
= make_server(
324 root
='restful.api.Root',
325 hooks
= [ErrorHook()], # use a callable if pecan >= 0.3.2
327 ssl_context
=(cert_fname
, pkey_fname
),
329 sock_fd_flag
= fcntl
.fcntl(self
.server
.socket
.fileno(), fcntl
.F_GETFD
)
330 if not (sock_fd_flag
& fcntl
.FD_CLOEXEC
):
331 self
.log
.debug("set server socket close-on-exec")
332 fcntl
.fcntl(self
.server
.socket
.fileno(), fcntl
.F_SETFD
, sock_fd_flag | fcntl
.FD_CLOEXEC
)
334 self
.log
.debug('made server, but stop flag set')
336 self
.log
.debug('made server, serving forever')
337 self
.server
.serve_forever()
341 self
.log
.debug('shutdown enter')
343 self
.stop_server
= True
345 self
.log
.debug('calling server.shutdown')
346 self
.server
.shutdown()
347 self
.log
.debug('called server.shutdown')
348 self
.serve_event
.set()
350 self
.log
.error(str(traceback
.format_exc()))
352 self
.log
.debug('shutdown exit')
358 self
.server
.shutdown()
359 self
.serve_event
.set()
361 self
.log
.error(str(traceback
.format_exc()))
364 def notify(self
, notify_type
, tag
):
366 self
._notify
(notify_type
, tag
)
368 self
.log
.error(str(traceback
.format_exc()))
371 def _notify(self
, notify_type
, tag
):
372 if notify_type
!= "command":
373 self
.log
.debug("Unhandled notification type '%s'", notify_type
)
375 # we can safely skip all the sequential commands
379 with self
.requests_lock
:
380 request
= next(x
for x
in self
.requests
if x
.is_running(tag
))
382 if request
.is_ready():
384 except StopIteration:
385 # the command was not issued by me
def config_notify(self):
    """Re-read the ``enable_auth`` module option into ``self.enable_auth``.

    The value is looked up with ``get_localized_module_option`` and falls
    back to True when the lookup yields the supplied default.
    """
    self.enable_auth = self.get_localized_module_option('enable_auth', True)
392 def create_self_signed_cert(self
):
395 pkey
.generate_key(crypto
.TYPE_RSA
, 2048)
397 # create a self-signed cert
399 cert
.get_subject().O
= "IT"
400 cert
.get_subject().CN
= "ceph-restful"
401 cert
.set_serial_number(int(uuid4()))
402 cert
.gmtime_adj_notBefore(0)
403 cert
.gmtime_adj_notAfter(10*365*24*60*60)
404 cert
.set_issuer(cert
.get_subject())
405 cert
.set_pubkey(pkey
)
406 cert
.sign(pkey
, 'sha512')
409 crypto
.dump_certificate(crypto
.FILETYPE_PEM
, cert
),
410 crypto
.dump_privatekey(crypto
.FILETYPE_PEM
, pkey
)
414 def handle_command(self
, inbuf
, command
):
415 self
.log
.warning("Handling command: '%s'" % str(command
))
416 if command
['prefix'] == "restful create-key":
417 if command
['key_name'] in self
.keys
:
418 return 0, self
.keys
[command
['key_name']], ""
422 self
.keys
[command
['key_name']] = key
423 self
.set_store('keys/' + command
['key_name'], key
)
427 self
.keys
[command
['key_name']],
431 elif command
['prefix'] == "restful delete-key":
432 if command
['key_name'] in self
.keys
:
433 del self
.keys
[command
['key_name']]
434 self
.set_store('keys/' + command
['key_name'], None)
442 elif command
['prefix'] == "restful list-keys":
446 json
.dumps(self
.keys
, indent
=4, sort_keys
=True),
450 elif command
['prefix'] == "restful create-self-signed-cert":
451 cert
, pkey
= self
.create_self_signed_cert()
452 self
.set_store(self
.get_mgr_id() + '/crt', cert
.decode('utf-8'))
453 self
.set_store(self
.get_mgr_id() + '/key', pkey
.decode('utf-8'))
458 "Restarting RESTful API server...",
462 elif command
['prefix'] == 'restful restart':
466 "Restarting RESTful API server...",
474 "Command not found '{0}'".format(command
['prefix'])
478 def get_doc_api(self
, root
, prefix
=''):
480 for _obj
in dir(root
):
481 obj
= getattr(root
, _obj
)
483 if isinstance(obj
, RestController
):
484 doc
.update(self
.get_doc_api(obj
, prefix
+ '/' + _obj
))
486 if getattr(root
, '_lookup', None) and isinstance(root
._lookup
('0')[0], RestController
):
487 doc
.update(self
.get_doc_api(root
._lookup
('0')[0], prefix
+ '/<arg>'))
489 prefix
= prefix
or '/'
492 for method
in 'get', 'post', 'patch', 'delete':
493 if getattr(root
, method
, None):
494 doc
[prefix
][method
.upper()] = inspect
.getdoc(getattr(root
, method
)).split('\n')
496 if len(doc
[prefix
]) == 0:
503 mon_map_mons
= self
.get('mon_map')['mons']
504 mon_status
= json
.loads(self
.get('mon_status')['json'])
506 # Add more information
507 for mon
in mon_map_mons
:
508 mon
['in_quorum'] = mon
['rank'] in mon_status
['quorum']
509 mon
['server'] = self
.get_metadata("mon", mon
['name'])['hostname']
510 mon
['leader'] = mon
['rank'] == mon_status
['quorum'][0]
515 def get_osd_pools(self
):
516 osds
= dict(map(lambda x
: (x
['osd'], []), self
.get('osd_map')['osds']))
517 pools
= dict(map(lambda x
: (x
['pool'], x
), self
.get('osd_map')['pools']))
518 crush
= self
.get('osd_map_crush')
519 crush_rules
= crush
['rules']
522 for pool_id
, pool
in pools
.items():
524 for rule
in [r
for r
in crush_rules
if r
['rule_id'] == pool
['crush_rule']]:
525 if rule
['min_size'] <= pool
['size'] <= rule
['max_size']:
526 pool_osds
= common
.crush_rule_osds(crush
['buckets'], rule
)
528 osds_by_pool
[pool_id
] = pool_osds
530 for pool_id
in pools
.keys():
531 for in_pool_id
in osds_by_pool
[pool_id
]:
532 osds
[in_pool_id
].append(pool_id
)
537 def get_osds(self
, pool_id
=None, ids
=None):
539 osd_map
= self
.get('osd_map')
540 osd_metadata
= self
.get('osd_metadata')
542 # Update the data with the additional info from the osd map
543 osds
= osd_map
['osds']
547 osds
= [x
for x
in osds
if str(x
['osd']) in ids
]
549 # Get list of pools per osd node
550 pools_map
= self
.get_osd_pools()
552 # map osd IDs to reweight
553 reweight_map
= dict([
554 (x
.get('id'), x
.get('reweight', None))
555 for x
in self
.get('osd_map_tree')['nodes']
558 # Build OSD data objects
560 osd
['pools'] = pools_map
[osd
['osd']]
561 osd
['server'] = osd_metadata
.get(str(osd
['osd']), {}).get('hostname', None)
563 osd
['reweight'] = reweight_map
.get(osd
['osd'], 0.0)
566 osd
['valid_commands'] = common
.OSD_IMPLEMENTED_COMMANDS
568 osd
['valid_commands'] = []
572 pool_id
= int(pool_id
)
573 osds
= [x
for x
in osds
if pool_id
in x
['pools']]
578 def get_osd_by_id(self
, osd_id
):
579 osd
= [x
for x
in self
.get('osd_map')['osds']
580 if x
['osd'] == osd_id
]
588 def get_pool_by_id(self
, pool_id
):
589 pool
= [x
for x
in self
.get('osd_map')['pools']
590 if x
['pool'] == pool_id
]
598 def submit_request(self
, _request
, **kwargs
):
599 with self
.requests_lock
:
600 request
= CommandsRequest(_request
)
601 self
.requests
.append(request
)
602 if kwargs
.get('wait', 0):
603 while not request
.is_finished():
608 def run_command(self
, command
):
609 # tag with 'seq' so that we can ignore these in notify function
610 result
= CommandResult('seq')
612 self
.send_command(result
, 'mon', '', json
.dumps(command
), 'seq')