]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/restful/module.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
CommitLineData
31f18b77
FG
1"""
2A RESTful API for Ceph
3"""
11fdf7f2 4from __future__ import absolute_import
31f18b77
FG
5
6import os
7import json
8import time
9import errno
10import inspect
11import tempfile
12import threading
13import traceback
1adf2230 14import six
3efd9988 15import socket
9f95a23c 16import fcntl
31f18b77 17
11fdf7f2
TL
18from . import common
19from . import context
31f18b77
FG
20
21from uuid import uuid4
22from pecan import jsonify, make_app
23from OpenSSL import crypto
24from pecan.rest import RestController
11fdf7f2 25from six import iteritems
31f18b77
FG
26from werkzeug.serving import make_server, make_ssl_devcert
27
11fdf7f2 28from .hooks import ErrorHook
31f18b77
FG
29from mgr_module import MgrModule, CommandResult
30
1adf2230 31
3efd9988
FG
class CannotServe(Exception):
    """Raised by Module._serve() when the HTTPS server cannot be started
    (for example because no address or certificate is configured)."""
34
31f18b77
FG
35
class CommandsRequest(object):
    """
    Track a request made of several stages of commands.

    The constructor accepts a list of iterables that are executed
    sequentially. Each iterable is one stage and may hold several
    commands that are all dispatched together.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Attributes:
        id:       string identifier of this request, derived from id(self)
        running:  CommandResult objects of the stage currently dispatched
        waiting:  stages (lists of raw commands) not yet dispatched
        finished: CommandResult objects that completed with r == 0
        failed:   CommandResult objects that completed with r != 0
        lock:     RLock guarding the state transitions below
    """

    def __init__(self, commands_arrays):
        """
        Store the stages and immediately dispatch the first non-empty one.

        :param commands_arrays: list of sized iterables of commands;
                                empty iterables are dropped
        """
        self.id = str(id(self))

        # Filter out empty sub-requests
        stages = [x for x in commands_arrays if len(x) != 0]

        self.running = []
        self.waiting = stages[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not stages:
            # Nothing to run
            return

        # Dispatch the first stage right away
        self.running.extend(self.run(stages[0]))

    def run(self, commands):
        """
        Dispatch every command of one stage and return the list of
        CommandResult objects created for them, in the same order.

        Each command is tagged '<module name>:<request id>:<index>' so
        that a later notification can be matched back to its result via
        is_running() / finish().
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%s:%d' % (__name__, self.id, index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            context.instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """Dispatch the next waiting stage, if there is one."""
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Pop the head of the queue and run it
            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result carrying `tag` to `finished` (r == 0) or
        to `failed` (any other r).

        :return: True if a running result had that tag, False otherwise
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue
                target = self.finished if result.r == 0 else self.failed
                # Returning right after the pop keeps the iteration safe
                target.append(self.running.pop(index))
                return True

        # No such tag found
        return False

    def is_running(self, tag):
        """Return True if a currently running result carries `tag`."""
        return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """
        Return True when the current stage is complete and another stage
        is waiting, i.e. next() should be called.
        """
        with self.lock:
            # bool() so that callers get a real boolean, not the list
            return bool(not self.running and self.waiting)

    def is_waiting(self):
        """Return True if at least one stage has not been dispatched yet."""
        return bool(self.waiting)

    def is_finished(self):
        """Return True when nothing is running and nothing is waiting."""
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """Return True if at least one command completed with r != 0."""
        return bool(self.failed)

    def get_state(self):
        """Return "pending", "failed" or "success" for the whole request."""
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    @staticmethod
    def _dump_results(results):
        """Serialize CommandResult objects to plain dicts for __json__."""
        return [
            {
                'command': x.command,
                'outs': x.outs,
                'outb': x.outb,
            } for x in results
        ]

    def __json__(self):
        """Return the JSON-serializable representation of this request."""
        return {
            'id': self.id,
            'running': self._dump_results(self.running),
            'finished': self._dump_results(self.finished),
            'waiting': [
                [common.humanify_command(y) for y in x]
                for x in self.waiting
            ],
            'failed': self._dump_results(self.failed),
            'is_waiting': self.is_waiting(),
            'is_finished': self.is_finished(),
            'has_failed': self.has_failed(),
            'state': self.get_state(),
        }
193
194
195
class Module(MgrModule):
    """
    ceph-mgr module exposing a RESTful API.

    serve() runs a werkzeug HTTPS server hosting the pecan application
    rooted at 'restful.api.Root'; handle_command() implements the
    `restful ...` commands declared in COMMANDS; the get_* helpers
    assemble cluster data for the API from the maps returned by get().
    """

    MODULE_OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'key_file'},
    ]

    COMMANDS = [
        {
            "cmd": "restful create-key name=key_name,type=CephString",
            "desc": "Create an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful delete-key name=key_name,type=CephString",
            "desc": "Delete an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful list-keys",
            "desc": "List all API keys",
            "perm": "r"
        },
        {
            "cmd": "restful create-self-signed-cert",
            "desc": "Create localized self signed certificate",
            "perm": "rw"
        },
        {
            "cmd": "restful restart",
            "desc": "Restart API server",
            "perm": "rw"
        },
    ]

    def __init__(self, *args, **kwargs):
        """Initialize module state and publish this instance in `context`."""
        super(Module, self).__init__(*args, **kwargs)
        context.instance = self

        # CommandsRequest objects created by submit_request()
        self.requests = []
        self.requests_lock = threading.RLock()

        # API keys, name -> key (see refresh_keys())
        self.keys = {}
        self.disable_auth = False

        self.server = None

        # stop_server ends the serve() loop; serve_event wakes it up
        self.stop_server = False
        self.serve_event = threading.Event()

    def serve(self):
        """
        Run the API server until shutdown() is called.

        Each iteration tries to start the server through _serve(). When
        that returns or fails, the loop blocks on serve_event, which is
        set by restart() and shutdown(), before trying again.
        """
        self.log.debug('serve enter')
        while not self.stop_server:
            try:
                self._serve()
                self.server.socket.close()
            except CannotServe as cs:
                self.log.warning("server not running: %s", cs)
            except Exception:
                self.log.error(str(traceback.format_exc()))

            # Wait and clear the threading event
            self.serve_event.wait()
            self.serve_event.clear()
        self.log.debug('serve exit')

    def refresh_keys(self):
        """Reload self.keys from the 'keys/' entries of the module store."""
        prefix = 'keys/'
        rawkeys = self.get_store_prefix(prefix) or {}
        # Strip the 'keys/' prefix so that the dict is keyed by key name
        self.keys = dict(
            (k[len(prefix):], v) for k, v in six.iteritems(rawkeys)
        )

    def _serve(self):
        """
        Configure and run the HTTPS server; returns when it stops serving.

        :raises CannotServe: if the address or the certificate/private
                             key pair is missing
        """
        # Load stored authentication keys
        self.refresh_keys()

        jsonify._instance = jsonify.GenericJSON(
            sort_keys=True,
            indent=4,
            separators=(',', ': '),
        )

        server_addr = self.get_localized_module_option('server_addr', '::')
        if server_addr is None:
            raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')

        server_port = int(self.get_localized_module_option('server_port', '8003'))
        self.log.info('server_addr: %s server_port: %d',
                      server_addr, server_port)

        # A certificate kept in the store is written to a temporary file
        # because the server is given file names. The file object stays
        # referenced by a local for the whole call, so it is not closed
        # (and therefore not removed) while the server is running.
        cert = self.get_localized_store("crt")
        if cert is not None:
            cert_tmp = tempfile.NamedTemporaryFile()
            cert_tmp.write(cert.encode('utf-8'))
            cert_tmp.flush()
            cert_fname = cert_tmp.name
        else:
            cert_fname = self.get_localized_store('crt_file')

        # Same for the private key
        pkey = self.get_localized_store("key")
        if pkey is not None:
            pkey_tmp = tempfile.NamedTemporaryFile()
            pkey_tmp.write(pkey.encode('utf-8'))
            pkey_tmp.flush()
            pkey_fname = pkey_tmp.name
        else:
            # NOTE(review): the certificate path is read from the store
            # ('crt_file') but the key path from a module option
            # ('key_file') - confirm this asymmetry is intended.
            pkey_fname = self.get_localized_module_option('key_file')

        if not cert_fname or not pkey_fname:
            raise CannotServe('no certificate configured')
        if not os.path.isfile(cert_fname):
            raise CannotServe('certificate %s does not exist' % cert_fname)
        if not os.path.isfile(pkey_fname):
            raise CannotServe('private key %s does not exist' % pkey_fname)

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri("https://{0}:{1}/".format(
            socket.gethostname() if server_addr == "::" else server_addr,
            server_port
        ))

        # Create the HTTPS werkzeug server serving pecan app
        self.server = make_server(
            host=server_addr,
            port=server_port,
            app=make_app(
                root='restful.api.Root',
                hooks=[ErrorHook()],  # use a callable if pecan >= 0.3.2
            ),
            ssl_context=(cert_fname, pkey_fname),
        )

        # Make sure the listening socket is close-on-exec
        sock_fd = self.server.socket.fileno()
        sock_fd_flag = fcntl.fcntl(sock_fd, fcntl.F_GETFD)
        if not (sock_fd_flag & fcntl.FD_CLOEXEC):
            self.log.debug("set server socket close-on-exec")
            fcntl.fcntl(sock_fd, fcntl.F_SETFD, sock_fd_flag | fcntl.FD_CLOEXEC)

        if self.stop_server:
            self.log.debug('made server, but stop flag set')
        else:
            self.log.debug('made server, serving forever')
            self.server.serve_forever()

    def shutdown(self):
        """Stop the server for good and let the serve() loop exit."""
        self.log.debug('shutdown enter')
        try:
            # Set the flag first so that serve() does not start again
            self.stop_server = True
            if self.server:
                self.log.debug('calling server.shutdown')
                self.server.shutdown()
                self.log.debug('called server.shutdown')
            self.serve_event.set()
        except Exception:
            self.log.error(str(traceback.format_exc()))
            raise
        self.log.debug('shutdown exit')

    def restart(self):
        """Stop the running server (if any) and wake serve() to start anew."""
        try:
            if self.server:
                self.server.shutdown()
            self.serve_event.set()
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def notify(self, notify_type, tag):
        """Notification entry point; logs instead of propagating errors."""
        try:
            self._notify(notify_type, tag)
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def _notify(self, notify_type, tag):
        """
        Handle a "command" notification: mark the tagged command as
        finished in its request and start the request's next stage when
        the current one is complete.
        """
        if notify_type != "command":
            self.log.debug("Unhandled notification type '%s'", notify_type)
            return
        # we can safely skip all the sequential commands
        if tag == 'seq':
            return
        try:
            with self.requests_lock:
                request = next(x for x in self.requests if x.is_running(tag))
            request.finish(tag)
            if request.is_ready():
                request.next()
        except StopIteration:
            # the command was not issued by me
            pass

    def create_self_signed_cert(self):
        """
        Generate a 2048-bit RSA key and a self-signed certificate valid
        for ten years.

        :return: tuple (certificate, private key), both PEM encoded
        """
        # create a key pair
        pkey = crypto.PKey()
        pkey.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        cert.get_subject().O = "IT"
        cert.get_subject().CN = "ceph-restful"
        cert.set_serial_number(int(uuid4()))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(pkey)
        cert.sign(pkey, 'sha512')

        return (
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
        )

    def handle_command(self, inbuf, command):
        """
        Execute one of the commands declared in COMMANDS.

        :param inbuf: unused
        :param command: dict holding at least 'prefix', plus 'key_name'
                        for the create-key / delete-key commands
        :return: tuple (return code, output, status message)
        """
        self.log.warning("Handling command: '%s'", str(command))
        prefix = command['prefix']

        if prefix == "restful create-key":
            key_name = command['key_name']
            # An existing key is returned unchanged
            if key_name not in self.keys:
                key = str(uuid4())
                self.keys[key_name] = key
                self.set_store('keys/' + key_name, key)

            return (
                0,
                self.keys[key_name],
                "",
            )

        elif prefix == "restful delete-key":
            key_name = command['key_name']
            if key_name in self.keys:
                del self.keys[key_name]
                self.set_store('keys/' + key_name, None)

            return (
                0,
                "",
                "",
            )

        elif prefix == "restful list-keys":
            self.refresh_keys()
            return (
                0,
                json.dumps(self.keys, indent=4, sort_keys=True),
                "",
            )

        elif prefix == "restful create-self-signed-cert":
            cert, pkey = self.create_self_signed_cert()
            self.set_store(self.get_mgr_id() + '/crt', cert.decode('utf-8'))
            self.set_store(self.get_mgr_id() + '/key', pkey.decode('utf-8'))

            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        elif prefix == 'restful restart':
            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        else:
            return (
                -errno.EINVAL,
                "",
                "Command not found '{0}'".format(prefix)
            )

    def get_doc_api(self, root, prefix=''):
        """
        Recursively collect the docstrings of the HTTP method handlers
        reachable from the controller `root`.

        :param root: controller object to inspect
        :param prefix: URL path leading to `root`
        :return: dict mapping URL path -> {HTTP method: docstring lines};
                 paths without any handler are omitted
        """
        doc = {}
        for _obj in dir(root):
            obj = getattr(root, _obj)

            if isinstance(obj, RestController):
                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))

        # Controllers reached through _lookup are probed with a dummy
        # argument and documented under '<arg>'
        lookup = getattr(root, '_lookup', None)
        if lookup:
            child = lookup('0')[0]
            if isinstance(child, RestController):
                doc.update(self.get_doc_api(child, prefix + '/<arg>'))

        prefix = prefix or '/'

        methods = {}
        for method in 'get', 'post', 'patch', 'delete':
            handler = getattr(root, method, None)
            if handler:
                # inspect.getdoc() returns None for undocumented handlers
                methods[method.upper()] = (inspect.getdoc(handler) or '').split('\n')

        if methods:
            doc[prefix] = methods

        return doc

    def get_mons(self):
        """
        Return the monitors of the mon map, each extended with
        'in_quorum', 'server' (hostname from the mon metadata, None when
        unavailable) and 'leader'.
        """
        mon_map_mons = self.get('mon_map')['mons']
        mon_status = json.loads(self.get('mon_status')['json'])

        quorum = mon_status['quorum']
        # The first quorum member is treated as the leader; with an empty
        # quorum no monitor is marked as leader
        leader_rank = quorum[0] if quorum else None

        # Add more information
        for mon in mon_map_mons:
            metadata = self.get_metadata("mon", mon['name']) or {}
            mon['in_quorum'] = mon['rank'] in quorum
            mon['server'] = metadata.get('hostname', None)
            mon['leader'] = mon['rank'] == leader_rank

        return mon_map_mons

    def get_osd_pools(self):
        """
        Return a dict mapping every OSD id of the osd map to the list of
        pool ids whose crush rule selects that OSD.
        """
        osd_map = self.get('osd_map')
        osds = dict((x['osd'], []) for x in osd_map['osds'])
        pools = dict((x['pool'], x) for x in osd_map['pools'])
        crush = self.get('osd_map_crush')
        crush_rules = crush['rules']

        for pool_id, pool in pools.items():
            # A pool without a matching rule contributes no OSDs
            pool_osds = None
            for rule in crush_rules:
                if rule['rule_id'] != pool['crush_rule']:
                    continue
                if rule['min_size'] <= pool['size'] <= rule['max_size']:
                    pool_osds = common.crush_rule_osds(crush['buckets'], rule)

            for in_pool_id in pool_osds or ():
                # Skip ids selected by the rule but absent from the osd map
                if in_pool_id in osds:
                    osds[in_pool_id].append(pool_id)

        return osds

    def get_osds(self, pool_id=None, ids=None):
        """
        Return the OSDs of the osd map, each extended with 'pools',
        'server', 'reweight' and 'valid_commands'.

        :param pool_id: if given, keep only OSDs serving this pool; an
                        int or a numeric string. None or '' disables
                        the filter
        :param ids: if not None, keep only OSDs whose id, as a string,
                    is contained in `ids`
        """
        # Get data
        osd_map = self.get('osd_map')
        osd_metadata = self.get('osd_metadata')

        # Update the data with the additional info from the osd map
        osds = osd_map['osds']

        # Filter by osd ids
        if ids is not None:
            osds = [x for x in osds if str(x['osd']) in ids]

        # Get list of pools per osd node
        pools_map = self.get_osd_pools()

        # map osd IDs to reweight
        reweight_map = dict(
            (x.get('id'), x.get('reweight', None))
            for x in self.get('osd_map_tree')['nodes']
        )

        # Build OSD data objects
        for osd in osds:
            osd['pools'] = pools_map[osd['osd']]
            osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)

            osd['reweight'] = reweight_map.get(osd['osd'], 0.0)

            if osd['up']:
                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
            else:
                osd['valid_commands'] = []

        # Filter by pool; compare against None so that pool id 0 is
        # not mistaken for "no filter"
        if pool_id is not None and pool_id != '':
            pool_id = int(pool_id)
            osds = [x for x in osds if pool_id in x['pools']]

        return osds

    def get_osd_by_id(self, osd_id):
        """Return the osd map entry with id `osd_id`, or None unless
        exactly one entry matches."""
        osd = [x for x in self.get('osd_map')['osds']
               if x['osd'] == osd_id]

        if len(osd) != 1:
            return None

        return osd[0]

    def get_pool_by_id(self, pool_id):
        """Return the osd map pool with id `pool_id`, or None unless
        exactly one pool matches."""
        pool = [x for x in self.get('osd_map')['pools']
                if x['pool'] == pool_id]

        if len(pool) != 1:
            return None

        return pool[0]

    def submit_request(self, _request, **kwargs):
        """
        Create a CommandsRequest for `_request`, register it and return it.

        :param _request: list of command stages, see CommandsRequest
        :param wait: (keyword) if truthy, poll until the request has
                     finished before returning
        """
        with self.requests_lock:
            request = CommandsRequest(_request)
            self.requests.append(request)
        # Poll outside of requests_lock: _notify() takes that lock to
        # find the request it has to advance
        if kwargs.get('wait', 0):
            while not request.is_finished():
                time.sleep(0.001)
        return request

    def run_command(self, command):
        """Send a single command to 'mon' and return result.wait()."""
        # tag with 'seq' so that we can ignore these in notify function
        result = CommandResult('seq')

        self.send_command(result, 'mon', '', json.dumps(command), 'seq')
        return result.wait()