]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/restful/module.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
CommitLineData
31f18b77
FG
1"""
2A RESTful API for Ceph
3"""
4
5import os
6import json
7import time
8import errno
9import inspect
10import tempfile
11import threading
12import traceback
3efd9988 13import socket
9f95a23c 14import fcntl
31f18b77 15
11fdf7f2
TL
16from . import common
17from . import context
31f18b77
FG
18
19from uuid import uuid4
20from pecan import jsonify, make_app
21from OpenSSL import crypto
22from pecan.rest import RestController
23from werkzeug.serving import make_server, make_ssl_devcert
24
11fdf7f2 25from .hooks import ErrorHook
20effc67 26from mgr_module import MgrModule, CommandResult, NotifyType
522d829b 27from mgr_util import build_url
31f18b77 28
1adf2230 29
3efd9988
FG
class CannotServe(Exception):
    """Signal that the HTTPS API server cannot be started as configured."""
32
31f18b77
FG
33
class CommandsRequest(object):
    """
    This class handles parallel as well as sequential execution of
    commands. The class accepts a list of iterables that should be
    executed sequentially. Each iterable can contain several commands
    that can be executed in parallel.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Every command result lives in exactly one of three lists:
    ``running`` (sent, not yet reported back), ``finished`` (reported
    back with ``r == 0``) or ``failed`` (reported back with any other
    ``r``).  ``waiting`` holds the batches that have not been sent yet.
    """

    def __init__(self, commands_arrays):
        """
        Send the first non-empty batch right away and queue the rest.

        :param commands_arrays: list of batches; each batch is a sized
            iterable of command dicts (``len()`` is called on it).
        """
        self.id = str(id(self))

        # Filter out empty sub-requests
        commands_arrays = [x for x in commands_arrays if len(x) != 0]

        self.running = []
        self.waiting = commands_arrays[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Process first iteration of commands_arrays in parallel
        self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send every command of one batch and return the list of
        CommandResult objects created for them (one per command, in
        order).  Each result is tagged '<module>:<request id>:<index>'
        so that a later notification can be matched back to it.
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%s:%d' % (__name__, self.id, index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            context.instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """Send the next waiting batch, if there is one."""
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Run a next iteration of commands
            commands, self.waiting = self.waiting[0], self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result tagged `tag` to `finished` (r == 0) or
        `failed` (anything else).

        :return: True if such a result was running, False otherwise.
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                # Safe to mutate while iterating: we return immediately.
                del self.running[index]
                if result.r == 0:
                    self.finished.append(result)
                else:
                    self.failed.append(result)
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """Return True if a result tagged `tag` is currently running."""
        return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """
        Return True when the current batch is done and another batch is
        waiting, i.e. when next() should be called.
        """
        with self.lock:
            # bool() so callers get True/False rather than the waiting
            # list itself, matching is_waiting()/is_finished().
            return bool(not self.running and self.waiting)

    def is_waiting(self):
        """Return True if at least one batch has not been sent yet."""
        return bool(self.waiting)

    def is_finished(self):
        """Return True when nothing is running and nothing is waiting."""
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """Return True if at least one command ended up in `failed`."""
        return bool(self.failed)

    def get_state(self):
        """Return 'pending', 'failed' or 'success' for the whole request."""
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    @staticmethod
    def _result_to_dict(result):
        """Serialise the fields of one command result exposed via JSON."""
        return {
            'command': result.command,
            'outs': result.outs,
            'outb': result.outb,
        }

    def __json__(self):
        """Return a JSON-serialisable snapshot of the request."""
        # Hold the (re-entrant) lock for the whole snapshot so that a
        # result moved by finish() cannot show up in two lists or none.
        with self.lock:
            return {
                'id': self.id,
                'running': [self._result_to_dict(x) for x in self.running],
                'finished': [self._result_to_dict(x) for x in self.finished],
                'waiting': [
                    [common.humanify_command(y) for y in x]
                    for x in self.waiting
                ],
                'failed': [self._result_to_dict(x) for x in self.failed],
                'is_waiting': self.is_waiting(),
                'is_finished': self.is_finished(),
                'has_failed': self.has_failed(),
                'state': self.get_state(),
            }
191
192
193
class Module(MgrModule):
    """
    Manager module that serves the RESTful API over HTTPS.

    serve() runs a werkzeug server hosting the pecan application
    'restful.api.Root'.  The module also handles the `restful ...`
    commands listed in COMMANDS, tracks the command requests submitted
    through submit_request(), and provides helpers that enrich the
    mon/osd/pool maps.
    """

    MODULE_OPTIONS = [
        {'name': 'server_addr'},
        {'name': 'server_port'},
        {'name': 'key_file'},
        {'name': 'enable_auth', 'type': 'bool', 'default': True},
    ]

    COMMANDS = [
        {
            "cmd": "restful create-key name=key_name,type=CephString",
            "desc": "Create an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful delete-key name=key_name,type=CephString",
            "desc": "Delete an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful list-keys",
            "desc": "List all API keys",
            "perm": "r"
        },
        {
            "cmd": "restful create-self-signed-cert",
            "desc": "Create localized self signed certificate",
            "perm": "rw"
        },
        {
            "cmd": "restful restart",
            "desc": "Restart API server",
            "perm": "rw"
        },
    ]

    NOTIFY_TYPES = [NotifyType.command]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Publish this instance so other parts of the package can reach it.
        context.instance = self

        # CommandsRequest objects created by submit_request()
        self.requests = []
        self.requests_lock = threading.RLock()

        # API key name -> key value, mirrored from the 'keys/' store prefix
        self.keys = {}
        self.enable_auth = True

        self.server = None

        # stop_server ends the serve() loop; serve_event wakes it up.
        self.stop_server = False
        self.serve_event = threading.Event()

    def serve(self):
        """
        Run the API server until shutdown() is called.

        Each iteration tries to serve; when serving ends (restart,
        shutdown, or failure to start) the loop blocks on `serve_event`
        before trying again.
        """
        self.log.debug('serve enter')
        while not self.stop_server:
            try:
                self._serve()
                self.server.socket.close()
            except CannotServe as cs:
                self.log.warning("server not running: %s", cs)
            except Exception:
                # Keep the loop alive on unexpected errors, but let
                # SystemExit/KeyboardInterrupt propagate.
                self.log.error(traceback.format_exc())

            # Wait and clear the threading event
            self.serve_event.wait()
            self.serve_event.clear()
        self.log.debug('serve exit')

    def refresh_keys(self):
        """Reload the API keys from the 'keys/' prefix of the module store."""
        rawkeys = self.get_store_prefix('keys/') or {}
        # Build the new mapping first and publish it with one assignment,
        # so a concurrent reader never sees an empty or partial key set.
        # k[5:] strips off the 'keys/' prefix.
        self.keys = {k[5:]: v for k, v in rawkeys.items()}

    @staticmethod
    def _write_temp_file(content, tmp_files):
        """
        Write `content` (str) UTF-8 encoded to a new named temporary
        file, register the file in `tmp_files` and return its path.
        """
        tmp_file = tempfile.NamedTemporaryFile()
        # Register before writing so the caller closes it even if the
        # write fails.
        tmp_files.append(tmp_file)
        tmp_file.write(content.encode('utf-8'))
        tmp_file.flush()
        return tmp_file.name

    def _serve(self):
        """
        Configure and run one incarnation of the HTTPS server.

        Blocks in serve_forever() until the server is shut down, unless
        the stop flag was set while the server was being created.

        :raises CannotServe: if the address, certificate or private key
            is not configured or does not exist on disk.
        """
        # Load stored authentication keys
        self.refresh_keys()

        jsonify._instance = jsonify.GenericJSON(
            sort_keys=True,
            indent=4,
            separators=(',', ': '),
        )

        server_addr = self.get_localized_module_option('server_addr', '::')
        if server_addr is None:
            raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')

        server_port = int(self.get_localized_module_option('server_port', '8003'))
        self.log.info('server_addr: %s server_port: %d',
                      server_addr, server_port)

        # Temporary files holding certificate/key material taken from the
        # store; closed explicitly when this function exits.
        tmp_files = []
        try:
            cert = self.get_localized_store("crt")
            if cert is not None:
                cert_fname = self._write_temp_file(cert, tmp_files)
            else:
                cert_fname = self.get_localized_store('crt_file')

            pkey = self.get_localized_store("key")
            if pkey is not None:
                pkey_fname = self._write_temp_file(pkey, tmp_files)
            else:
                pkey_fname = self.get_localized_module_option('key_file')

            self.enable_auth = self.get_localized_module_option('enable_auth', True)

            if not cert_fname or not pkey_fname:
                raise CannotServe('no certificate configured')
            if not os.path.isfile(cert_fname):
                raise CannotServe('certificate %s does not exist' % cert_fname)
            if not os.path.isfile(pkey_fname):
                raise CannotServe('private key %s does not exist' % pkey_fname)

            # Publish the URI that others may use to access the service we're
            # about to start serving
            addr = self.get_mgr_ip() if server_addr == "::" else server_addr
            self.set_uri(build_url(scheme='https', host=addr, port=server_port, path='/'))

            # Create the HTTPS werkzeug server serving pecan app
            self.server = make_server(
                host=server_addr,
                port=server_port,
                app=make_app(
                    root='restful.api.Root',
                    hooks=[ErrorHook()],  # use a callable if pecan >= 0.3.2
                ),
                ssl_context=(cert_fname, pkey_fname),
            )

            # Make sure the listening socket is marked close-on-exec.
            sock_fd = self.server.socket.fileno()
            sock_fd_flag = fcntl.fcntl(sock_fd, fcntl.F_GETFD)
            if not (sock_fd_flag & fcntl.FD_CLOEXEC):
                self.log.debug("set server socket close-on-exec")
                fcntl.fcntl(sock_fd, fcntl.F_SETFD, sock_fd_flag | fcntl.FD_CLOEXEC)

            if self.stop_server:
                self.log.debug('made server, but stop flag set')
            else:
                self.log.debug('made server, serving forever')
                self.server.serve_forever()
        finally:
            for tmp_file in tmp_files:
                tmp_file.close()

    def shutdown(self):
        """Stop the server for good and let the serve() loop exit."""
        self.log.debug('shutdown enter')
        try:
            self.stop_server = True
            if self.server:
                self.log.debug('calling server.shutdown')
                self.server.shutdown()
                self.log.debug('called server.shutdown')
            self.serve_event.set()
        except Exception:
            self.log.error(traceback.format_exc())
            raise
        self.log.debug('shutdown exit')

    def restart(self):
        """
        Stop the running server (if any) and wake the serve() loop so it
        starts a new one.  Errors are logged, not raised.
        """
        try:
            if self.server:
                self.server.shutdown()
            self.serve_event.set()
        except Exception:
            self.log.error(traceback.format_exc())

    def notify(self, notify_type: NotifyType, tag: str):
        """Notification entry point; logs errors from _notify() instead of raising."""
        try:
            self._notify(notify_type, tag)
        except Exception:
            self.log.error(traceback.format_exc())

    def _notify(self, notify_type: NotifyType, tag):
        """
        Handle a command-completion notification: mark the matching
        command as finished and start the request's next batch once the
        current one is done.
        """
        if notify_type != NotifyType.command:
            self.log.debug("Unhandled notification type '%s'", notify_type)
            return
        # we can safely skip all the sequential commands
        if tag == 'seq':
            return

        with self.requests_lock:
            request = next(
                (x for x in self.requests if x.is_running(tag)), None)
        if request is None:
            # the command was not issued by me
            return

        request.finish(tag)
        if request.is_ready():
            request.next()

    def config_notify(self):
        """Re-read the 'enable_auth' option."""
        self.enable_auth = self.get_localized_module_option('enable_auth', True)

    def create_self_signed_cert(self):
        """
        Generate a 2048-bit RSA key and a matching self-signed
        certificate valid for ten years.

        :return: tuple (certificate, private key), both PEM encoded as
            produced by pyOpenSSL's dump functions.
        """
        # create a key pair
        pkey = crypto.PKey()
        pkey.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        cert.get_subject().O = "IT"
        cert.get_subject().CN = "ceph-restful"
        cert.set_serial_number(int(uuid4()))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(pkey)
        cert.sign(pkey, 'sha512')

        return (
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
        )

    def handle_command(self, inbuf, command):
        """
        Execute one of the `restful ...` commands declared in COMMANDS.

        :param inbuf: unused here.
        :param command: dict holding 'prefix' and, for the key commands,
            'key_name'.
        :return: tuple (return code, output string, error string).
        """
        self.log.warning("Handling command: '%s'", command)
        prefix = command['prefix']

        if prefix == "restful create-key":
            key_name = command['key_name']
            # Creating an existing key simply returns the stored value.
            if key_name not in self.keys:
                key = str(uuid4())
                self.keys[key_name] = key
                self.set_store('keys/' + key_name, key)

            return (
                0,
                self.keys[key_name],
                "",
            )

        elif prefix == "restful delete-key":
            key_name = command['key_name']
            if key_name in self.keys:
                del self.keys[key_name]
                self.set_store('keys/' + key_name, None)

            return (
                0,
                "",
                "",
            )

        elif prefix == "restful list-keys":
            self.refresh_keys()
            return (
                0,
                json.dumps(self.keys, indent=4, sort_keys=True),
                "",
            )

        elif prefix == "restful create-self-signed-cert":
            cert, pkey = self.create_self_signed_cert()
            self.set_store(self.get_mgr_id() + '/crt', cert.decode('utf-8'))
            self.set_store(self.get_mgr_id() + '/key', pkey.decode('utf-8'))

            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        elif prefix == 'restful restart':
            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        else:
            return (
                -errno.EINVAL,
                "",
                "Command not found '{0}'".format(prefix)
            )

    def get_doc_api(self, root, prefix=''):
        """
        Recursively collect the handler docstrings of a controller tree.

        :param root: controller object to inspect.
        :param prefix: URL path of `root`; '' stands for '/'.
        :return: dict mapping URL path to {HTTP METHOD: docstring lines};
            paths without any get/post/patch/delete handler are omitted.
        """
        doc = {}
        for _obj in dir(root):
            obj = getattr(root, _obj)

            if isinstance(obj, RestController):
                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))

        if getattr(root, '_lookup', None):
            # Probe the dynamic lookup once with a dummy argument.
            looked_up = root._lookup('0')[0]
            if isinstance(looked_up, RestController):
                doc.update(self.get_doc_api(looked_up, prefix + '/<arg>'))

        prefix = prefix or '/'

        methods = {}
        for method in 'get', 'post', 'patch', 'delete':
            handler = getattr(root, method, None)
            if handler:
                # inspect.getdoc() returns None for an undocumented handler.
                methods[method.upper()] = (inspect.getdoc(handler) or '').split('\n')

        if methods:
            doc[prefix] = methods

        return doc

    def get_mons(self):
        """
        Return the mons of the mon map, each extended with 'in_quorum',
        'server' (hostname from the mon metadata, None if unknown) and
        'leader' (True for the mon whose rank is first in the quorum).
        """
        mon_map_mons = self.get('mon_map')['mons']
        mon_status = json.loads(self.get('mon_status')['json'])

        quorum = mon_status['quorum']
        # With an empty quorum no mon is the leader.
        leader_rank = quorum[0] if quorum else None

        # Add more information
        for mon in mon_map_mons:
            metadata = self.get_metadata("mon", mon['name']) or {}
            mon['in_quorum'] = mon['rank'] in quorum
            mon['server'] = metadata.get('hostname')
            mon['leader'] = mon['rank'] == leader_rank

        return mon_map_mons

    def get_osd_pools(self):
        """
        Return a dict mapping every OSD id of the OSD map to the list of
        pool ids whose crush rule selects that OSD.
        """
        osd_map = self.get('osd_map')
        osds = {osd['osd']: [] for osd in osd_map['osds']}
        pools = {pool['pool']: pool for pool in osd_map['pools']}
        crush = self.get('osd_map_crush')
        crush_rules = crush['rules']

        for pool_id, pool in pools.items():
            # A pool whose crush rule is not found maps to no OSDs.
            pool_osds = ()
            for rule in crush_rules:
                if rule['rule_id'] == pool['crush_rule']:
                    pool_osds = common.crush_rule_osds(crush['buckets'], rule)

            for osd_id in pool_osds:
                # Skip ids returned for the rule that are not in the OSD map.
                if osd_id in osds:
                    osds[osd_id].append(pool_id)

        return osds

    def get_osds(self, pool_id=None, ids=None):
        """
        Return the OSDs of the OSD map extended with 'pools', 'server',
        'reweight' and 'valid_commands'.

        :param pool_id: if truthy, keep only OSDs serving this pool; the
            value is converted with int().
        :param ids: if not None, keep only OSDs whose id, as a string,
            is contained in `ids`.
        """
        # Get data
        osd_map = self.get('osd_map')
        osd_metadata = self.get('osd_metadata')

        # Update the data with the additional info from the osd map
        osds = osd_map['osds']

        # Filter by osd ids
        if ids is not None:
            osds = [x for x in osds if str(x['osd']) in ids]

        # Get list of pools per osd node
        pools_map = self.get_osd_pools()

        # map osd IDs to reweight
        reweight_map = {
            x.get('id'): x.get('reweight', None)
            for x in self.get('osd_map_tree')['nodes']
        }

        # Build OSD data objects
        for osd in osds:
            osd_id = osd['osd']
            # pools_map is built from a separate map fetch; tolerate an
            # OSD that is missing from it.
            osd['pools'] = pools_map.get(osd_id, [])
            osd['server'] = osd_metadata.get(str(osd_id), {}).get('hostname', None)

            osd['reweight'] = reweight_map.get(osd_id, 0.0)

            if osd['up']:
                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
            else:
                osd['valid_commands'] = []

        # Filter by pool
        if pool_id:
            pool_id = int(pool_id)
            osds = [x for x in osds if pool_id in x['pools']]

        return osds

    def get_osd_by_id(self, osd_id):
        """Return the OSD map entry with id `osd_id`, or None unless exactly one matches."""
        matches = [x for x in self.get('osd_map')['osds']
                   if x['osd'] == osd_id]

        return matches[0] if len(matches) == 1 else None

    def get_pool_by_id(self, pool_id):
        """Return the pool entry with id `pool_id`, or None unless exactly one matches."""
        matches = [x for x in self.get('osd_map')['pools']
                   if x['pool'] == pool_id]

        return matches[0] if len(matches) == 1 else None

    def submit_request(self, _request, **kwargs):
        """
        Create a CommandsRequest for `_request`, register it and return it.

        :param _request: list of command batches, see CommandsRequest.
        :param wait: optional keyword; if truthy, block (polling) until
            the request has finished.
        """
        with self.requests_lock:
            request = CommandsRequest(_request)
            self.requests.append(request)

        # Poll outside the lock so notifications can still be processed.
        if kwargs.get('wait', 0):
            while not request.is_finished():
                time.sleep(0.001)
        return request

    def run_command(self, command):
        """
        Send one mon command and return the value of CommandResult.wait().
        """
        # tag with 'seq' so that we can ignore these in notify function
        result = CommandResult('seq')

        self.send_command(result, 'mon', '', json.dumps(command), 'seq')
        return result.wait()
494 if getattr(root, method, None):
495 doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n')
496
497 if len(doc[prefix]) == 0:
498 del doc[prefix]
499
500 return doc
501
502
503 def get_mons(self):
504 mon_map_mons = self.get('mon_map')['mons']
505 mon_status = json.loads(self.get('mon_status')['json'])
506
507 # Add more information
508 for mon in mon_map_mons:
509 mon['in_quorum'] = mon['rank'] in mon_status['quorum']
510 mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
511 mon['leader'] = mon['rank'] == mon_status['quorum'][0]
512
513 return mon_map_mons
514
515
516 def get_osd_pools(self):
517 osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds']))
518 pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools']))
a8e16298
TL
519 crush = self.get('osd_map_crush')
520 crush_rules = crush['rules']
31f18b77
FG
521
522 osds_by_pool = {}
523 for pool_id, pool in pools.items():
524 pool_osds = None
525 for rule in [r for r in crush_rules if r['rule_id'] == pool['crush_rule']]:
20effc67 526 pool_osds = common.crush_rule_osds(crush['buckets'], rule)
31f18b77
FG
527
528 osds_by_pool[pool_id] = pool_osds
529
530 for pool_id in pools.keys():
531 for in_pool_id in osds_by_pool[pool_id]:
532 osds[in_pool_id].append(pool_id)
533
534 return osds
535
536
537 def get_osds(self, pool_id=None, ids=None):
538 # Get data
539 osd_map = self.get('osd_map')
540 osd_metadata = self.get('osd_metadata')
541
542 # Update the data with the additional info from the osd map
543 osds = osd_map['osds']
544
545 # Filter by osd ids
546 if ids is not None:
1adf2230 547 osds = [x for x in osds if str(x['osd']) in ids]
31f18b77
FG
548
549 # Get list of pools per osd node
550 pools_map = self.get_osd_pools()
551
552 # map osd IDs to reweight
553 reweight_map = dict([
554 (x.get('id'), x.get('reweight', None))
555 for x in self.get('osd_map_tree')['nodes']
556 ])
557
558 # Build OSD data objects
559 for osd in osds:
560 osd['pools'] = pools_map[osd['osd']]
561 osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)
562
563 osd['reweight'] = reweight_map.get(osd['osd'], 0.0)
564
565 if osd['up']:
566 osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
567 else:
568 osd['valid_commands'] = []
569
570 # Filter by pool
571 if pool_id:
572 pool_id = int(pool_id)
1adf2230 573 osds = [x for x in osds if pool_id in x['pools']]
31f18b77
FG
574
575 return osds
576
577
578 def get_osd_by_id(self, osd_id):
1adf2230
AA
579 osd = [x for x in self.get('osd_map')['osds']
580 if x['osd'] == osd_id]
31f18b77
FG
581
582 if len(osd) != 1:
583 return None
584
585 return osd[0]
586
587
588 def get_pool_by_id(self, pool_id):
1adf2230
AA
589 pool = [x for x in self.get('osd_map')['pools']
590 if x['pool'] == pool_id]
31f18b77
FG
591
592 if len(pool) != 1:
593 return None
594
595 return pool[0]
596
597
598 def submit_request(self, _request, **kwargs):
31f18b77 599 with self.requests_lock:
f64942e4 600 request = CommandsRequest(_request)
31f18b77
FG
601 self.requests.append(request)
602 if kwargs.get('wait', 0):
603 while not request.is_finished():
604 time.sleep(0.001)
605 return request
606
607
608 def run_command(self, command):
11fdf7f2 609 # tag with 'seq' so that we can ignore these in notify function
31f18b77
FG
610 result = CommandResult('seq')
611
612 self.send_command(result, 'mon', '', json.dumps(command), 'seq')
613 return result.wait()