]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/restful/module.py
update download target update for octopus release
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
CommitLineData
31f18b77
FG
1"""
2A RESTful API for Ceph
3"""
11fdf7f2 4from __future__ import absolute_import
31f18b77
FG
5
6import os
7import json
8import time
9import errno
10import inspect
11import tempfile
12import threading
13import traceback
1adf2230 14import six
3efd9988 15import socket
31f18b77 16
11fdf7f2
TL
17from . import common
18from . import context
31f18b77
FG
19
20from uuid import uuid4
21from pecan import jsonify, make_app
22from OpenSSL import crypto
23from pecan.rest import RestController
11fdf7f2 24from six import iteritems
31f18b77
FG
25from werkzeug.serving import make_server, make_ssl_devcert
26
11fdf7f2 27from .hooks import ErrorHook
31f18b77
FG
28from mgr_module import MgrModule, CommandResult
29
1adf2230 30
3efd9988
FG
31class CannotServe(Exception):
32 pass
33
31f18b77
FG
34
35class CommandsRequest(object):
36 """
37 This class handles parallel as well as sequential execution of
38 commands. The class accept a list of iterables that should be
39 executed sequentially. Each iterable can contain several commands
40 that can be executed in parallel.
41
42 Example:
43 [[c1,c2],[c3,c4]]
44 - run c1 and c2 in parallel
45 - wait for them to finish
46 - run c3 and c4 in parallel
47 - wait for them to finish
48 """
49
50
51 def __init__(self, commands_arrays):
52 self.id = str(id(self))
53
54 # Filter out empty sub-requests
1adf2230
AA
55 commands_arrays = [x for x in commands_arrays
56 if len(x) != 0]
31f18b77
FG
57
58 self.running = []
59 self.waiting = commands_arrays[1:]
60 self.finished = []
61 self.failed = []
62
63 self.lock = threading.RLock()
64 if not len(commands_arrays):
65 # Nothing to run
66 return
67
68 # Process first iteration of commands_arrays in parallel
69 results = self.run(commands_arrays[0])
70
71 self.running.extend(results)
72
73
74 def run(self, commands):
75 """
76 A static method that will execute the given list of commands in
77 parallel and will return the list of command results.
78 """
79
80 # Gather the results (in parallel)
81 results = []
81eedcae 82 for index, command in enumerate(commands):
11fdf7f2 83 tag = '%s:%s:%d' % (__name__, self.id, index)
31f18b77
FG
84
85 # Store the result
86 result = CommandResult(tag)
81eedcae 87 result.command = common.humanify_command(command)
31f18b77
FG
88 results.append(result)
89
90 # Run the command
81eedcae 91 context.instance.send_command(result, 'mon', '', json.dumps(command), tag)
31f18b77
FG
92
93 return results
94
95
96 def next(self):
97 with self.lock:
98 if not self.waiting:
99 # Nothing to run
100 return
101
102 # Run a next iteration of commands
103 commands = self.waiting[0]
104 self.waiting = self.waiting[1:]
105
106 self.running.extend(self.run(commands))
107
108
109 def finish(self, tag):
110 with self.lock:
111 for index in range(len(self.running)):
112 if self.running[index].tag == tag:
113 if self.running[index].r == 0:
114 self.finished.append(self.running.pop(index))
115 else:
116 self.failed.append(self.running.pop(index))
117 return True
118
119 # No such tag found
120 return False
121
122
123 def is_running(self, tag):
124 for result in self.running:
125 if result.tag == tag:
126 return True
127 return False
128
129
130 def is_ready(self):
131 with self.lock:
132 return not self.running and self.waiting
133
134
135 def is_waiting(self):
136 return bool(self.waiting)
137
138
139 def is_finished(self):
140 with self.lock:
141 return not self.running and not self.waiting
142
143
144 def has_failed(self):
145 return bool(self.failed)
146
147
148 def get_state(self):
149 with self.lock:
150 if not self.is_finished():
151 return "pending"
152
153 if self.has_failed():
154 return "failed"
155
156 return "success"
157
158
159 def __json__(self):
160 return {
161 'id': self.id,
162 'running': map(
163 lambda x: {
164 'command': x.command,
165 'outs': x.outs,
166 'outb': x.outb,
167 },
168 self.running
169 ),
170 'finished': map(
171 lambda x: {
172 'command': x.command,
173 'outs': x.outs,
174 'outb': x.outb,
175 },
176 self.finished
177 ),
178 'waiting': map(
28e407b8
AA
179 lambda x: map(
180 lambda y: common.humanify_command(y),
181 x
182 ),
31f18b77
FG
183 self.waiting
184 ),
185 'failed': map(
186 lambda x: {
187 'command': x.command,
188 'outs': x.outs,
189 'outb': x.outb,
190 },
191 self.failed
192 ),
193 'is_waiting': self.is_waiting(),
194 'is_finished': self.is_finished(),
195 'has_failed': self.has_failed(),
196 'state': self.get_state(),
197 }
198
199
200
201class Module(MgrModule):
11fdf7f2
TL
202 MODULE_OPTIONS = [
203 {'name': 'server_addr'},
204 {'name': 'server_port'},
205 {'name': 'key_file'},
206 ]
207
31f18b77
FG
208 COMMANDS = [
209 {
210 "cmd": "restful create-key name=key_name,type=CephString",
211 "desc": "Create an API key with this name",
212 "perm": "rw"
213 },
214 {
215 "cmd": "restful delete-key name=key_name,type=CephString",
216 "desc": "Delete an API key with this name",
217 "perm": "rw"
218 },
219 {
220 "cmd": "restful list-keys",
221 "desc": "List all API keys",
a8e16298 222 "perm": "r"
31f18b77
FG
223 },
224 {
225 "cmd": "restful create-self-signed-cert",
226 "desc": "Create localized self signed certificate",
227 "perm": "rw"
228 },
229 {
230 "cmd": "restful restart",
231 "desc": "Restart API server",
232 "perm": "rw"
233 },
234 ]
235
236 def __init__(self, *args, **kwargs):
237 super(Module, self).__init__(*args, **kwargs)
11fdf7f2 238 context.instance = self
31f18b77
FG
239
240 self.requests = []
241 self.requests_lock = threading.RLock()
242
243 self.keys = {}
244 self.disable_auth = False
245
246 self.server = None
247
248 self.stop_server = False
249 self.serve_event = threading.Event()
250
251
252 def serve(self):
253 while not self.stop_server:
254 try:
255 self._serve()
256 self.server.socket.close()
3efd9988 257 except CannotServe as cs:
1adf2230 258 self.log.warn("server not running: %s", cs)
31f18b77
FG
259 except:
260 self.log.error(str(traceback.format_exc()))
261
262 # Wait and clear the threading event
263 self.serve_event.wait()
264 self.serve_event.clear()
265
31f18b77
FG
266 def refresh_keys(self):
267 self.keys = {}
11fdf7f2 268 rawkeys = self.get_store_prefix('keys/') or {}
1adf2230 269 for k, v in six.iteritems(rawkeys):
31f18b77
FG
270 self.keys[k[5:]] = v # strip of keys/ prefix
271
272 def _serve(self):
273 # Load stored authentication keys
274 self.refresh_keys()
275
276 jsonify._instance = jsonify.GenericJSON(
277 sort_keys=True,
278 indent=4,
279 separators=(',', ': '),
280 )
281
11fdf7f2 282 server_addr = self.get_localized_module_option('server_addr', '::')
31f18b77 283 if server_addr is None:
3efd9988
FG
284 raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')
285
11fdf7f2 286 server_port = int(self.get_localized_module_option('server_port', '8003'))
31f18b77
FG
287 self.log.info('server_addr: %s server_port: %d',
288 server_addr, server_port)
289
11fdf7f2 290 cert = self.get_localized_store("crt")
31f18b77
FG
291 if cert is not None:
292 cert_tmp = tempfile.NamedTemporaryFile()
1adf2230 293 cert_tmp.write(cert.encode('utf-8'))
31f18b77
FG
294 cert_tmp.flush()
295 cert_fname = cert_tmp.name
296 else:
11fdf7f2 297 cert_fname = self.get_localized_store('crt_file')
31f18b77 298
11fdf7f2 299 pkey = self.get_localized_store("key")
31f18b77
FG
300 if pkey is not None:
301 pkey_tmp = tempfile.NamedTemporaryFile()
1adf2230 302 pkey_tmp.write(pkey.encode('utf-8'))
31f18b77
FG
303 pkey_tmp.flush()
304 pkey_fname = pkey_tmp.name
305 else:
11fdf7f2 306 pkey_fname = self.get_localized_module_option('key_file')
31f18b77
FG
307
308 if not cert_fname or not pkey_fname:
3efd9988 309 raise CannotServe('no certificate configured')
31f18b77 310 if not os.path.isfile(cert_fname):
3efd9988 311 raise CannotServe('certificate %s does not exist' % cert_fname)
31f18b77 312 if not os.path.isfile(pkey_fname):
3efd9988
FG
313 raise CannotServe('private key %s does not exist' % pkey_fname)
314
315 # Publish the URI that others may use to access the service we're
316 # about to start serving
317 self.set_uri("https://{0}:{1}/".format(
318 socket.gethostname() if server_addr == "::" else server_addr,
319 server_port
320 ))
31f18b77
FG
321
322 # Create the HTTPS werkzeug server serving pecan app
323 self.server = make_server(
324 host=server_addr,
325 port=server_port,
326 app=make_app(
327 root='restful.api.Root',
328 hooks = [ErrorHook()], # use a callable if pecan >= 0.3.2
329 ),
330 ssl_context=(cert_fname, pkey_fname),
331 )
332
333 self.server.serve_forever()
334
335
336 def shutdown(self):
337 try:
338 self.stop_server = True
339 if self.server:
340 self.server.shutdown()
341 self.serve_event.set()
342 except:
343 self.log.error(str(traceback.format_exc()))
344 raise
345
346
347 def restart(self):
348 try:
349 if self.server:
350 self.server.shutdown()
351 self.serve_event.set()
352 except:
353 self.log.error(str(traceback.format_exc()))
354
355
356 def notify(self, notify_type, tag):
357 try:
358 self._notify(notify_type, tag)
359 except:
360 self.log.error(str(traceback.format_exc()))
361
362
363 def _notify(self, notify_type, tag):
11fdf7f2
TL
364 if notify_type != "command":
365 self.log.debug("Unhandled notification type '%s'", notify_type)
366 return
367 # we can safely skip all the sequential commands
368 if tag == 'seq':
369 return
370 try:
f64942e4 371 with self.requests_lock:
11fdf7f2 372 request = next(x for x in self.requests if x.is_running(tag))
31f18b77
FG
373 request.finish(tag)
374 if request.is_ready():
375 request.next()
11fdf7f2
TL
376 except StopIteration:
377 # the command was not issued by me
378 pass
31f18b77
FG
379
380
381 def create_self_signed_cert(self):
382 # create a key pair
383 pkey = crypto.PKey()
384 pkey.generate_key(crypto.TYPE_RSA, 2048)
385
386 # create a self-signed cert
387 cert = crypto.X509()
388 cert.get_subject().O = "IT"
389 cert.get_subject().CN = "ceph-restful"
390 cert.set_serial_number(int(uuid4()))
391 cert.gmtime_adj_notBefore(0)
392 cert.gmtime_adj_notAfter(10*365*24*60*60)
393 cert.set_issuer(cert.get_subject())
394 cert.set_pubkey(pkey)
395 cert.sign(pkey, 'sha512')
396
397 return (
398 crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
399 crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
400 )
401
402
11fdf7f2 403 def handle_command(self, inbuf, command):
31f18b77
FG
404 self.log.warn("Handling command: '%s'" % str(command))
405 if command['prefix'] == "restful create-key":
406 if command['key_name'] in self.keys:
407 return 0, self.keys[command['key_name']], ""
408
409 else:
410 key = str(uuid4())
411 self.keys[command['key_name']] = key
11fdf7f2 412 self.set_store('keys/' + command['key_name'], key)
31f18b77
FG
413
414 return (
415 0,
416 self.keys[command['key_name']],
417 "",
418 )
419
420 elif command['prefix'] == "restful delete-key":
421 if command['key_name'] in self.keys:
422 del self.keys[command['key_name']]
11fdf7f2 423 self.set_store('keys/' + command['key_name'], None)
31f18b77
FG
424
425 return (
426 0,
427 "",
428 "",
429 )
430
431 elif command['prefix'] == "restful list-keys":
432 self.refresh_keys()
433 return (
434 0,
435 json.dumps(self.keys, indent=2),
436 "",
437 )
438
439 elif command['prefix'] == "restful create-self-signed-cert":
440 cert, pkey = self.create_self_signed_cert()
11fdf7f2
TL
441 self.set_store(self.get_mgr_id() + '/crt', cert.decode('utf-8'))
442 self.set_store(self.get_mgr_id() + '/key', pkey.decode('utf-8'))
31f18b77
FG
443
444 self.restart()
445 return (
446 0,
447 "Restarting RESTful API server...",
448 ""
449 )
450
451 elif command['prefix'] == 'restful restart':
452 self.restart();
453 return (
454 0,
455 "Restarting RESTful API server...",
456 ""
457 )
458
459 else:
460 return (
461 -errno.EINVAL,
462 "",
463 "Command not found '{0}'".format(command['prefix'])
464 )
465
466
467 def get_doc_api(self, root, prefix=''):
468 doc = {}
469 for _obj in dir(root):
470 obj = getattr(root, _obj)
471
472 if isinstance(obj, RestController):
473 doc.update(self.get_doc_api(obj, prefix + '/' + _obj))
474
475 if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController):
476 doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/<arg>'))
477
478 prefix = prefix or '/'
479
480 doc[prefix] = {}
481 for method in 'get', 'post', 'patch', 'delete':
482 if getattr(root, method, None):
483 doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n')
484
485 if len(doc[prefix]) == 0:
486 del doc[prefix]
487
488 return doc
489
490
491 def get_mons(self):
492 mon_map_mons = self.get('mon_map')['mons']
493 mon_status = json.loads(self.get('mon_status')['json'])
494
495 # Add more information
496 for mon in mon_map_mons:
497 mon['in_quorum'] = mon['rank'] in mon_status['quorum']
498 mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
499 mon['leader'] = mon['rank'] == mon_status['quorum'][0]
500
501 return mon_map_mons
502
503
504 def get_osd_pools(self):
505 osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds']))
506 pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools']))
a8e16298
TL
507 crush = self.get('osd_map_crush')
508 crush_rules = crush['rules']
31f18b77
FG
509
510 osds_by_pool = {}
511 for pool_id, pool in pools.items():
512 pool_osds = None
513 for rule in [r for r in crush_rules if r['rule_id'] == pool['crush_rule']]:
514 if rule['min_size'] <= pool['size'] <= rule['max_size']:
a8e16298 515 pool_osds = common.crush_rule_osds(crush['buckets'], rule)
31f18b77
FG
516
517 osds_by_pool[pool_id] = pool_osds
518
519 for pool_id in pools.keys():
520 for in_pool_id in osds_by_pool[pool_id]:
521 osds[in_pool_id].append(pool_id)
522
523 return osds
524
525
526 def get_osds(self, pool_id=None, ids=None):
527 # Get data
528 osd_map = self.get('osd_map')
529 osd_metadata = self.get('osd_metadata')
530
531 # Update the data with the additional info from the osd map
532 osds = osd_map['osds']
533
534 # Filter by osd ids
535 if ids is not None:
1adf2230 536 osds = [x for x in osds if str(x['osd']) in ids]
31f18b77
FG
537
538 # Get list of pools per osd node
539 pools_map = self.get_osd_pools()
540
541 # map osd IDs to reweight
542 reweight_map = dict([
543 (x.get('id'), x.get('reweight', None))
544 for x in self.get('osd_map_tree')['nodes']
545 ])
546
547 # Build OSD data objects
548 for osd in osds:
549 osd['pools'] = pools_map[osd['osd']]
550 osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)
551
552 osd['reweight'] = reweight_map.get(osd['osd'], 0.0)
553
554 if osd['up']:
555 osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
556 else:
557 osd['valid_commands'] = []
558
559 # Filter by pool
560 if pool_id:
561 pool_id = int(pool_id)
1adf2230 562 osds = [x for x in osds if pool_id in x['pools']]
31f18b77
FG
563
564 return osds
565
566
567 def get_osd_by_id(self, osd_id):
1adf2230
AA
568 osd = [x for x in self.get('osd_map')['osds']
569 if x['osd'] == osd_id]
31f18b77
FG
570
571 if len(osd) != 1:
572 return None
573
574 return osd[0]
575
576
577 def get_pool_by_id(self, pool_id):
1adf2230
AA
578 pool = [x for x in self.get('osd_map')['pools']
579 if x['pool'] == pool_id]
31f18b77
FG
580
581 if len(pool) != 1:
582 return None
583
584 return pool[0]
585
586
587 def submit_request(self, _request, **kwargs):
31f18b77 588 with self.requests_lock:
f64942e4 589 request = CommandsRequest(_request)
31f18b77
FG
590 self.requests.append(request)
591 if kwargs.get('wait', 0):
592 while not request.is_finished():
593 time.sleep(0.001)
594 return request
595
596
597 def run_command(self, command):
11fdf7f2 598 # tag with 'seq' so that we can ignore these in notify function
31f18b77
FG
599 result = CommandResult('seq')
600
601 self.send_command(result, 'mon', '', json.dumps(command), 'seq')
602 return result.wait()