]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/restful/module.py
508bb7f5274fc1d49aa936ff70509010bceaf0ac
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
1 """
2 A RESTful API for Ceph
3 """
4
5 import os
6 import json
7 import time
8 import errno
9 import inspect
10 import tempfile
11 import threading
12 import traceback
13
14 import common
15
16 from uuid import uuid4
17 from pecan import jsonify, make_app
18 from OpenSSL import crypto
19 from pecan.rest import RestController
20 from werkzeug.serving import make_server, make_ssl_devcert
21
22 from hooks import ErrorHook
23 from mgr_module import MgrModule, CommandResult
24
25 # Global Module instance; assigned in Module.__init__ and used by CommandsRequest.run
26 instance = None
27
28
29
class CommandsRequest(object):
    """
    This class handles parallel as well as sequential execution of
    commands. The class accepts a list of iterables that are executed
    sequentially. Each iterable can contain several commands that are
    executed in parallel.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Completion of a single command is reported from outside through
    finish(tag); once is_ready() is true the following group is started
    with next().
    """

    def __init__(self, commands_arrays):
        """
        Register the request and immediately send its first group.

        :param commands_arrays: list of sized iterables of commands;
            each command is passed to json.dumps() when it is sent
        """
        self.id = str(id(self))

        # Filter out empty sub-requests. A real list is required (not a
        # lazy filter object) because it is indexed and sliced below.
        commands_arrays = [
            commands for commands in commands_arrays if len(commands) != 0
        ]

        # CommandResult objects that were sent and are not finished yet
        self.running = []
        # groups of raw commands that were not sent yet
        self.waiting = commands_arrays[1:]
        # CommandResult objects that finished with r == 0
        self.finished = []
        # CommandResult objects that finished with r != 0
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Process first iteration of commands_arrays in parallel. The lock
        # is held, as in next(), so that is_running() cannot observe the
        # commands as sent but not yet registered.
        with self.lock:
            self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send all the given commands without waiting for any of them and
        return the list of their (still pending) command results.

        :param commands: iterable of commands of one group
        :returns: list of CommandResult, one per command, in order
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%d' % (str(self.id), index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """
        Send the next waiting group of commands, if there is one.
        """
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Run a next iteration of commands
            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running command with the given tag to the finished list
        (return code 0) or to the failed list (any other return code).

        :returns: True if a running command had that tag, False otherwise
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                del self.running[index]
                if result.r == 0:
                    self.finished.append(result)
                else:
                    self.failed.append(result)
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """
        Return True if a command with the given tag is currently running.
        """
        # Taken under the lock for consistency with the methods that
        # modify self.running (next, finish).
        with self.lock:
            return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """
        Return True if nothing is running and another group is waiting,
        i.e. next() should be called.
        """
        with self.lock:
            # bool(): do not leak the waiting list itself to the caller
            return bool(not self.running and self.waiting)

    def is_waiting(self):
        """
        Return True if at least one group has not been sent yet.
        """
        return bool(self.waiting)

    def is_finished(self):
        """
        Return True if nothing is running and nothing is waiting.
        """
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """
        Return True if at least one command finished with an error.
        """
        return bool(self.failed)

    def get_state(self):
        """
        Return "pending", "failed" or "success".
        """
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    @staticmethod
    def _result_json(result):
        """
        Describe a single command result as a plain dict.
        """
        return {
            'command': result.command,
            'outs': result.outs,
            'outb': result.outb,
        }

    def __json__(self):
        """
        Return the JSON-serializable description of the request.

        'running', 'finished' and 'failed' are lists of dicts with the
        'command', 'outs' and 'outb' of each command result. 'waiting'
        keeps the group structure: a list (one item per group) of lists
        of humanified commands.
        """
        with self.lock:
            return {
                'id': self.id,
                'running': [self._result_json(x) for x in self.running],
                'finished': [self._result_json(x) for x in self.finished],
                # self.waiting holds groups of raw commands, not command
                # results, so there is no outs/outb to report yet
                'waiting': [
                    [common.humanify_command(command) for command in commands]
                    for commands in self.waiting
                ],
                'failed': [self._result_json(x) for x in self.failed],
                'is_waiting': self.is_waiting(),
                'is_finished': self.is_finished(),
                'has_failed': self.has_failed(),
                'state': self.get_state(),
            }
196
197
198
class Module(MgrModule):
    """
    Manager module that serves the RESTful API over HTTPS and exposes
    the "restful ..." commands listed in COMMANDS.
    """

    COMMANDS = [
        {
            "cmd": "restful create-key name=key_name,type=CephString",
            "desc": "Create an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful delete-key name=key_name,type=CephString",
            "desc": "Delete an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful list-keys",
            "desc": "List all API keys",
            "perm": "rw"
        },
        {
            "cmd": "restful create-self-signed-cert",
            "desc": "Create localized self signed certificate",
            "perm": "rw"
        },
        {
            "cmd": "restful restart",
            "desc": "Restart API server",
            "perm": "rw"
        },
    ]

    def __init__(self, *args, **kwargs):
        """
        Initialize the module state and publish this object as the
        module-level `instance`.
        """
        super(Module, self).__init__(*args, **kwargs)
        global instance
        instance = self

        # CommandsRequest objects created by submit_request()
        self.requests = []
        self.requests_lock = threading.RLock()

        # API key name -> key, loaded by refresh_keys()
        self.keys = {}
        self.disable_auth = False

        # werkzeug server object, created in _serve()
        self.server = None

        # set by shutdown() to make serve() leave its loop
        self.stop_server = False
        # set by shutdown()/restart() to wake up serve()
        self.serve_event = threading.Event()

    def serve(self):
        """
        Run the API server until shutdown() is requested.

        Each iteration runs _serve(), which blocks while the server is
        serving, closes the listening socket and then waits for
        serve_event (set by restart() and shutdown()) before starting
        over. Errors are logged and do not end the loop.
        """
        while not self.stop_server:
            try:
                self._serve()
                self.server.socket.close()
            except Exception:
                self.log.error(str(traceback.format_exc()))

            # Wait and clear the threading event
            self.serve_event.wait()
            self.serve_event.clear()

    def get_localized_config(self, key):
        """
        Return the config value stored under '<mgr id>/<key>', falling
        back to the value stored under '<key>' when the former is None.
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)
        return r

    def refresh_keys(self):
        """
        Reload self.keys from the config entries prefixed with 'keys/'.
        """
        prefix = 'keys/'
        rawkeys = self.get_config_prefix(prefix) or {}
        # Build the new mapping first and swap it in with one assignment,
        # so self.keys is never observed half-filled.
        self.keys = dict(
            (k[len(prefix):], v)  # strip off the keys/ prefix
            for k, v in rawkeys.items()
        )

    def _pem_path(self, inline_key, file_key):
        """
        Return (temp_file, path) for a PEM blob.

        If the localized config `inline_key` holds the PEM data itself,
        it is written to a named temporary file and that file object is
        returned together with its path; the file exists only until the
        object is closed. Otherwise (None, <value of `file_key`>) is
        returned.
        """
        data = self.get_localized_config(inline_key)
        if data is None:
            return None, self.get_localized_config(file_key)

        tmp = tempfile.NamedTemporaryFile()
        try:
            tmp.write(data)
            tmp.flush()
        except Exception:
            tmp.close()
            raise
        return tmp, tmp.name

    def _serve(self):
        """
        Configure and run the HTTPS server; returns when the server is
        shut down.

        :raises RuntimeError: if server_addr or the certificate/private
            key is not configured, or a configured file does not exist
        """
        # Load stored authentication keys
        self.refresh_keys()

        jsonify._instance = jsonify.GenericJSON(
            sort_keys=True,
            indent=4,
            separators=(',', ': '),
        )

        server_addr = self.get_localized_config('server_addr')
        if server_addr is None:
            raise RuntimeError('no server_addr configured; try "ceph config-key put mgr/restful/server_addr <ip>"')
        server_port = int(self.get_localized_config('server_port') or '8003')
        self.log.info('server_addr: %s server_port: %d',
                      server_addr, server_port)

        # Temporary files holding the certificate / private key; they are
        # kept until the server stops and are then removed explicitly
        # instead of whenever the garbage collector gets to them.
        tmp_files = []
        try:
            cert_tmp, cert_fname = self._pem_path('crt', 'crt_file')
            if cert_tmp is not None:
                tmp_files.append(cert_tmp)

            pkey_tmp, pkey_fname = self._pem_path('key', 'key_file')
            if pkey_tmp is not None:
                tmp_files.append(pkey_tmp)

            if not cert_fname or not pkey_fname:
                raise RuntimeError('no certificate configured')
            if not os.path.isfile(cert_fname):
                raise RuntimeError('certificate %s does not exist' % cert_fname)
            if not os.path.isfile(pkey_fname):
                raise RuntimeError('private key %s does not exist' % pkey_fname)

            # Create the HTTPS werkzeug server serving pecan app
            self.server = make_server(
                host=server_addr,
                port=server_port,
                app=make_app(
                    root='restful.api.Root',
                    hooks=[ErrorHook()],  # use a callable if pecan >= 0.3.2
                ),
                ssl_context=(cert_fname, pkey_fname),
            )

            self.server.serve_forever()
        finally:
            for tmp in tmp_files:
                tmp.close()

    def shutdown(self):
        """
        Stop the server and make serve() return.

        Errors are logged and re-raised.
        """
        try:
            self.stop_server = True
            if self.server:
                self.server.shutdown()
        except Exception:
            self.log.error(str(traceback.format_exc()))
            raise
        finally:
            # Always wake up serve(); otherwise it would stay blocked on
            # the event if stopping the server failed.
            self.serve_event.set()

    def restart(self):
        """
        Stop the running server (if any) and let serve() start a new one.

        Errors are logged, not raised.
        """
        try:
            if self.server:
                self.server.shutdown()
        except Exception:
            self.log.error(str(traceback.format_exc()))
        finally:
            # Always wake up serve(), see shutdown()
            self.serve_event.set()

    def notify(self, notify_type, tag):
        """
        Notification entry point; delegates to _notify() and logs any
        error instead of raising it.
        """
        try:
            self._notify(notify_type, tag)
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def _notify(self, notify_type, tag):
        """
        Handle a "command" notification: mark the command with the given
        tag as finished and start the next group of its request when the
        request is ready for it.
        """
        if notify_type != "command":
            self.log.debug("Unhandled notification type '%s'" % notify_type)
            return

        # we can safely skip all the sequential commands
        if tag == 'seq':
            return

        # submit_request() creates and registers requests under the same
        # lock, so a request whose commands were already sent is always
        # visible here.
        with self.requests_lock:
            matching = [x for x in self.requests if x.is_running(tag)]

        if len(matching) != 1:
            self.log.warn("Unknown request '%s'" % str(tag))
            return

        request = matching[0]
        request.finish(tag)
        if request.is_ready():
            request.next()

    def create_self_signed_cert(self):
        """
        Generate a 2048 bit RSA key and a self-signed certificate for it.

        :returns: tuple (certificate, private key), both PEM encoded
        """
        # create a key pair
        pkey = crypto.PKey()
        pkey.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        subject = cert.get_subject()
        subject.O = "IT"
        subject.CN = "ceph-restful"
        cert.set_serial_number(int(uuid4()))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)  # ten years, in seconds
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(pkey)
        cert.sign(pkey, 'sha512')

        return (
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
        )

    def handle_command(self, command):
        """
        Execute one of the commands declared in COMMANDS.

        :param command: dict with the command 'prefix' and its arguments
        :returns: tuple (return code, output, error message)
        """
        self.log.warn("Handling command: '%s'" % str(command))
        prefix = command['prefix']

        if prefix == "restful create-key":
            key_name = command['key_name']
            # An existing key is returned as is, not regenerated
            if key_name not in self.keys:
                key = str(uuid4())
                self.keys[key_name] = key
                self.set_config('keys/' + key_name, key)

            return 0, self.keys[key_name], ""

        if prefix == "restful delete-key":
            key_name = command['key_name']
            if key_name in self.keys:
                del self.keys[key_name]
                self.set_config('keys/' + key_name, None)

            return 0, "", ""

        if prefix == "restful list-keys":
            self.refresh_keys()
            return 0, json.dumps(self.keys, indent=2), ""

        if prefix == "restful create-self-signed-cert":
            cert, pkey = self.create_self_signed_cert()

            self.set_config(self.get_mgr_id() + '/crt', cert)
            self.set_config(self.get_mgr_id() + '/key', pkey)

            self.restart()
            return 0, "Restarting RESTful API server...", ""

        if prefix == 'restful restart':
            self.restart()
            return 0, "Restarting RESTful API server...", ""

        return (
            -errno.EINVAL,
            "",
            "Command not found '{0}'".format(prefix)
        )

    def get_doc_api(self, root, prefix=''):
        """
        Collect the documentation of the API rooted at the given
        controller.

        :param root: controller object to inspect
        :param prefix: URL path of `root`
        :returns: dict mapping URL path -> {HTTP method: docstring lines}
            for every path that has at least one of the get, post, patch
            or delete handlers
        """
        doc = {}
        for _obj in dir(root):
            obj = getattr(root, _obj)

            if isinstance(obj, RestController):
                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))

        if getattr(root, '_lookup', None):
            # Probe the lookup with a dummy argument to reach the
            # controller that handles '<prefix>/<arg>'
            looked_up = root._lookup('0')[0]
            if isinstance(looked_up, RestController):
                doc.update(self.get_doc_api(looked_up, prefix + '/<arg>'))

        prefix = prefix or '/'

        methods = {}
        for method in 'get', 'post', 'patch', 'delete':
            handler = getattr(root, method, None)
            if handler:
                # getdoc() returns None for a handler without docstring
                text = inspect.getdoc(handler)
                methods[method.upper()] = text.split('\n') if text else []

        if methods:
            doc[prefix] = methods

        return doc

    def get_mons(self):
        """
        Return the monitors of the mon map, each extended with the
        'in_quorum', 'server' and 'leader' fields.
        """
        mon_map_mons = self.get('mon_map')['mons']
        mon_status = json.loads(self.get('mon_status')['json'])

        quorum = mon_status['quorum']
        # The first rank of the quorum is reported as the leader; there
        # is none while the quorum is empty.
        leader_rank = quorum[0] if quorum else None

        # Add more information
        for mon in mon_map_mons:
            mon['in_quorum'] = mon['rank'] in quorum
            mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
            mon['leader'] = leader_rank is not None and mon['rank'] == leader_rank

        return mon_map_mons

    def get_osd_pools(self):
        """
        Return a dict mapping every OSD id of the osd map to the list of
        ids of the pools whose crush rule selects that OSD.
        """
        osd_map = self.get('osd_map')
        crush_rules = self.get('osd_map_crush')['rules']
        tree_nodes = self.get('osd_map_tree')['nodes']

        osds = dict((osd['osd'], []) for osd in osd_map['osds'])

        for pool in osd_map['pools']:
            # A pool without any applicable rule simply has no OSDs
            pool_osds = []
            for rule in crush_rules:
                if rule['rule_id'] != pool['crush_rule']:
                    continue
                if rule['min_size'] <= pool['size'] <= rule['max_size']:
                    pool_osds = common.crush_rule_osds(tree_nodes, rule)

            for osd_id in pool_osds:
                osds[osd_id].append(pool['pool'])

        return osds

    def get_osds(self, pool_id=None, ids=None):
        """
        Return the OSDs of the osd map, each extended with the 'pools',
        'server', 'reweight' and 'valid_commands' fields.

        :param pool_id: if given, only OSDs used by this pool are
            returned; None or '' disables the filter
        :param ids: if given, container of OSD ids as strings; only these
            OSDs are returned
        """
        # Get data
        osd_map = self.get('osd_map')
        osd_metadata = self.get('osd_metadata')

        # Update the data with the additional info from the osd map
        osds = osd_map['osds']

        # Filter by osd ids
        if ids is not None:
            osds = [osd for osd in osds if str(osd['osd']) in ids]

        # Get list of pools per osd node
        pools_map = self.get_osd_pools()

        # map osd IDs to reweight
        reweight_map = dict(
            (node.get('id'), node.get('reweight', None))
            for node in self.get('osd_map_tree')['nodes']
        )

        # Build OSD data objects
        for osd in osds:
            osd['pools'] = pools_map[osd['osd']]
            osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)

            osd['reweight'] = reweight_map.get(osd['osd'], 0.0)

            if osd['up']:
                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
            else:
                osd['valid_commands'] = []

        # Filter by pool. Compare against None/'' explicitly: a plain
        # truth test would silently skip the filter for pool id 0.
        if pool_id is not None and pool_id != '':
            pool_id = int(pool_id)
            osds = [osd for osd in osds if pool_id in osd['pools']]

        return osds

    def get_osd_by_id(self, osd_id):
        """
        Return the osd map entry of the given OSD id, or None unless
        exactly one entry matches.
        """
        matching = [
            osd for osd in self.get('osd_map')['osds']
            if osd['osd'] == osd_id
        ]

        if len(matching) != 1:
            return None

        return matching[0]

    def get_pool_by_id(self, pool_id):
        """
        Return the osd map entry of the given pool id, or None unless
        exactly one entry matches.
        """
        matching = [
            pool for pool in self.get('osd_map')['pools']
            if pool['pool'] == pool_id
        ]

        if len(matching) != 1:
            return None

        return matching[0]

    def submit_request(self, _request, **kwargs):
        """
        Create a CommandsRequest from the given list of command groups
        and register it.

        :param _request: passed to CommandsRequest unchanged
        :param wait: (keyword, optional) if true, block until the request
            is finished
        :returns: the CommandsRequest object
        """
        # Creating the request already sends its first commands, so it
        # is created and registered under the lock that _notify() takes
        # to look requests up; a completion cannot be missed.
        with self.requests_lock:
            request = CommandsRequest(_request)
            self.requests.append(request)

        if kwargs.get('wait', 0):
            while not request.is_finished():
                time.sleep(0.001)
        return request

    def run_command(self, command):
        """
        Send a single command and return the result of waiting for it.
        """
        # tag with 'seq' so that we can ignore these in notify function
        result = CommandResult('seq')

        self.send_command(result, 'mon', '', json.dumps(command), 'seq')
        return result.wait()