]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/restful/module.py
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
CommitLineData
31f18b77
FG
1"""
2A RESTful API for Ceph
3"""
4
5import os
6import json
7import time
8import errno
9import inspect
10import tempfile
11import threading
12import traceback
3efd9988 13import socket
31f18b77
FG
14
15import common
16
17from uuid import uuid4
18from pecan import jsonify, make_app
19from OpenSSL import crypto
20from pecan.rest import RestController
21from werkzeug.serving import make_server, make_ssl_devcert
22
23from hooks import ErrorHook
24from mgr_module import MgrModule, CommandResult
25
# Global instance to share.
# Module.__init__ stores the Module object here, and CommandsRequest.run
# uses it to reach send_command().
instance = None
28
29
3efd9988
FG
class CannotServe(Exception):
    """
    Raised by Module._serve when the HTTPS server cannot be started
    (missing address, certificate or private key).  Module.serve catches
    it and logs a warning instead of a traceback.
    """
32
31f18b77
FG
33
class CommandsRequest(object):
    """
    Track one request made of several batches of commands.

    The constructor accepts a list of iterables.  The batches are executed
    sequentially; the commands inside one batch are sent together so that
    they can run in parallel.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Attributes:
     - id: string identifier, used as the prefix of every command tag
     - running: results of the commands that were sent and not finished
     - waiting: batches that have not been sent yet
     - finished: results that completed with return code 0
     - failed: results that completed with a non-zero return code
    """

    def __init__(self, commands_arrays):
        self.id = str(id(self))

        # Filter out empty sub-requests.  A real list is built (rather than
        # calling filter()) because the result is indexed and sliced below,
        # which a lazy filter object does not support.
        commands_arrays = [
            batch for batch in commands_arrays if len(batch) != 0
        ]

        self.running = []
        self.waiting = commands_arrays[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Send the first batch; the remaining ones are started by next()
        with self.lock:
            self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send every command of one batch through the global module
        instance and return the list of CommandResult objects created
        for them.  Each result is tagged '<request id>:<index in batch>';
        finish() later uses that tag to find the result again.
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%d' % (str(self.id), index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """
        Send the next waiting batch, if there is one, and add its results
        to the running list.
        """
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result identified by tag to the finished list
        (return code 0) or to the failed list (anything else).

        Returns True when a running result carried that tag, else False.
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                self.running.pop(index)
                if result.r == 0:
                    self.finished.append(result)
                else:
                    self.failed.append(result)
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """
        Return True when a command with this tag is currently running.
        """
        # finish() and next() mutate self.running under the lock, so the
        # list is only walked while holding it as well.
        with self.lock:
            return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """
        Return True when no command is running and at least one batch is
        still waiting, i.e. when next() should be called.
        """
        with self.lock:
            return not self.running and bool(self.waiting)

    def is_waiting(self):
        """
        Return True when at least one batch has not been sent yet.
        """
        return bool(self.waiting)

    def is_finished(self):
        """
        Return True when nothing is running and nothing is waiting.
        """
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """
        Return True when at least one command ended with a non-zero code.
        """
        return bool(self.failed)

    def get_state(self):
        """
        Return "pending", "failed" or "success".
        """
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    def __json__(self):
        """
        Return a plain dict/list description of the request.

        Lists are built eagerly so that the value can always be handed to
        a JSON encoder (a lazy map object cannot be serialised).
        """
        def describe(result):
            return {
                'command': result.command,
                'outs': result.outs,
                'outb': result.outb,
            }

        # The lock is re-entrant, so the nested is_finished()/get_state()
        # calls are fine; holding it keeps the snapshot consistent.
        with self.lock:
            return {
                'id': self.id,
                'running': [describe(x) for x in self.running],
                'finished': [describe(x) for x in self.finished],
                'waiting': [
                    [common.humanify_command(y) for y in x]
                    for x in self.waiting
                ],
                'failed': [describe(x) for x in self.failed],
                'is_waiting': self.is_waiting(),
                'is_finished': self.is_finished(),
                'has_failed': self.has_failed(),
                'state': self.get_state(),
            }
199
200
201
202class Module(MgrModule):
203 COMMANDS = [
204 {
205 "cmd": "restful create-key name=key_name,type=CephString",
206 "desc": "Create an API key with this name",
207 "perm": "rw"
208 },
209 {
210 "cmd": "restful delete-key name=key_name,type=CephString",
211 "desc": "Delete an API key with this name",
212 "perm": "rw"
213 },
214 {
215 "cmd": "restful list-keys",
216 "desc": "List all API keys",
217 "perm": "rw"
218 },
219 {
220 "cmd": "restful create-self-signed-cert",
221 "desc": "Create localized self signed certificate",
222 "perm": "rw"
223 },
224 {
225 "cmd": "restful restart",
226 "desc": "Restart API server",
227 "perm": "rw"
228 },
229 ]
230
231 def __init__(self, *args, **kwargs):
232 super(Module, self).__init__(*args, **kwargs)
233 global instance
234 instance = self
235
236 self.requests = []
237 self.requests_lock = threading.RLock()
238
239 self.keys = {}
240 self.disable_auth = False
241
242 self.server = None
243
244 self.stop_server = False
245 self.serve_event = threading.Event()
246
247
248 def serve(self):
249 while not self.stop_server:
250 try:
251 self._serve()
252 self.server.socket.close()
3efd9988
FG
253 except CannotServe as cs:
254 self.log.warn("server not running: {0}".format(cs.message))
31f18b77
FG
255 except:
256 self.log.error(str(traceback.format_exc()))
257
258 # Wait and clear the threading event
259 self.serve_event.wait()
260 self.serve_event.clear()
261
31f18b77
FG
262 def refresh_keys(self):
263 self.keys = {}
264 rawkeys = self.get_config_prefix('keys/') or {}
265 for k, v in rawkeys.iteritems():
266 self.keys[k[5:]] = v # strip of keys/ prefix
267
268 def _serve(self):
269 # Load stored authentication keys
270 self.refresh_keys()
271
272 jsonify._instance = jsonify.GenericJSON(
273 sort_keys=True,
274 indent=4,
275 separators=(',', ': '),
276 )
277
224ce89b 278 server_addr = self.get_localized_config('server_addr', '::')
31f18b77 279 if server_addr is None:
3efd9988
FG
280 raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')
281
224ce89b 282 server_port = int(self.get_localized_config('server_port', '8003'))
31f18b77
FG
283 self.log.info('server_addr: %s server_port: %d',
284 server_addr, server_port)
285
286 cert = self.get_localized_config("crt")
287 if cert is not None:
288 cert_tmp = tempfile.NamedTemporaryFile()
289 cert_tmp.write(cert)
290 cert_tmp.flush()
291 cert_fname = cert_tmp.name
292 else:
293 cert_fname = self.get_localized_config('crt_file')
294
295 pkey = self.get_localized_config("key")
296 if pkey is not None:
297 pkey_tmp = tempfile.NamedTemporaryFile()
298 pkey_tmp.write(pkey)
299 pkey_tmp.flush()
300 pkey_fname = pkey_tmp.name
301 else:
302 pkey_fname = self.get_localized_config('key_file')
303
304 if not cert_fname or not pkey_fname:
3efd9988 305 raise CannotServe('no certificate configured')
31f18b77 306 if not os.path.isfile(cert_fname):
3efd9988 307 raise CannotServe('certificate %s does not exist' % cert_fname)
31f18b77 308 if not os.path.isfile(pkey_fname):
3efd9988
FG
309 raise CannotServe('private key %s does not exist' % pkey_fname)
310
311 # Publish the URI that others may use to access the service we're
312 # about to start serving
313 self.set_uri("https://{0}:{1}/".format(
314 socket.gethostname() if server_addr == "::" else server_addr,
315 server_port
316 ))
31f18b77
FG
317
318 # Create the HTTPS werkzeug server serving pecan app
319 self.server = make_server(
320 host=server_addr,
321 port=server_port,
322 app=make_app(
323 root='restful.api.Root',
324 hooks = [ErrorHook()], # use a callable if pecan >= 0.3.2
325 ),
326 ssl_context=(cert_fname, pkey_fname),
327 )
328
329 self.server.serve_forever()
330
331
332 def shutdown(self):
333 try:
334 self.stop_server = True
335 if self.server:
336 self.server.shutdown()
337 self.serve_event.set()
338 except:
339 self.log.error(str(traceback.format_exc()))
340 raise
341
342
343 def restart(self):
344 try:
345 if self.server:
346 self.server.shutdown()
347 self.serve_event.set()
348 except:
349 self.log.error(str(traceback.format_exc()))
350
351
352 def notify(self, notify_type, tag):
353 try:
354 self._notify(notify_type, tag)
355 except:
356 self.log.error(str(traceback.format_exc()))
357
358
359 def _notify(self, notify_type, tag):
360 if notify_type == "command":
361 # we can safely skip all the sequential commands
362 if tag == 'seq':
363 return
364
365 request = filter(
366 lambda x: x.is_running(tag),
367 self.requests)
368
369 if len(request) != 1:
370 self.log.warn("Unknown request '%s'" % str(tag))
371 return
372
373 request = request[0]
374 request.finish(tag)
375 if request.is_ready():
376 request.next()
377 else:
378 self.log.debug("Unhandled notification type '%s'" % notify_type)
379
380
381 def create_self_signed_cert(self):
382 # create a key pair
383 pkey = crypto.PKey()
384 pkey.generate_key(crypto.TYPE_RSA, 2048)
385
386 # create a self-signed cert
387 cert = crypto.X509()
388 cert.get_subject().O = "IT"
389 cert.get_subject().CN = "ceph-restful"
390 cert.set_serial_number(int(uuid4()))
391 cert.gmtime_adj_notBefore(0)
392 cert.gmtime_adj_notAfter(10*365*24*60*60)
393 cert.set_issuer(cert.get_subject())
394 cert.set_pubkey(pkey)
395 cert.sign(pkey, 'sha512')
396
397 return (
398 crypto.dump_certificate(crypto.FILETYPE_PEM, cert),
399 crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
400 )
401
402
403 def handle_command(self, command):
404 self.log.warn("Handling command: '%s'" % str(command))
405 if command['prefix'] == "restful create-key":
406 if command['key_name'] in self.keys:
407 return 0, self.keys[command['key_name']], ""
408
409 else:
410 key = str(uuid4())
411 self.keys[command['key_name']] = key
412 self.set_config('keys/' + command['key_name'], key)
413
414 return (
415 0,
416 self.keys[command['key_name']],
417 "",
418 )
419
420 elif command['prefix'] == "restful delete-key":
421 if command['key_name'] in self.keys:
422 del self.keys[command['key_name']]
423 self.set_config('keys/' + command['key_name'], None)
424
425 return (
426 0,
427 "",
428 "",
429 )
430
431 elif command['prefix'] == "restful list-keys":
432 self.refresh_keys()
433 return (
434 0,
435 json.dumps(self.keys, indent=2),
436 "",
437 )
438
439 elif command['prefix'] == "restful create-self-signed-cert":
440 cert, pkey = self.create_self_signed_cert()
441
442 self.set_config(self.get_mgr_id() + '/crt', cert)
443 self.set_config(self.get_mgr_id() + '/key', pkey)
444
445 self.restart()
446 return (
447 0,
448 "Restarting RESTful API server...",
449 ""
450 )
451
452 elif command['prefix'] == 'restful restart':
453 self.restart();
454 return (
455 0,
456 "Restarting RESTful API server...",
457 ""
458 )
459
460 else:
461 return (
462 -errno.EINVAL,
463 "",
464 "Command not found '{0}'".format(command['prefix'])
465 )
466
467
468 def get_doc_api(self, root, prefix=''):
469 doc = {}
470 for _obj in dir(root):
471 obj = getattr(root, _obj)
472
473 if isinstance(obj, RestController):
474 doc.update(self.get_doc_api(obj, prefix + '/' + _obj))
475
476 if getattr(root, '_lookup', None) and isinstance(root._lookup('0')[0], RestController):
477 doc.update(self.get_doc_api(root._lookup('0')[0], prefix + '/<arg>'))
478
479 prefix = prefix or '/'
480
481 doc[prefix] = {}
482 for method in 'get', 'post', 'patch', 'delete':
483 if getattr(root, method, None):
484 doc[prefix][method.upper()] = inspect.getdoc(getattr(root, method)).split('\n')
485
486 if len(doc[prefix]) == 0:
487 del doc[prefix]
488
489 return doc
490
491
492 def get_mons(self):
493 mon_map_mons = self.get('mon_map')['mons']
494 mon_status = json.loads(self.get('mon_status')['json'])
495
496 # Add more information
497 for mon in mon_map_mons:
498 mon['in_quorum'] = mon['rank'] in mon_status['quorum']
499 mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
500 mon['leader'] = mon['rank'] == mon_status['quorum'][0]
501
502 return mon_map_mons
503
504
505 def get_osd_pools(self):
506 osds = dict(map(lambda x: (x['osd'], []), self.get('osd_map')['osds']))
507 pools = dict(map(lambda x: (x['pool'], x), self.get('osd_map')['pools']))
508 crush_rules = self.get('osd_map_crush')['rules']
509
510 osds_by_pool = {}
511 for pool_id, pool in pools.items():
512 pool_osds = None
513 for rule in [r for r in crush_rules if r['rule_id'] == pool['crush_rule']]:
514 if rule['min_size'] <= pool['size'] <= rule['max_size']:
515 pool_osds = common.crush_rule_osds(self.get('osd_map_tree')['nodes'], rule)
516
517 osds_by_pool[pool_id] = pool_osds
518
519 for pool_id in pools.keys():
520 for in_pool_id in osds_by_pool[pool_id]:
521 osds[in_pool_id].append(pool_id)
522
523 return osds
524
525
526 def get_osds(self, pool_id=None, ids=None):
527 # Get data
528 osd_map = self.get('osd_map')
529 osd_metadata = self.get('osd_metadata')
530
531 # Update the data with the additional info from the osd map
532 osds = osd_map['osds']
533
534 # Filter by osd ids
535 if ids is not None:
536 osds = filter(
537 lambda x: str(x['osd']) in ids,
538 osds
539 )
540
541 # Get list of pools per osd node
542 pools_map = self.get_osd_pools()
543
544 # map osd IDs to reweight
545 reweight_map = dict([
546 (x.get('id'), x.get('reweight', None))
547 for x in self.get('osd_map_tree')['nodes']
548 ])
549
550 # Build OSD data objects
551 for osd in osds:
552 osd['pools'] = pools_map[osd['osd']]
553 osd['server'] = osd_metadata.get(str(osd['osd']), {}).get('hostname', None)
554
555 osd['reweight'] = reweight_map.get(osd['osd'], 0.0)
556
557 if osd['up']:
558 osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
559 else:
560 osd['valid_commands'] = []
561
562 # Filter by pool
563 if pool_id:
564 pool_id = int(pool_id)
565 osds = filter(
566 lambda x: pool_id in x['pools'],
567 osds
568 )
569
570 return osds
571
572
573 def get_osd_by_id(self, osd_id):
574 osd = filter(
575 lambda x: x['osd'] == osd_id,
576 self.get('osd_map')['osds']
577 )
578
579 if len(osd) != 1:
580 return None
581
582 return osd[0]
583
584
585 def get_pool_by_id(self, pool_id):
586 pool = filter(
587 lambda x: x['pool'] == pool_id,
588 self.get('osd_map')['pools'],
589 )
590
591 if len(pool) != 1:
592 return None
593
594 return pool[0]
595
596
597 def submit_request(self, _request, **kwargs):
598 request = CommandsRequest(_request)
599 with self.requests_lock:
600 self.requests.append(request)
601 if kwargs.get('wait', 0):
602 while not request.is_finished():
603 time.sleep(0.001)
604 return request
605
606
607 def run_command(self, command):
608 # tag with 'seq' so that we can ingore these in notify function
609 result = CommandResult('seq')
610
611 self.send_command(result, 'mon', '', json.dumps(command), 'seq')
612 return result.wait()