]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/restful/module.py
import ceph 12.2.12
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
CommitLineData
31f18b77
FG
1"""
2A RESTful API for Ceph
3"""
4
5import os
6import json
7import time
8import errno
9import inspect
10import tempfile
11import threading
12import traceback
1adf2230 13import six
3efd9988 14import socket
31f18b77
FG
15
16import common
17
18from uuid import uuid4
19from pecan import jsonify, make_app
20from OpenSSL import crypto
21from pecan.rest import RestController
22from werkzeug.serving import make_server, make_ssl_devcert
23
24from hooks import ErrorHook
25from mgr_module import MgrModule, CommandResult
26
1adf2230 27
31f18b77
FG
# Module-level handle shared across this file: Module.__init__ stores the
# module object here and CommandsRequest.run sends commands through it.
instance = None
30
31
3efd9988
FG
class CannotServe(Exception):
    """Raised by Module._serve when the API server cannot be started."""
34
31f18b77
FG
35
class CommandsRequest(object):
    """
    This class handles parallel as well as sequential execution of
    commands. The class accepts a list of iterables that should be
    executed sequentially. Each iterable can contain several commands
    that can be executed in parallel.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Attributes:
        id:       string identifier, used as the prefix of command tags
        running:  results of commands that were sent and not yet finished
        waiting:  batches of commands that were not sent yet
        finished: results of commands that completed with r == 0
        failed:   results of commands that completed with r != 0
        lock:     re-entrant lock guarding the state transitions
    """

    def __init__(self, commands_arrays):
        """
        Store the batches and immediately start the first non-empty one.

        :param commands_arrays: list of batches; each batch is a sized
                                iterable of commands
        """
        self.id = str(id(self))

        # Filter out empty sub-requests
        commands_arrays = [x for x in commands_arrays if len(x) != 0]

        self.running = []
        self.waiting = commands_arrays[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Process first iteration of commands_arrays in parallel
        self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send every command of the given batch to the mon through the global
        module ``instance`` and return the list of CommandResult objects,
        one per command, in the same order.

        Each result is tagged '<request id>:<index in batch>'.
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%d' % (self.id, index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """Start the next waiting batch, if there is one."""
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Run a next iteration of commands
            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result carrying ``tag`` to ``finished`` (r == 0) or
        ``failed`` (otherwise).

        :return: True when a result with that tag was running, else False
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                # Returning right after the pop keeps the iteration safe
                done = self.running.pop(index)
                if done.r == 0:
                    self.finished.append(done)
                else:
                    self.failed.append(done)
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """Return True when a running result carries ``tag``."""
        return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """Return True when nothing is running but batches still wait."""
        with self.lock:
            # bool(): the bare expression would hand back the waiting list
            return not self.running and bool(self.waiting)

    def is_waiting(self):
        """Return True when at least one batch was not started yet."""
        return bool(self.waiting)

    def is_finished(self):
        """Return True when nothing is running and nothing is waiting."""
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """Return True when at least one command failed."""
        return bool(self.failed)

    def get_state(self):
        """Return "pending", "failed" or "success"."""
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    @staticmethod
    def _describe(results):
        """Return the command/outs/outb summary of each result as a list."""
        return [
            {
                'command': result.command,
                'outs': result.outs,
                'outb': result.outb,
            }
            for result in results
        ]

    def __json__(self):
        """
        Return a dictionary describing the request.

        Real lists are built (rather than ``map`` objects) so that the
        result can be serialized on Python 3 as well.
        """
        return {
            'id': self.id,
            'running': self._describe(self.running),
            'finished': self._describe(self.finished),
            'waiting': [
                [common.humanify_command(command) for command in batch]
                for batch in self.waiting
            ],
            'failed': self._describe(self.failed),
            'is_waiting': self.is_waiting(),
            'is_finished': self.is_finished(),
            'has_failed': self.has_failed(),
            'state': self.get_state(),
        }
199
200
201
class Module(MgrModule):
    """
    Manager module that serves the RESTful API over HTTPS and handles the
    ``restful ...`` commands listed in COMMANDS.
    """

    COMMANDS = [
        {
            "cmd": "restful create-key name=key_name,type=CephString",
            "desc": "Create an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful delete-key name=key_name,type=CephString",
            "desc": "Delete an API key with this name",
            "perm": "rw"
        },
        {
            "cmd": "restful list-keys",
            "desc": "List all API keys",
            "perm": "r"
        },
        {
            "cmd": "restful create-self-signed-cert",
            "desc": "Create localized self signed certificate",
            "perm": "rw"
        },
        {
            "cmd": "restful restart",
            "desc": "Restart API server",
            "perm": "rw"
        },
    ]

    def __init__(self, *args, **kwargs):
        """Initialize the module state and publish it as the global instance."""
        super(Module, self).__init__(*args, **kwargs)
        global instance
        instance = self

        # CommandsRequest objects created by submit_request()
        self.requests = []
        self.requests_lock = threading.RLock()

        # API key name -> key value, see refresh_keys()
        self.keys = {}
        self.disable_auth = False

        self.server = None

        self.stop_server = False
        # Set by shutdown()/restart() to wake up the serve() loop
        self.serve_event = threading.Event()

    def serve(self):
        """
        Run the API server until shutdown() sets ``stop_server``.

        Every time the server stops (restart, shutdown or failure to start)
        the loop blocks on ``serve_event`` before trying again.
        """
        while not self.stop_server:
            try:
                self._serve()
                self.server.socket.close()
            except CannotServe as cs:
                self.log.warning("server not running: %s", cs)
            except Exception:
                self.log.error(str(traceback.format_exc()))

            # Wait and clear the threading event
            self.serve_event.wait()
            self.serve_event.clear()

    def refresh_keys(self):
        """Reload ``self.keys`` from the config entries under 'keys/'."""
        rawkeys = self.get_config_prefix('keys/') or {}
        # Build the new mapping first and swap it in with one assignment, so
        # that readers never observe a half-filled dictionary.
        # k[5:] strips off the 'keys/' prefix.
        self.keys = dict((k[5:], v) for k, v in six.iteritems(rawkeys))

    def _pem_path(self, inline_name, path_name, tmp_files):
        """
        Return the path of a PEM file for the server.

        When the config option ``inline_name`` holds the PEM text, it is
        written to a named temporary file (appended to ``tmp_files`` so the
        caller can close it); otherwise the value of the config option
        ``path_name`` is returned, which may be None.
        """
        pem = self.get_localized_config(inline_name)
        if pem is None:
            return self.get_localized_config(path_name)

        tmp = tempfile.NamedTemporaryFile()
        tmp_files.append(tmp)
        tmp.write(pem.encode('utf-8'))
        tmp.flush()
        return tmp.name

    def _serve(self):
        """
        Configure and run the HTTPS server; blocks in serve_forever().

        :raises CannotServe: when the address, certificate or private key
                             is missing
        """
        # Load stored authentication keys
        self.refresh_keys()

        jsonify._instance = jsonify.GenericJSON(
            sort_keys=True,
            indent=4,
            separators=(',', ': '),
        )

        server_addr = self.get_localized_config('server_addr', '::')
        if server_addr is None:
            raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')

        server_port = int(self.get_localized_config('server_port', '8003'))
        self.log.info('server_addr: %s server_port: %d',
                      server_addr, server_port)

        # Temporary files holding inline cert/key; they must stay open until
        # the server stops and are closed in the finally clause below.
        tmp_files = []
        try:
            cert_fname = self._pem_path('crt', 'crt_file', tmp_files)
            pkey_fname = self._pem_path('key', 'key_file', tmp_files)

            if not cert_fname or not pkey_fname:
                raise CannotServe('no certificate configured')
            if not os.path.isfile(cert_fname):
                raise CannotServe('certificate %s does not exist' % cert_fname)
            if not os.path.isfile(pkey_fname):
                raise CannotServe('private key %s does not exist' % pkey_fname)

            # Publish the URI that others may use to access the service we're
            # about to start serving
            self.set_uri("https://{0}:{1}/".format(
                socket.gethostname() if server_addr == "::" else server_addr,
                server_port
            ))

            # Create the HTTPS werkzeug server serving pecan app
            self.server = make_server(
                host=server_addr,
                port=server_port,
                app=make_app(
                    root='restful.api.Root',
                    hooks=[ErrorHook()],  # use a callable if pecan >= 0.3.2
                ),
                ssl_context=(cert_fname, pkey_fname),
            )

            self.server.serve_forever()
        finally:
            for tmp in tmp_files:
                tmp.close()

    def shutdown(self):
        """Stop the server and let the serve() loop terminate."""
        try:
            self.stop_server = True
            if self.server:
                self.server.shutdown()
            self.serve_event.set()
        except Exception:
            self.log.error(str(traceback.format_exc()))
            raise

    def restart(self):
        """Stop the server and wake up serve() so that it starts it again."""
        try:
            if self.server:
                self.server.shutdown()
            self.serve_event.set()
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def notify(self, notify_type, tag):
        """Handle a notification; errors are logged instead of raised."""
        try:
            self._notify(notify_type, tag)
        except Exception:
            self.log.error(str(traceback.format_exc()))

    def _notify(self, notify_type, tag):
        """
        On a "command" notification, mark the command carrying ``tag`` as
        finished in its request and start the next batch when the request is
        ready for it. Other notification types are only logged.
        """
        if notify_type != "command":
            self.log.debug("Unhandled notification type '%s'" % notify_type)
            return

        # we can safely skip all the sequential commands
        if tag == 'seq':
            return

        with self.requests_lock:
            request = [x for x in self.requests if x.is_running(tag)]

        if len(request) != 1:
            self.log.warning("Unknown request '%s'" % str(tag))
            return

        request = request[0]
        request.finish(tag)
        if request.is_ready():
            request.next()

    def create_self_signed_cert(self):
        """
        Generate a 2048 bit RSA key and a self-signed certificate for it.

        :return: tuple (certificate, private key), both dumped as PEM
        """
        # create a key pair
        pkey = crypto.PKey()
        pkey.generate_key(crypto.TYPE_RSA, 2048)

        # create a self-signed cert
        cert = crypto.X509()
        cert.get_subject().O = "IT"
        cert.get_subject().CN = "ceph-restful"
        cert.set_serial_number(int(uuid4()))
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(10*365*24*60*60)
        cert.set_issuer(cert.get_subject())
        cert.set_pubkey(pkey)
        cert.sign(pkey, 'sha512')

        return (
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
            crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
        )

    def handle_command(self, command):
        """
        Execute one of the commands declared in COMMANDS.

        :param command: dictionary with at least the 'prefix' key (and
                        'key_name' for the key commands)
        :return: 3-tuple starting with an integer status (0 on success,
                 -errno.EINVAL for an unknown prefix) followed by two strings
        """
        self.log.warning("Handling command: '%s'" % str(command))
        prefix = command['prefix']

        if prefix == "restful create-key":
            key_name = command['key_name']
            # An existing key is returned unchanged
            if key_name not in self.keys:
                key = str(uuid4())
                self.keys[key_name] = key
                self.set_config('keys/' + key_name, key)

            return (
                0,
                self.keys[key_name],
                "",
            )

        elif prefix == "restful delete-key":
            key_name = command['key_name']
            if key_name in self.keys:
                del self.keys[key_name]
                self.set_config('keys/' + key_name, None)

            return (
                0,
                "",
                "",
            )

        elif prefix == "restful list-keys":
            self.refresh_keys()
            return (
                0,
                json.dumps(self.keys, indent=2),
                "",
            )

        elif prefix == "restful create-self-signed-cert":
            cert, pkey = self.create_self_signed_cert()
            self.set_config(self.get_mgr_id() + '/crt', cert.decode('utf-8'))
            self.set_config(self.get_mgr_id() + '/key', pkey.decode('utf-8'))

            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        elif prefix == 'restful restart':
            self.restart()
            return (
                0,
                "Restarting RESTful API server...",
                ""
            )

        else:
            return (
                -errno.EINVAL,
                "",
                "Command not found '{0}'".format(prefix)
            )

    def get_doc_api(self, root, prefix=''):
        """
        Collect the docstrings of the REST handlers reachable from ``root``.

        :param root: controller object to inspect
        :param prefix: URL path of ``root``
        :return: dictionary mapping a path to a dictionary that maps the
                 upper-cased method name to the docstring lines
        """
        doc = {}
        for _obj in dir(root):
            obj = getattr(root, _obj)

            if isinstance(obj, RestController):
                doc.update(self.get_doc_api(obj, prefix + '/' + _obj))

        if getattr(root, '_lookup', None):
            # Probe the dynamic lookup once with a placeholder argument
            looked_up = root._lookup('0')[0]
            if isinstance(looked_up, RestController):
                doc.update(self.get_doc_api(looked_up, prefix + '/<arg>'))

        prefix = prefix or '/'

        doc[prefix] = {}
        for method in 'get', 'post', 'patch', 'delete':
            handler = getattr(root, method, None)
            if handler:
                # getdoc() returns None for an undocumented handler
                doc[prefix][method.upper()] = (inspect.getdoc(handler) or '').split('\n')

        if len(doc[prefix]) == 0:
            del doc[prefix]

        return doc

    def get_mons(self):
        """
        Return the mons of the mon map, each extended with the 'in_quorum',
        'server' and 'leader' keys.
        """
        mon_map_mons = self.get('mon_map')['mons']
        mon_status = json.loads(self.get('mon_status')['json'])

        quorum = mon_status['quorum']
        # The first quorum entry is treated as the leader; an empty quorum
        # means that no mon is marked as leader.
        leader_rank = quorum[0] if quorum else None

        # Add more information
        for mon in mon_map_mons:
            mon['in_quorum'] = mon['rank'] in quorum
            mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
            mon['leader'] = leader_rank is not None and mon['rank'] == leader_rank

        return mon_map_mons

    def get_osd_pools(self):
        """
        Return a dictionary mapping every OSD id of the OSD map to the list
        of ids of the pools whose crush rule selects that OSD.
        """
        osd_map = self.get('osd_map')
        osds = dict((x['osd'], []) for x in osd_map['osds'])
        pools = dict((x['pool'], x) for x in osd_map['pools'])
        crush = self.get('osd_map_crush')
        crush_rules = crush['rules']

        osds_by_pool = {}
        for pool_id, pool in pools.items():
            # Stays empty when no rule matches the pool
            pool_osds = []
            for rule in crush_rules:
                if rule['rule_id'] != pool['crush_rule']:
                    continue
                if rule['min_size'] <= pool['size'] <= rule['max_size']:
                    pool_osds = common.crush_rule_osds(crush['buckets'], rule)

            osds_by_pool[pool_id] = pool_osds

        for pool_id, pool_osds in osds_by_pool.items():
            for in_pool_id in pool_osds:
                # Skip ids the OSD map does not list
                if in_pool_id in osds:
                    osds[in_pool_id].append(pool_id)

        return osds

    def get_osds(self, pool_id=None, ids=None):
        """
        Return the OSDs of the OSD map extended with the 'pools', 'server',
        'reweight' and 'valid_commands' keys.

        :param pool_id: when given, keep only OSDs serving this pool
                        (anything int() accepts; None or '' disables it)
        :param ids: when given, keep only OSDs whose id, as a string, is
                    contained in it
        """
        # Get data
        osd_map = self.get('osd_map')
        osd_metadata = self.get('osd_metadata')

        # Update the data with the additional info from the osd map
        osds = osd_map['osds']

        # Filter by osd ids
        if ids is not None:
            osds = [x for x in osds if str(x['osd']) in ids]

        # Get list of pools per osd node
        pools_map = self.get_osd_pools()

        # map osd IDs to reweight
        reweight_map = dict(
            (x.get('id'), x.get('reweight', None))
            for x in self.get('osd_map_tree')['nodes']
        )

        # Build OSD data objects
        for osd in osds:
            osd['pools'] = pools_map[osd['osd']]
            osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)

            osd['reweight'] = reweight_map.get(osd['osd'], 0.0)

            if osd['up']:
                osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
            else:
                osd['valid_commands'] = []

        # Filter by pool; compare against None so that pool id 0 works too
        if pool_id is not None and pool_id != '':
            pool_id = int(pool_id)
            osds = [x for x in osds if pool_id in x['pools']]

        return osds

    def get_osd_by_id(self, osd_id):
        """Return the OSD map entry with this id, or None."""
        osd = [x for x in self.get('osd_map')['osds']
               if x['osd'] == osd_id]

        if len(osd) != 1:
            return None

        return osd[0]

    def get_pool_by_id(self, pool_id):
        """Return the OSD map pool entry with this id, or None."""
        pool = [x for x in self.get('osd_map')['pools']
                if x['pool'] == pool_id]

        if len(pool) != 1:
            return None

        return pool[0]

    def submit_request(self, _request, **kwargs):
        """
        Create a CommandsRequest for ``_request`` and register it.

        :param _request: list of command batches, see CommandsRequest
        :param wait: optional keyword; when true, block (polling) until the
                     request is finished
        :return: the CommandsRequest object
        """
        with self.requests_lock:
            request = CommandsRequest(_request)
            self.requests.append(request)
        if kwargs.get('wait', 0):
            while not request.is_finished():
                time.sleep(0.001)
        return request

    def run_command(self, command):
        """Send a single command to the mon and return result.wait()."""
        # tag with 'seq' so that we can ignore these in notify function
        result = CommandResult('seq')

        self.send_command(result, 'mon', '', json.dumps(command), 'seq')
        return result.wait()