]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/restful/module.py
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / mgr / restful / module.py
1 """
2 A RESTful API for Ceph
3 """
4
5 import os
6 import json
7 import time
8 import errno
9 import inspect
10 import tempfile
11 import threading
12 import traceback
13 import socket
14
15 import common
16
17 from uuid import uuid4
18 from pecan import jsonify, make_app
19 from OpenSSL import crypto
20 from pecan.rest import RestController
21 from werkzeug.serving import make_server, make_ssl_devcert
22
23 from hooks import ErrorHook
24 from mgr_module import MgrModule, CommandResult
25
# Global instance to share. Module.__init__ stores the active module object
# here so that code outside the class (e.g. CommandsRequest.run) can send
# commands through it. It stays None until a Module has been constructed.
instance = None
28
29
class CannotServe(Exception):
    """
    Raised when the API server cannot be started with the current
    configuration (e.g. no address or no certificate configured).
    """
32
33
class CommandsRequest(object):
    """
    This class handles parallel as well as sequential execution of
    commands. The class accepts a list of iterables that should be
    executed sequentially. Each iterable can contain several commands
    that can be executed in parallel.

    Example:
    [[c1,c2],[c3,c4]]
     - run c1 and c2 in parallel
     - wait for them to finish
     - run c3 and c4 in parallel
     - wait for them to finish

    Attributes:
    id       -- string identifying the request; prefix of every command tag
    running  -- results of commands that were sent and have not finished
    waiting  -- batches of commands that have not been sent yet
    finished -- results of commands that completed with r == 0
    failed   -- results of commands that completed with r != 0
    lock     -- re-entrant lock guarding the state transitions
    """

    def __init__(self, commands_arrays):
        """
        Store the batches and immediately send the first non-empty one.
        """
        self.id = str(id(self))

        # Filter out empty sub-requests. A real list is required here (not
        # a lazy filter object) because it is sliced and indexed below.
        commands_arrays = [x for x in commands_arrays if len(x) != 0]

        self.running = []
        self.waiting = commands_arrays[1:]
        self.finished = []
        self.failed = []

        self.lock = threading.RLock()
        if not commands_arrays:
            # Nothing to run
            return

        # Process first iteration of commands_arrays in parallel
        self.running.extend(self.run(commands_arrays[0]))

    def run(self, commands):
        """
        Send every command of the given batch through the global module
        instance and return the list of (still pending) command results.

        Each command is tagged '<request id>:<index in batch>' so that the
        completion notification can be matched back to its result.
        """
        results = []
        for index, command in enumerate(commands):
            tag = '%s:%d' % (str(self.id), index)

            # Store the result
            result = CommandResult(tag)
            result.command = common.humanify_command(command)
            results.append(result)

            # Run the command
            instance.send_command(result, 'mon', '', json.dumps(command), tag)

        return results

    def next(self):
        """
        Send the next waiting batch, if there is one.
        """
        with self.lock:
            if not self.waiting:
                # Nothing to run
                return

            # Run a next iteration of commands
            commands = self.waiting[0]
            self.waiting = self.waiting[1:]

            self.running.extend(self.run(commands))

    def finish(self, tag):
        """
        Move the running result with the given tag to `finished` (r == 0)
        or `failed` (any other r).

        Returns True when a result with that tag was running, else False.
        """
        with self.lock:
            for index, result in enumerate(self.running):
                if result.tag != tag:
                    continue

                # Popping while iterating is fine: we return right away.
                self.running.pop(index)
                if result.r == 0:
                    self.finished.append(result)
                else:
                    self.failed.append(result)
                return True

            # No such tag found
            return False

    def is_running(self, tag):
        """
        Return True when a command with the given tag is still running.
        """
        return any(result.tag == tag for result in self.running)

    def is_ready(self):
        """
        Return True when the current batch is done and another one waits.
        """
        with self.lock:
            # bool(): always answer with a boolean, never with the list
            return not self.running and bool(self.waiting)

    def is_waiting(self):
        """
        Return True when at least one batch has not been sent yet.
        """
        return bool(self.waiting)

    def is_finished(self):
        """
        Return True when nothing is running and nothing is waiting.
        """
        with self.lock:
            return not self.running and not self.waiting

    def has_failed(self):
        """
        Return True when at least one command has failed.
        """
        return bool(self.failed)

    def get_state(self):
        """
        Return "pending", "failed" or "success".
        """
        with self.lock:
            if not self.is_finished():
                return "pending"

            if self.has_failed():
                return "failed"

            return "success"

    def __json__(self):
        """
        Return a JSON-serializable description of the request.

        Everything is built as plain lists (not map objects) so that the
        result can be serialized regardless of the Python version.
        """
        def describe(result):
            return {
                'command': result.command,
                'outs': result.outs,
                'outb': result.outb,
            }

        return {
            'id': self.id,
            'running': [describe(x) for x in self.running],
            'finished': [describe(x) for x in self.finished],
            'waiting': [
                [common.humanify_command(y) for y in x]
                for x in self.waiting
            ],
            'failed': [describe(x) for x in self.failed],
            'is_waiting': self.is_waiting(),
            'is_finished': self.is_finished(),
            'has_failed': self.has_failed(),
            'state': self.get_state(),
        }
199
200
201
202 class Module(MgrModule):
# Commands provided by this module. Each "cmd" starts with the prefix that
# handle_command() compares against command['prefix']; the name=...,type=...
# part declares the argument (read there as command['key_name']).
COMMANDS = [
    {
        "cmd": "restful create-key name=key_name,type=CephString",
        "desc": "Create an API key with this name",
        "perm": "rw"
    },
    {
        "cmd": "restful delete-key name=key_name,type=CephString",
        "desc": "Delete an API key with this name",
        "perm": "rw"
    },
    {
        "cmd": "restful list-keys",
        "desc": "List all API keys",
        "perm": "rw"
    },
    {
        "cmd": "restful create-self-signed-cert",
        "desc": "Create localized self signed certificate",
        "perm": "rw"
    },
    {
        "cmd": "restful restart",
        "desc": "Restart API server",
        "perm": "rw"
    },
]
230
def __init__(self, *args, **kwargs):
    """
    Initialise the module state and publish the object as the global
    `instance`.
    """
    super(Module, self).__init__(*args, **kwargs)

    # Make this object reachable from code outside of the class
    global instance
    instance = self

    # Submitted command requests and the lock guarding the list
    self.requests_lock = threading.RLock()
    self.requests = []

    # API keys (filled by refresh_keys) and the authentication switch
    self.disable_auth = False
    self.keys = {}

    # Server object plus the flag/event pair driving the serve() loop
    self.server = None
    self.serve_event = threading.Event()
    self.stop_server = False
246
247
def serve(self):
    """
    Run the API server until `stop_server` is set.

    Each iteration runs _serve() (which blocks while the server is up) and
    closes the server socket afterwards. Failures are logged, never
    propagated. The loop then sleeps on `serve_event` until restart() or
    shutdown() sets it.
    """
    while not self.stop_server:
        try:
            self._serve()
            self.server.socket.close()
        except CannotServe as cs:
            # Format the exception itself: the `.message` attribute is
            # deprecated and does not exist on Python 3, where reading it
            # would raise inside this handler and end the loop.
            self.log.warn("server not running: {0}".format(cs))
        except Exception:
            self.log.error(str(traceback.format_exc()))

        # Wait and clear the threading event
        self.serve_event.wait()
        self.serve_event.clear()
261
def refresh_keys(self):
    """
    Reload the API keys from the stored configuration into `self.keys`.

    The configuration entries are named 'keys/<name>'; `self.keys` maps
    the bare <name> to the key value.
    """
    prefix = 'keys/'
    rawkeys = self.get_config_prefix(prefix) or {}

    # Build the mapping aside and publish it with a single assignment so
    # that a concurrent reader never observes a half-filled dictionary.
    # items() (instead of iteritems()) works on Python 2 and 3 alike.
    self.keys = dict(
        (k[len(prefix):], v)  # strip off the keys/ prefix
        for k, v in rawkeys.items()
    )
267
def _serve(self):
    """
    Configure and run the HTTPS server; blocks in serve_forever().

    Reads the localized configuration (server_addr, server_port, crt/key
    or crt_file/key_file), publishes the service URI and serves the pecan
    application through a werkzeug server stored in `self.server`.

    Raises CannotServe when no address or no usable certificate / private
    key is configured.
    """
    # Load stored authentication keys
    self.refresh_keys()

    # Replace pecan's JSON encoder instance with one that pretty-prints
    jsonify._instance = jsonify.GenericJSON(
        sort_keys=True,
        indent=4,
        separators=(',', ': '),
    )

    server_addr = self.get_localized_config('server_addr', '::')
    if server_addr is None:
        raise CannotServe('no server_addr configured; try "ceph config-key set mgr/restful/server_addr <ip>"')

    server_port = int(self.get_localized_config('server_port', '8003'))
    self.log.info('server_addr: %s server_port: %d',
        server_addr, server_port)

    # An inline certificate ("crt") takes precedence over "crt_file". It is
    # written to a temporary file because the server is given file names.
    # NOTE(review): NamedTemporaryFile removes the file once the object is
    # closed, so cert_tmp presumably has to stay referenced until
    # serve_forever() returns -- keep it in a local.
    cert = self.get_localized_config("crt")
    if cert is not None:
        cert_tmp = tempfile.NamedTemporaryFile()
        cert_tmp.write(cert)
        cert_tmp.flush()
        cert_fname = cert_tmp.name
    else:
        cert_fname = self.get_localized_config('crt_file')

    # Same scheme for the private key: inline "key" first, then "key_file"
    pkey = self.get_localized_config("key")
    if pkey is not None:
        pkey_tmp = tempfile.NamedTemporaryFile()
        pkey_tmp.write(pkey)
        pkey_tmp.flush()
        pkey_fname = pkey_tmp.name
    else:
        pkey_fname = self.get_localized_config('key_file')

    if not cert_fname or not pkey_fname:
        raise CannotServe('no certificate configured')
    if not os.path.isfile(cert_fname):
        raise CannotServe('certificate %s does not exist' % cert_fname)
    if not os.path.isfile(pkey_fname):
        raise CannotServe('private key %s does not exist' % pkey_fname)

    # Publish the URI that others may use to access the service we're
    # about to start serving (the host name stands in for the "::"
    # wildcard address)
    self.set_uri("https://{0}:{1}/".format(
        socket.gethostname() if server_addr == "::" else server_addr,
        server_port
    ))

    # Create the HTTPS werkzeug server serving pecan app
    self.server = make_server(
        host=server_addr,
        port=server_port,
        app=make_app(
            root='restful.api.Root',
            hooks = [ErrorHook()], # use a callable if pecan >= 0.3.2
        ),
        ssl_context=(cert_fname, pkey_fname),
    )

    # Blocks until shutdown() is called on self.server (see
    # Module.shutdown / Module.restart)
    self.server.serve_forever()
330
331
def shutdown(self):
    """
    Stop the API server for good: flag the serve() loop to exit, stop the
    running server (if any) and wake the loop up.

    Any error is logged and re-raised.
    """
    try:
        self.stop_server = True

        server = self.server
        if server:
            server.shutdown()

        # Release serve() from its wait so that it can see the flag
        self.serve_event.set()
    except:
        trace = traceback.format_exc()
        self.log.error(str(trace))
        raise
341
342
def restart(self):
    """
    Restart the API server: stop the running server (if any) and wake up
    the serve() loop, which then starts a new one.

    Errors are logged and swallowed.
    """
    try:
        server = self.server
        if server:
            server.shutdown()

        self.serve_event.set()
    except:
        trace = traceback.format_exc()
        self.log.error(str(trace))
350
351
def notify(self, notify_type, tag):
    """
    Notification entry point: delegate to _notify() and log (instead of
    propagating) anything it raises.
    """
    try:
        self._notify(notify_type, tag)
    except:
        trace = traceback.format_exc()
        self.log.error(str(trace))
357
358
def _notify(self, notify_type, tag):
    """
    Handle a notification.

    Only "command" notifications are processed: the request owning the
    finished command (identified by `tag`) is updated and, once its
    current batch is complete, its next batch is started. Commands tagged
    'seq' (see run_command) are not tracked and are skipped.
    """
    if notify_type != "command":
        self.log.debug("Unhandled notification type '%s'" % notify_type)
        return

    # we can safely skip all the sequential commands
    if tag == 'seq':
        return

    # Build a real list (filter() is lazy on Python 3 and supports neither
    # len() nor indexing); iterate over a snapshot of the requests.
    matching = [
        request for request in list(self.requests)
        if request.is_running(tag)
    ]

    if len(matching) != 1:
        self.log.warn("Unknown request '%s'" % str(tag))
        return

    request = matching[0]
    request.finish(tag)
    if request.is_ready():
        request.next()
379
380
def create_self_signed_cert(self):
    """
    Generate a 2048 bit RSA key pair and a certificate signed with it.

    Returns a tuple (certificate, private key), both dumped as PEM.
    """
    ten_years = 10*365*24*60*60  # validity period in seconds

    # create a key pair
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    # create a self-signed cert
    cert = crypto.X509()
    subject = cert.get_subject()
    subject.O = "IT"
    subject.CN = "ceph-restful"
    cert.set_serial_number(int(uuid4()))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(ten_years)
    # issuer == subject, that is what makes it self-signed
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pkey)
    cert.sign(pkey, 'sha512')

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    pkey_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
    return (cert_pem, pkey_pem)
401
402
def handle_command(self, command):
    """
    Execute one of the commands declared in COMMANDS.

    `command` is a dictionary with the command 'prefix' and, for the key
    commands, 'key_name'. Returns a tuple (return code, output, error
    message); unknown prefixes yield -EINVAL.
    """
    self.log.warn("Handling command: '%s'" % str(command))

    if command['prefix'] == "restful create-key":
        name = command['key_name']
        if name not in self.keys:
            # New name: generate a key and persist it
            key = str(uuid4())
            self.keys[name] = key
            self.set_config('keys/' + name, key)

        # Either the existing or the freshly created key
        return 0, self.keys[name], ""

    if command['prefix'] == "restful delete-key":
        name = command['key_name']
        if name in self.keys:
            del self.keys[name]
            self.set_config('keys/' + name, None)

        return 0, "", ""

    if command['prefix'] == "restful list-keys":
        self.refresh_keys()
        return 0, json.dumps(self.keys, indent=2), ""

    if command['prefix'] == "restful create-self-signed-cert":
        cert, pkey = self.create_self_signed_cert()

        # Stored under the id of this mgr, then picked up by the restart
        self.set_config(self.get_mgr_id() + '/crt', cert)
        self.set_config(self.get_mgr_id() + '/key', pkey)

        self.restart()
        return 0, "Restarting RESTful API server...", ""

    if command['prefix'] == 'restful restart':
        self.restart()
        return 0, "Restarting RESTful API server...", ""

    return (
        -errno.EINVAL,
        "",
        "Command not found '{0}'".format(command['prefix'])
    )
466
467
def get_doc_api(self, root, prefix=''):
    """
    Build the API documentation for the controller `root` and, through
    recursion, for every RestController reachable from it.

    Returns a dictionary {path: {HTTP method: [docstring lines]}}. Paths
    of nested controllers are `prefix` + '/' + attribute name; a
    controller obtained through `_lookup` is documented as '<arg>'.
    Controllers without any get/post/patch/delete handler get no entry.
    """
    doc = {}
    for name in dir(root):
        child = getattr(root, name)

        if isinstance(child, RestController):
            doc.update(self.get_doc_api(child, prefix + '/' + name))

    if getattr(root, '_lookup', None):
        # Resolve the dynamic controller once, with a dummy argument
        target = root._lookup('0')[0]
        if isinstance(target, RestController):
            doc.update(self.get_doc_api(target, prefix + '/<arg>'))

    prefix = prefix or '/'

    methods = {}
    for method in ('get', 'post', 'patch', 'delete'):
        handler = getattr(root, method, None)
        if handler:
            # getdoc() returns None for an undocumented handler; treat it
            # as an empty docstring instead of failing on None.split()
            methods[method.upper()] = (inspect.getdoc(handler) or '').split('\n')

    if methods:
        doc[prefix] = methods
    else:
        doc.pop(prefix, None)

    return doc
490
491
def get_mons(self):
    """
    Return the monitors of the mon map, each extended (in place) with:

    in_quorum -- whether the monitor's rank is listed in the quorum
    server    -- hostname taken from the monitor's metadata
    leader    -- whether the monitor's rank is the first quorum entry
    """
    mon_map_mons = self.get('mon_map')['mons']
    mon_status = json.loads(self.get('mon_status')['json'])

    # Looked up once; an empty quorum simply means "nobody leads" rather
    # than an IndexError on quorum[0].
    quorum = mon_status.get('quorum') or []
    has_leader = bool(quorum)

    # Add more information
    for mon in mon_map_mons:
        mon['in_quorum'] = mon['rank'] in quorum
        mon['server'] = self.get_metadata("mon", mon['name'])['hostname']
        mon['leader'] = has_leader and mon['rank'] == quorum[0]

    return mon_map_mons
503
504
def get_osd_pools(self):
    """
    Return a dictionary mapping every OSD id of the OSD map to the list
    of ids of the pools whose crush rule places data on that OSD.

    A pool whose crush rule cannot be resolved (no rule with a matching
    id and size range) contributes to no OSD.
    """
    osd_map = self.get('osd_map')
    osds = dict((osd['osd'], []) for osd in osd_map['osds'])
    pools = dict((pool['pool'], pool) for pool in osd_map['pools'])
    crush_rules = self.get('osd_map_crush')['rules']

    crush_nodes = None  # fetched on first use only
    for pool_id, pool in pools.items():
        # Start from "no OSDs" so that an unmatched rule cannot leave a
        # None behind that would fail when iterated below.
        pool_osds = []
        for rule in crush_rules:
            if rule['rule_id'] != pool['crush_rule']:
                continue
            if rule['min_size'] <= pool['size'] <= rule['max_size']:
                if crush_nodes is None:
                    crush_nodes = self.get('osd_map_tree')['nodes']
                pool_osds = common.crush_rule_osds(crush_nodes, rule)

        for osd_id in pool_osds or ():
            osds[osd_id].append(pool_id)

    return osds
524
525
def get_osds(self, pool_id=None, ids=None):
    """
    Return the OSDs of the OSD map, each extended (in place) with
    'pools', 'server', 'reweight' and 'valid_commands'.

    pool_id -- when given (an int or a numeric string; None or '' mean
               "no filter"), only OSDs used by that pool are returned
    ids     -- when given, a container of OSD ids as strings; only those
               OSDs are returned
    """
    # Get data
    osd_map = self.get('osd_map')
    osd_metadata = self.get('osd_metadata')

    # Update the data with the additional info from the osd map
    osds = osd_map['osds']

    # Filter by osd ids. A list (not a lazy filter object) is needed: it
    # is iterated below and then returned to the caller.
    if ids is not None:
        osds = [osd for osd in osds if str(osd['osd']) in ids]

    # Get list of pools per osd node
    pools_map = self.get_osd_pools()

    # map osd IDs to reweight
    reweight_map = dict(
        (node.get('id'), node.get('reweight', None))
        for node in self.get('osd_map_tree')['nodes']
    )

    # Build OSD data objects
    for osd in osds:
        osd_id = osd['osd']
        osd['pools'] = pools_map[osd_id]
        osd['server'] = osd_metadata.get(str(osd_id), {}).get('hostname', None)

        osd['reweight'] = reweight_map.get(osd_id, 0.0)

        if osd['up']:
            osd['valid_commands'] = common.OSD_IMPLEMENTED_COMMANDS
        else:
            osd['valid_commands'] = []

    # Filter by pool. Compare against None/'' explicitly: pool id 0 is a
    # valid id and must not be mistaken for "no filter".
    if pool_id is not None and pool_id != '':
        pool_id = int(pool_id)
        osds = [osd for osd in osds if pool_id in osd['pools']]

    return osds
571
572
def get_osd_by_id(self, osd_id):
    """
    Return the OSD map entry whose 'osd' field equals `osd_id`, or None
    when there is not exactly one such entry.
    """
    # A list comprehension instead of filter(): the result is measured
    # with len() and indexed, which a lazy filter object does not support.
    matching = [
        osd for osd in self.get('osd_map')['osds']
        if osd['osd'] == osd_id
    ]

    if len(matching) != 1:
        return None

    return matching[0]
583
584
def get_pool_by_id(self, pool_id):
    """
    Return the OSD map pool whose 'pool' field equals `pool_id`, or None
    when there is not exactly one such pool.
    """
    # A list comprehension instead of filter(): the result is measured
    # with len() and indexed, which a lazy filter object does not support.
    matching = [
        pool for pool in self.get('osd_map')['pools']
        if pool['pool'] == pool_id
    ]

    if len(matching) != 1:
        return None

    return matching[0]
595
596
def submit_request(self, _request, **kwargs):
    """
    Create a CommandsRequest for the given batches of commands, register
    it in `self.requests` and return it.

    With a truthy `wait` keyword argument the call polls (every
    millisecond) until the request is finished before returning.

    NOTE(review): the constructor already sends the first batch before
    the request is registered here; presumably a notification arriving
    in between would be reported as unknown by _notify -- confirm.
    """
    wait = kwargs.get('wait', 0)

    request = CommandsRequest(_request)
    with self.requests_lock:
        self.requests.append(request)

    if wait:
        while True:
            if request.is_finished():
                break
            time.sleep(0.001)

    return request
605
606
def run_command(self, command):
    """
    Send a single command to the mon and return whatever waiting on its
    result yields.
    """
    # tag with 'seq' so that we can ignore these in notify function
    tag = 'seq'
    result = CommandResult(tag)

    payload = json.dumps(command)
    self.send_command(result, 'mon', '', payload, tag)
    return result.wait()