]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/restful/module.py
16 from uuid
import uuid4
17 from pecan
import jsonify
, make_app
18 from OpenSSL
import crypto
19 from pecan
.rest
import RestController
20 from werkzeug
.serving
import make_server
, make_ssl_devcert
22 from hooks
import ErrorHook
23 from mgr_module
import MgrModule
, CommandResult
25 # Global instance to share
30 class CommandsRequest(object):
32 This class handles parallel as well as sequential execution of
33 commands. The class accept a list of iterables that should be
34 executed sequentially. Each iterable can contain several commands
35 that can be executed in parallel.
39 - run c1 and c2 in parallel
40 - wait for them to finish
41 - run c3 and c4 in parallel
42 - wait for them to finish
46 def __init__(self
, commands_arrays
):
47 self
.id = str(id(self
))
49 # Filter out empty sub-requests
50 commands_arrays
= filter(
51 lambda x
: len(x
) != 0,
56 self
.waiting
= commands_arrays
[1:]
60 self
.lock
= threading
.RLock()
61 if not len(commands_arrays
):
65 # Process first iteration of commands_arrays in parallel
66 results
= self
.run(commands_arrays
[0])
68 self
.running
.extend(results
)
71 def run(self
, commands
):
73 A static method that will execute the given list of commands in
74 parallel and will return the list of command results.
77 # Gather the results (in parallel)
79 for index
in range(len(commands
)):
80 tag
= '%s:%d' % (str(self
.id), index
)
83 result
= CommandResult(tag
)
84 result
.command
= common
.humanify_command(commands
[index
])
85 results
.append(result
)
88 instance
.send_command(result
, 'mon', '', json
.dumps(commands
[index
]), tag
)
99 # Run a next iteration of commands
100 commands
= self
.waiting
[0]
101 self
.waiting
= self
.waiting
[1:]
103 self
.running
.extend(self
.run(commands
))
106 def finish(self
, tag
):
108 for index
in range(len(self
.running
)):
109 if self
.running
[index
].tag
== tag
:
110 if self
.running
[index
].r
== 0:
111 self
.finished
.append(self
.running
.pop(index
))
113 self
.failed
.append(self
.running
.pop(index
))
120 def is_running(self
, tag
):
121 for result
in self
.running
:
122 if result
.tag
== tag
:
129 return not self
.running
and self
.waiting
132 def is_waiting(self
):
133 return bool(self
.waiting
)
136 def is_finished(self
):
138 return not self
.running
and not self
.waiting
141 def has_failed(self
):
142 return bool(self
.failed
)
147 if not self
.is_finished():
150 if self
.has_failed():
161 'command': x
.command
,
169 'command': x
.command
,
177 'command': x
.command
,
185 'command': x
.command
,
191 'is_waiting': self
.is_waiting(),
192 'is_finished': self
.is_finished(),
193 'has_failed': self
.has_failed(),
194 'state': self
.get_state(),
199 class Module(MgrModule
):
202 "cmd": "restful create-key name=key_name,type=CephString",
203 "desc": "Create an API key with this name",
207 "cmd": "restful delete-key name=key_name,type=CephString",
208 "desc": "Delete an API key with this name",
212 "cmd": "restful list-keys",
213 "desc": "List all API keys",
217 "cmd": "restful create-self-signed-cert",
218 "desc": "Create localized self signed certificate",
222 "cmd": "restful restart",
223 "desc": "Restart API server",
228 def __init__(self
, *args
, **kwargs
):
229 super(Module
, self
).__init
__(*args
, **kwargs
)
234 self
.requests_lock
= threading
.RLock()
237 self
.disable_auth
= False
241 self
.stop_server
= False
242 self
.serve_event
= threading
.Event()
246 while not self
.stop_server
:
249 self
.server
.socket
.close()
251 self
.log
.error(str(traceback
.format_exc()))
253 # Wait and clear the threading event
254 self
.serve_event
.wait()
255 self
.serve_event
.clear()
257 def get_localized_config(self
, key
):
258 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
260 r
= self
.get_config(key
)
263 def refresh_keys(self
):
265 rawkeys
= self
.get_config_prefix('keys/') or {}
266 for k
, v
in rawkeys
.iteritems():
267 self
.keys
[k
[5:]] = v
# strip of keys/ prefix
270 # Load stored authentication keys
273 jsonify
._instance
= jsonify
.GenericJSON(
276 separators
=(',', ': '),
279 server_addr
= self
.get_localized_config('server_addr')
280 if server_addr
is None:
281 raise RuntimeError('no server_addr configured; try "ceph config-key put mgr/restful/server_addr <ip>"')
282 server_port
= int(self
.get_localized_config('server_port') or '8003')
283 self
.log
.info('server_addr: %s server_port: %d',
284 server_addr
, server_port
)
286 cert
= self
.get_localized_config("crt")
288 cert_tmp
= tempfile
.NamedTemporaryFile()
291 cert_fname
= cert_tmp
.name
293 cert_fname
= self
.get_localized_config('crt_file')
295 pkey
= self
.get_localized_config("key")
297 pkey_tmp
= tempfile
.NamedTemporaryFile()
300 pkey_fname
= pkey_tmp
.name
302 pkey_fname
= self
.get_localized_config('key_file')
304 if not cert_fname
or not pkey_fname
:
305 raise RuntimeError('no certificate configured')
306 if not os
.path
.isfile(cert_fname
):
307 raise RuntimeError('certificate %s does not exist' % cert_fname
)
308 if not os
.path
.isfile(pkey_fname
):
309 raise RuntimeError('private key %s does not exist' % pkey_fname
)
311 # Create the HTTPS werkzeug server serving pecan app
312 self
.server
= make_server(
316 root
='restful.api.Root',
317 hooks
= [ErrorHook()], # use a callable if pecan >= 0.3.2
319 ssl_context
=(cert_fname
, pkey_fname
),
322 self
.server
.serve_forever()
327 self
.stop_server
= True
329 self
.server
.shutdown()
330 self
.serve_event
.set()
332 self
.log
.error(str(traceback
.format_exc()))
339 self
.server
.shutdown()
340 self
.serve_event
.set()
342 self
.log
.error(str(traceback
.format_exc()))
345 def notify(self
, notify_type
, tag
):
347 self
._notify
(notify_type
, tag
)
349 self
.log
.error(str(traceback
.format_exc()))
352 def _notify(self
, notify_type
, tag
):
353 if notify_type
== "command":
354 # we can safely skip all the sequential commands
359 lambda x
: x
.is_running(tag
),
362 if len(request
) != 1:
363 self
.log
.warn("Unknown request '%s'" % str(tag
))
368 if request
.is_ready():
371 self
.log
.debug("Unhandled notification type '%s'" % notify_type
)
374 def create_self_signed_cert(self
):
377 pkey
.generate_key(crypto
.TYPE_RSA
, 2048)
379 # create a self-signed cert
381 cert
.get_subject().O
= "IT"
382 cert
.get_subject().CN
= "ceph-restful"
383 cert
.set_serial_number(int(uuid4()))
384 cert
.gmtime_adj_notBefore(0)
385 cert
.gmtime_adj_notAfter(10*365*24*60*60)
386 cert
.set_issuer(cert
.get_subject())
387 cert
.set_pubkey(pkey
)
388 cert
.sign(pkey
, 'sha512')
391 crypto
.dump_certificate(crypto
.FILETYPE_PEM
, cert
),
392 crypto
.dump_privatekey(crypto
.FILETYPE_PEM
, pkey
)
396 def handle_command(self
, command
):
397 self
.log
.warn("Handling command: '%s'" % str(command
))
398 if command
['prefix'] == "restful create-key":
399 if command
['key_name'] in self
.keys
:
400 return 0, self
.keys
[command
['key_name']], ""
404 self
.keys
[command
['key_name']] = key
405 self
.set_config('keys/' + command
['key_name'], key
)
409 self
.keys
[command
['key_name']],
413 elif command
['prefix'] == "restful delete-key":
414 if command
['key_name'] in self
.keys
:
415 del self
.keys
[command
['key_name']]
416 self
.set_config('keys/' + command
['key_name'], None)
424 elif command
['prefix'] == "restful list-keys":
428 json
.dumps(self
.keys
, indent
=2),
432 elif command
['prefix'] == "restful create-self-signed-cert":
433 cert
, pkey
= self
.create_self_signed_cert()
435 self
.set_config(self
.get_mgr_id() + '/crt', cert
)
436 self
.set_config(self
.get_mgr_id() + '/key', pkey
)
441 "Restarting RESTful API server...",
445 elif command
['prefix'] == 'restful restart':
449 "Restarting RESTful API server...",
457 "Command not found '{0}'".format(command
['prefix'])
461 def get_doc_api(self
, root
, prefix
=''):
463 for _obj
in dir(root
):
464 obj
= getattr(root
, _obj
)
466 if isinstance(obj
, RestController
):
467 doc
.update(self
.get_doc_api(obj
, prefix
+ '/' + _obj
))
469 if getattr(root
, '_lookup', None) and isinstance(root
._lookup
('0')[0], RestController
):
470 doc
.update(self
.get_doc_api(root
._lookup
('0')[0], prefix
+ '/<arg>'))
472 prefix
= prefix
or '/'
475 for method
in 'get', 'post', 'patch', 'delete':
476 if getattr(root
, method
, None):
477 doc
[prefix
][method
.upper()] = inspect
.getdoc(getattr(root
, method
)).split('\n')
479 if len(doc
[prefix
]) == 0:
486 mon_map_mons
= self
.get('mon_map')['mons']
487 mon_status
= json
.loads(self
.get('mon_status')['json'])
489 # Add more information
490 for mon
in mon_map_mons
:
491 mon
['in_quorum'] = mon
['rank'] in mon_status
['quorum']
492 mon
['server'] = self
.get_metadata("mon", mon
['name'])['hostname']
493 mon
['leader'] = mon
['rank'] == mon_status
['quorum'][0]
498 def get_osd_pools(self
):
499 osds
= dict(map(lambda x
: (x
['osd'], []), self
.get('osd_map')['osds']))
500 pools
= dict(map(lambda x
: (x
['pool'], x
), self
.get('osd_map')['pools']))
501 crush_rules
= self
.get('osd_map_crush')['rules']
504 for pool_id
, pool
in pools
.items():
506 for rule
in [r
for r
in crush_rules
if r
['rule_id'] == pool
['crush_rule']]:
507 if rule
['min_size'] <= pool
['size'] <= rule
['max_size']:
508 pool_osds
= common
.crush_rule_osds(self
.get('osd_map_tree')['nodes'], rule
)
510 osds_by_pool
[pool_id
] = pool_osds
512 for pool_id
in pools
.keys():
513 for in_pool_id
in osds_by_pool
[pool_id
]:
514 osds
[in_pool_id
].append(pool_id
)
519 def get_osds(self
, pool_id
=None, ids
=None):
521 osd_map
= self
.get('osd_map')
522 osd_metadata
= self
.get('osd_metadata')
524 # Update the data with the additional info from the osd map
525 osds
= osd_map
['osds']
530 lambda x
: str(x
['osd']) in ids
,
534 # Get list of pools per osd node
535 pools_map
= self
.get_osd_pools()
537 # map osd IDs to reweight
538 reweight_map
= dict([
539 (x
.get('id'), x
.get('reweight', None))
540 for x
in self
.get('osd_map_tree')['nodes']
543 # Build OSD data objects
545 osd
['pools'] = pools_map
[osd
['osd']]
546 osd
['server'] = osd_metadata
.get(str(osd
['osd']), {}).get('hostname', None)
548 osd
['reweight'] = reweight_map
.get(osd
['osd'], 0.0)
551 osd
['valid_commands'] = common
.OSD_IMPLEMENTED_COMMANDS
553 osd
['valid_commands'] = []
557 pool_id
= int(pool_id
)
559 lambda x
: pool_id
in x
['pools'],
566 def get_osd_by_id(self
, osd_id
):
568 lambda x
: x
['osd'] == osd_id
,
569 self
.get('osd_map')['osds']
578 def get_pool_by_id(self
, pool_id
):
580 lambda x
: x
['pool'] == pool_id
,
581 self
.get('osd_map')['pools'],
590 def submit_request(self
, _request
, **kwargs
):
591 request
= CommandsRequest(_request
)
592 with self
.requests_lock
:
593 self
.requests
.append(request
)
594 if kwargs
.get('wait', 0):
595 while not request
.is_finished():
600 def run_command(self
, command
):
601 # tag with 'seq' so that we can ingore these in notify function
602 result
= CommandResult('seq')
604 self
.send_command(result
, 'mon', '', json
.dumps(command
), 'seq')