]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/restful/module.py
update source to 12.2.11
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
1 """
2 A RESTful API for Ceph
3 """
4
5 import os
6 import json
7 import time
8 import errno
9 import inspect
10 import tempfile
11 import threading
12 import traceback
13 import six
14 import socket
15
16 import common
17
18 from uuid import uuid4
19 from pecan import jsonify, make_app
20 from OpenSSL import crypto
21 from pecan.rest import RestController
22 from werkzeug.serving import make_server, make_ssl_devcert
23
24 from hooks import ErrorHook
25 from mgr_module import MgrModule, CommandResult
26
27
# Module-wide handle to the active Module object. It is assigned in
# Module.__init__ and read by CommandsRequest.run to send commands.
instance = None
30
31
class CannotServe(Exception):
    """Raised by the server setup code when the API server cannot start."""
34
35
class CommandsRequest(object):
    """
    Run batches of commands: batches sequentially, commands within a
    batch in parallel.

    The constructor accepts a list of iterables. Each iterable is one
    batch; its commands are all sent at once, and the next batch is only
    started (via ``next()``) once the current one has drained.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Attributes:
        id: string identifier of this request, used as the tag prefix.
        running: results of commands that were sent and not yet finished.
        waiting: batches that have not been started yet.
        finished: results whose return code ``r`` was 0.
        failed: results whose return code ``r`` was non-zero.
        lock: re-entrant lock guarding the state transitions.
    """

    def __init__(self, commands_arrays):
        """Record the batches and immediately start the first non-empty one."""
        self.id = str(id(self))

        # Filter out empty sub-requests
        commands_arrays = [x for x in commands_arrays if len(x) != 0]

        self.running = []
        self.waiting = commands_arrays[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Send the whole first batch at once
        self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send every command of one batch and return their result objects.

        Each command gets a tag of the form ``<request id>:<index>``; the
        same tag is stored on the CommandResult and passed to
        ``send_command`` on the module-level ``instance``.
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%d' % (str(self.id), index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """Start the next waiting batch, if there is one."""
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Pop the head of the queue and send it
            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result with the given tag to finished/failed.

        Returns True when a running result carried that tag, else False.
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                # A zero return code counts as success
                target = self.finished if result.r == 0 else self.failed
                target.append(self.running.pop(index))
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """Return True if a running result carries the given tag."""
        return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """Return True when nothing is running but batches are still waiting."""
        with self.lock:
            # bool() so callers get a real boolean, not the waiting list
            return not self.running and bool(self.waiting)

    def is_waiting(self):
        """Return True while there are batches that were not started yet."""
        return bool(self.waiting)

    def is_finished(self):
        """Return True when nothing is running and nothing is waiting."""
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """Return True if at least one command ended with a non-zero code."""
        return bool(self.failed)

    def get_state(self):
        """Return "pending", "failed" or "success" for the whole request."""
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    @staticmethod
    def _result_to_dict(result):
        """Return the serializable view of one command result."""
        return {
            'command': result.command,
            'outs': result.outs,
            'outb': result.outb,
        }

    def __json__(self):
        """
        Return a plain dict/list description of the request.

        Lists are built eagerly: on Python 3 ``map`` returns an iterator,
        which a JSON encoder cannot serialize.
        """
        describe = self._result_to_dict
        return {
            'id': self.id,
            'running': [describe(x) for x in self.running],
            'finished': [describe(x) for x in self.finished],
            'waiting': [
                [common.humanify_command(y) for y in x]
                for x in self.waiting
            ],
            'failed': [describe(x) for x in self.failed],
            'is_waiting': self.is_waiting(),
            'is_finished': self.is_finished(),
            'has_failed': self.has_failed(),
            'state': self.get_state(),
        }
199
200
201
class Module(MgrModule):
    """Manager module that exposes the RESTful API for Ceph."""

    # Command descriptors; the "restful ..." prefixes listed here are the
    # ones dispatched on in handle_command.
    COMMANDS = [
        dict(
            cmd="restful create-key name=key_name,type=CephString",
            desc="Create an API key with this name",
            perm="rw",
        ),
        dict(
            cmd="restful delete-key name=key_name,type=CephString",
            desc="Delete an API key with this name",
            perm="rw",
        ),
        dict(
            cmd="restful list-keys",
            desc="List all API keys",
            perm="rw",
        ),
        dict(
            cmd="restful create-self-signed-cert",
            desc="Create localized self signed certificate",
            perm="rw",
        ),
        dict(
            cmd="restful restart",
            desc="Restart API server",
            perm="rw",
        ),
    ]
230
def __init__(self, *args, **kwargs):
    """Initialise module state and publish this object as ``instance``."""
    super(Module, self).__init__(*args, **kwargs)

    # Share this object through the module-level global
    global instance
    instance = self

    # Server handle and the flags/event driving the serve() loop
    self.server = None
    self.stop_server = False
    self.serve_event = threading.Event()

    # Authentication state
    self.keys = {}
    self.disable_auth = False

    # Submitted command requests and the lock guarding the list
    self.requests = []
    self.requests_lock = threading.RLock()
246
247
def serve(self):
    """
    Run the API server in a loop until ``stop_server`` is set.

    Each iteration runs ``_serve()`` (which blocks while the server is
    up), closes the listening socket, then sleeps on ``serve_event``
    until ``restart()`` or ``shutdown()`` wakes the loop.

    ``CannotServe`` is logged as a warning; any other ``Exception`` is
    logged with its traceback. Neither terminates the loop.
    """
    while not self.stop_server:
        try:
            try:
                self._serve()
            finally:
                # Close the listening socket even when _serve() raised
                # (e.g. serve_forever() failed); otherwise the port stays
                # bound and the next iteration cannot bind it again.
                if self.server is not None:
                    self.server.socket.close()
        except CannotServe as cs:
            self.log.warn("server not running: %s", cs)
        except Exception:
            # Exception rather than a bare except, so that SystemExit and
            # KeyboardInterrupt are not swallowed by the retry loop.
            self.log.error(str(traceback.format_exc()))

        # Wait and clear the threading event
        self.serve_event.wait()
        self.serve_event.clear()
261
def refresh_keys(self):
    """Reload ``self.keys`` from the config entries stored under ``keys/``."""
    prefix = 'keys/'
    strip = len(prefix)

    self.keys = {}
    stored = self.get_config_prefix(prefix) or {}
    for config_name, secret in stored.items():
        # Drop the leading "keys/" so the dict is indexed by key name
        self.keys[config_name[strip:]] = secret
267
def _serve(self):
    """
    Configure and run the HTTPS API server until it is shut down.

    Reloads the API keys, reads the listen address/port and the TLS
    certificate and key from the module configuration, publishes the
    service URI, builds the werkzeug server around the pecan application
    and then blocks in ``serve_forever()``.

    Raises:
        CannotServe: if no address or no certificate/key is configured,
            or a configured certificate/key file does not exist.
    """
    # Load stored authentication keys
    self.refresh_keys()

    # Install a pecan JSON encoder that emits sorted, indented output
    jsonify._instance = jsonify.GenericJSON(
        sort_keys=True,
        indent=4,
        separators=(',', ': '),
    )

    server_addr = self.get_localized_config('server_addr', '::')
    if server_addr is None:
        raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')

    server_port = int(self.get_localized_config('server_port', '8003'))
    self.log.info('server_addr: %s server_port: %d',
                  server_addr, server_port)

    # Temporary files backing inline PEM material. NamedTemporaryFile
    # removes the file when it is closed, so the objects are kept here
    # while the server runs and closed explicitly in the finally below.
    tmp_files = []

    def pem_path(inline_option, path_option):
        """Return a file path for PEM data stored inline or as a path."""
        pem = self.get_localized_config(inline_option)
        if pem is None:
            return self.get_localized_config(path_option)

        tmp = tempfile.NamedTemporaryFile()
        tmp_files.append(tmp)
        tmp.write(pem.encode('utf-8'))
        tmp.flush()
        return tmp.name

    try:
        cert_fname = pem_path('crt', 'crt_file')
        pkey_fname = pem_path('key', 'key_file')

        if not cert_fname or not pkey_fname:
            raise CannotServe('no certificate configured')
        if not os.path.isfile(cert_fname):
            raise CannotServe('certificate %s does not exist' % cert_fname)
        if not os.path.isfile(pkey_fname):
            raise CannotServe('private key %s does not exist' % pkey_fname)

        # Publish the URI that others may use to access the service we're
        # about to start serving
        self.set_uri("https://{0}:{1}/".format(
            socket.gethostname() if server_addr == "::" else server_addr,
            server_port
        ))

        # Create the HTTPS werkzeug server serving pecan app
        self.server = make_server(
            host=server_addr,
            port=server_port,
            app=make_app(
                root='restful.api.Root',
                hooks=[ErrorHook()],  # use a callable if pecan >= 0.3.2
            ),
            ssl_context=(cert_fname, pkey_fname),
        )

        self.server.serve_forever()
    finally:
        # Remove the temporary certificate/private-key files
        # deterministically instead of relying on garbage collection.
        for tmp in tmp_files:
            tmp.close()
330
331
def shutdown(self):
    """
    Stop the API server for good and wake the serve() loop.

    Sets ``stop_server``, shuts the running server down (if any) and
    sets ``serve_event``. Any failure is logged with its traceback and
    re-raised.
    """
    try:
        self.stop_server = True
        if self.server:
            self.server.shutdown()
        self.serve_event.set()
    except BaseException:
        # Log everything, then let the caller see the original error
        self.log.error(traceback.format_exc())
        raise
341
342
def restart(self):
    """
    Shut the running server down (if any) and wake the serve() loop.

    ``stop_server`` is left untouched, so the loop starts a new server.
    Errors are logged with their traceback and not propagated;
    ``SystemExit`` and ``KeyboardInterrupt`` are deliberately not caught.
    """
    try:
        if self.server:
            self.server.shutdown()
        self.serve_event.set()
    except Exception:
        self.log.error(str(traceback.format_exc()))
350
351
def notify(self, notify_type, tag):
    """
    Forward a notification to ``_notify`` without letting errors escape.

    Any ``Exception`` raised while handling the notification is logged
    with its traceback; ``SystemExit`` and ``KeyboardInterrupt`` are
    deliberately not caught.
    """
    try:
        self._notify(notify_type, tag)
    except Exception:
        self.log.error(str(traceback.format_exc()))
357
358
359 def _notify(self, notify_type, tag):
360 if notify_type == "command":
361 # we can safely skip all the sequential commands
362 if tag == 'seq':
363 return
364
365 with self.requests_lock:
366 request = [x for x in self.requests if x.is_running(tag)]
367
368 if len(request) != 1:
369 self.log.warn("Unknown request '%s'" % str(tag))
370 return
371
372 request = request[0]
373 request.finish(tag)
374 if request.is_ready():
375 request.next()
376 else:
377 self.log.debug("Unhandled notification type '%s'" % notify_type)
378
379
def create_self_signed_cert(self):
    """
    Generate a 2048-bit RSA key and a self-signed certificate for it.

    Returns:
        A ``(certificate, private_key)`` tuple, both dumped in PEM format.
    """
    validity_seconds = 10*365*24*60*60

    # create a key pair
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    # create a self-signed cert: subject and issuer are the same name
    cert = crypto.X509()
    subject = cert.get_subject()
    subject.O = "IT"
    subject.CN = "ceph-restful"
    cert.set_serial_number(int(uuid4()))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(validity_seconds)
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pkey)
    cert.sign(pkey, 'sha512')

    pem = crypto.FILETYPE_PEM
    cert_pem = crypto.dump_certificate(pem, cert)
    pkey_pem = crypto.dump_privatekey(pem, pkey)
    return (cert_pem, pkey_pem)
400
401
def handle_command(self, command):
    """
    Execute one of the "restful ..." commands.

    Args:
        command: dict with a ``prefix`` entry and, for the key commands,
            a ``key_name`` entry.

    Returns:
        A 3-tuple ``(status, output, message)``. Status is 0 on success
        and ``-errno.EINVAL`` for an unknown prefix, in which case the
        third element carries the error text.
    """
    self.log.warn("Handling command: '%s'" % str(command))
    prefix = command['prefix']
    restarting = (0, "Restarting RESTful API server...", "")

    if prefix == "restful create-key":
        name = command['key_name']
        if name not in self.keys:
            # Generate and persist a new key; existing keys are reused
            key = str(uuid4())
            self.keys[name] = key
            self.set_config('keys/' + name, key)
        return (0, self.keys[name], "")

    if prefix == "restful delete-key":
        name = command['key_name']
        if name in self.keys:
            del self.keys[name]
            self.set_config('keys/' + name, None)
        return (0, "", "")

    if prefix == "restful list-keys":
        self.refresh_keys()
        return (0, json.dumps(self.keys, indent=2), "")

    if prefix == "restful create-self-signed-cert":
        cert, pkey = self.create_self_signed_cert()
        # Store both PEM blobs under this manager's id, then restart
        for suffix, pem in (('/crt', cert), ('/key', pkey)):
            self.set_config(self.get_mgr_id() + suffix, pem.decode('utf-8'))
        self.restart()
        return restarting

    if prefix == 'restful restart':
        self.restart()
        return restarting

    return (
        -errno.EINVAL,
        "",
        "Command not found '{0}'".format(prefix)
    )
464
465
def get_doc_api(self, root, prefix=''):
    """
    Build the API documentation for a controller tree.

    Walks the ``RestController`` attributes of ``root`` recursively (and
    the controller returned by ``root._lookup('0')``, documented under
    ``<prefix>/<arg>``) and collects the docstrings of the ``get``,
    ``post``, ``patch`` and ``delete`` handlers.

    Args:
        root: controller object to document.
        prefix: URL path of ``root``; the empty string stands for "/".

    Returns:
        Dict mapping URL path to ``{HTTP_METHOD: [docstring lines]}``.
        Paths without any handler are omitted.
    """
    doc = {}
    for name in dir(root):
        obj = getattr(root, name)

        if isinstance(obj, RestController):
            doc.update(self.get_doc_api(obj, prefix + '/' + name))

    lookup = getattr(root, '_lookup', None)
    if lookup:
        # Probe the dynamic sub-controller once with a placeholder id
        child = lookup('0')[0]
        if isinstance(child, RestController):
            doc.update(self.get_doc_api(child, prefix + '/<arg>'))

    prefix = prefix or '/'

    doc[prefix] = {}
    for method in ('get', 'post', 'patch', 'delete'):
        handler = getattr(root, method, None)
        if handler:
            # inspect.getdoc() returns None for an undocumented handler
            text = inspect.getdoc(handler) or ''
            doc[prefix][method.upper()] = text.split('\n')

    if len(doc[prefix]) == 0:
        del doc[prefix]

    return doc
488
489
def get_mons(self):
    """
    Return the monitors from the mon map, annotated with status info.

    Each monitor dict gains:
        in_quorum: whether its rank is listed in the quorum.
        server: the ``hostname`` from the monitor's metadata.
        leader: whether its rank is the first entry of the quorum.

    An empty quorum list yields ``leader == False`` for every monitor
    instead of raising ``IndexError``.
    """
    mon_map_mons = self.get('mon_map')['mons']
    mon_status = json.loads(self.get('mon_status')['json'])

    quorum = mon_status['quorum']
    # The first quorum entry is treated as the leader
    leader_rank = quorum[0] if quorum else None

    # Add more information
    for mon in mon_map_mons:
        rank = mon['rank']
        mon['in_quorum'] = rank in quorum
        mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
        mon['leader'] = leader_rank is not None and rank == leader_rank

    return mon_map_mons
501
502
def get_osd_pools(self):
    """
    Map every OSD id of the osd map to the list of pool ids using it.

    For each pool, the CRUSH rules whose ``rule_id`` equals the pool's
    ``crush_rule`` and whose ``min_size``/``max_size`` range contains the
    pool ``size`` are resolved to OSDs via ``common.crush_rule_osds``;
    when several rules match, the last one wins.

    A pool without a matching rule contributes no OSDs, and OSD ids
    reported for a rule but absent from the osd map are skipped.

    Returns:
        Dict ``{osd_id: [pool_id, ...]}`` with an entry for every OSD.
    """
    osd_map = self.get('osd_map')
    osds = dict((osd['osd'], []) for osd in osd_map['osds'])
    crush_rules = self.get('osd_map_crush')['rules']
    # Fetched once; it does not depend on the pool or rule being checked
    tree_nodes = self.get('osd_map_tree')['nodes']

    for pool in osd_map['pools']:
        pool_id = pool['pool']

        pool_osds = None
        for rule in crush_rules:
            if rule['rule_id'] != pool['crush_rule']:
                continue
            if rule['min_size'] <= pool['size'] <= rule['max_size']:
                pool_osds = common.crush_rule_osds(tree_nodes, rule)

        # pool_osds stays None when no rule matched: treat as "no OSDs"
        for osd_id in pool_osds or ():
            if osd_id in osds:
                osds[osd_id].append(pool_id)

    return osds
522
523
def get_osds(self, pool_id=None, ids=None):
    """
    Return the OSD entries of the osd map, enriched with extra fields.

    Each returned OSD dict gains:
        pools: pool ids from ``get_osd_pools()`` for this OSD.
        server: ``hostname`` from the OSD metadata, or None.
        reweight: ``reweight`` of the matching osd tree node, 0.0 when
            the OSD has no node in the tree.
        valid_commands: ``common.OSD_IMPLEMENTED_COMMANDS`` when the OSD
            is up, otherwise an empty list.

    Args:
        pool_id: when given (including pool id 0), only OSDs serving
            that pool are returned. None or an empty string disables
            the filter. The value is converted with ``int()``.
        ids: optional container of OSD ids as strings; only OSDs whose
            stringified id is in it are returned.
    """
    # Get data
    osd_map = self.get('osd_map')
    osd_metadata = self.get('osd_metadata')

    # Update the data with the additional info from the osd map
    osds = osd_map['osds']

    # Filter by osd ids
    if ids is not None:
        osds = [x for x in osds if str(x['osd']) in ids]

    # Get list of pools per osd node
    pools_map = self.get_osd_pools()

    # map osd IDs to reweight
    reweight_map = dict(
        (node.get('id'), node.get('reweight', None))
        for node in self.get('osd_map_tree')['nodes']
    )

    # Build OSD data objects
    for osd in osds:
        osd_id = osd['osd']
        osd['pools'] = pools_map[osd_id]
        osd['server'] = osd_metadata.get(str(osd_id), {}).get('hostname', None)

        osd['reweight'] = reweight_map.get(osd_id, 0.0)

        if osd['up']:
            osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
        else:
            osd['valid_commands'] = []

    # Filter by pool. Test against None/'' explicitly: a plain truthiness
    # check would silently ignore the valid pool id 0.
    if pool_id is not None and pool_id != '':
        pool_id = int(pool_id)
        osds = [x for x in osds if pool_id in x['pools']]

    return osds
563
564
def get_osd_by_id(self, osd_id):
    """Return the osd map entry whose ``osd`` equals ``osd_id``, or None
    unless exactly one entry matches."""
    matches = [entry for entry in self.get('osd_map')['osds']
               if entry['osd'] == osd_id]
    return matches[0] if len(matches) == 1 else None
573
574
def get_pool_by_id(self, pool_id):
    """Return the osd map pool whose ``pool`` equals ``pool_id``, or None
    unless exactly one pool matches."""
    found = None
    hits = 0
    for candidate in self.get('osd_map')['pools']:
        if candidate['pool'] == pool_id:
            found = candidate
            hits += 1

    # Both "no match" and "ambiguous match" are reported as None
    if hits != 1:
        return None
    return found
583
584
def submit_request(self, _request, **kwargs):
    """
    Create a CommandsRequest for ``_request`` and register it.

    Args:
        _request: list of command batches, as accepted by CommandsRequest.
        wait: optional keyword; when truthy, poll until the request has
            finished before returning.

    Returns:
        The CommandsRequest object.
    """
    with self.requests_lock:
        request = CommandsRequest(_request)
        self.requests.append(request)

    should_wait = kwargs.get('wait', 0)
    # Poll outside the lock in 1 ms steps until the request has drained
    while should_wait and not request.is_finished():
        time.sleep(0.001)

    return request
593
594
def run_command(self, command):
    """Send ``command`` (JSON-encoded) to 'mon' and return the outcome of
    waiting on its result."""
    # The 'seq' tag lets the notify handler ignore these commands
    tag = 'seq'
    result = CommandResult(tag)

    payload = json.dumps(command)
    self.send_command(result, 'mon', '', payload, tag)
    return result.wait()