]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_util.py
import ceph 14.2.5
[ceph.git] / ceph / src / pybind / mgr / mgr_util.py
1 import contextlib
2 import os
3 import socket
4 import logging
5
# Colour indices. colorize() adds these to 30 to build the numeric code
# that is substituted into COLOR_SEQ / COLOR_DARK_SEQ.
BLACK = 0
RED = 1
GREEN = 2
YELLOW = 3
BLUE = 4
MAGENTA = 5
CYAN = 6
GRAY = 7

# Terminal escape sequences; the two COLOR_* templates take one integer.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"

# Module-level logger shared by the helpers in this file.
logger = logging.getLogger(__name__)
24
25
def colorize(msg, color, dark=False):
    """
    Wrap `msg` in the escape sequences that render it in `color`.

    :param msg: text to decorate
    :param color: one of the module's colour indices (BLACK .. GRAY)
    :param dark: use COLOR_DARK_SEQ instead of COLOR_SEQ as the prefix
    :returns: the prefix for `color`, then `msg`, then RESET_SEQ
    """
    if dark:
        prefix_template = COLOR_DARK_SEQ
    else:
        prefix_template = COLOR_SEQ
    # The colour index is offset by 30 before being substituted.
    return prefix_template % (30 + color) + msg + RESET_SEQ
32
33
def bold(msg):
    """
    Wrap `msg` in the escape sequences that make it appear bold.

    :param msg: text to decorate
    :returns: BOLD_SEQ, then `msg`, then RESET_SEQ
    """
    emphasized = BOLD_SEQ + msg
    return emphasized + RESET_SEQ
39
40
def format_units(n, width, colored, decimal):
    """
    Format a number without units, so as to fit into `width` characters, substituting
    an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.

    :param n: the number to format
    :param width: number of visible characters to produce, including the
        one-character unit suffix; must be at least 2
    :param colored: if true, wrap the digits and the suffix in escape
        sequences (these are not counted in `width`)
    :param decimal: scale by 1000 when true, by 1024 otherwise
    :returns: the formatted string. If `n` is too large to fit even with the
        largest suffix ('E'), the integer part is emitted in full and the
        result is wider than `width` rather than being silently truncated.
    :raises ValueError: if `width` is smaller than 2
    """
    # One character is always taken by the suffix, so at least one more is
    # needed for a digit; without this check no unit could ever fit.
    if width < 2:
        raise ValueError("width must be at least 2, got %r" % (width,))

    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    max_unit = len(units) - 1

    # Pick the smallest unit whose integer part fits next to the suffix,
    # never going past the last known suffix.
    unit = 0
    while unit < max_unit \
            and len("%s" % (int(n) // (factor ** unit))) > width - 1:
        unit += 1

    if unit > 0:
        integer_part = "%d" % (int(n) // (factor ** unit))
        if len(integer_part) > width - 1:
            # Only reachable at the largest unit: show the whole integer
            # part instead of cutting off significant digits.
            truncated_float = integer_part
        else:
            truncated_float = ("%f" % (n / (float(factor) ** unit)))[0:width - 1]
            if truncated_float[-1] == '.':
                # Do not end on a bare decimal point; pad on the left instead.
                truncated_float = " " + truncated_float[0:-1]
    else:
        truncated_float = "%{wid}d".format(wid=width - 1) % n
    formatted = "%s%s" % (truncated_float, units[unit])

    if colored:
        if n == 0:
            color = BLACK, False
        else:
            color = YELLOW, False
        # Digits and suffix are coloured separately.
        return bold(colorize(formatted[0:-1], color[0], color[1])) \
            + bold(colorize(formatted[-1], BLACK, False))
    else:
        return formatted
72
73
def format_dimless(n, width, colored=True):
    """
    Format a dimensionless number via format_units() with decimal=True.

    :param n: the number to format
    :param width: passed through to format_units()
    :param colored: passed through to format_units()
    """
    return format_units(n, width, colored=colored, decimal=True)
76
77
def format_bytes(n, width, colored=True):
    """
    Format a byte size/rate via format_units() with decimal=False.

    :param n: the number to format
    :param width: passed through to format_units()
    :param colored: passed through to format_units()
    """
    return format_units(n, width, colored=colored, decimal=False)
80
81
def merge_dicts(*args):
    # type: (*dict) -> dict
    """
    Combine all given dicts into a new one; the arguments are not modified.

    >>> assert merge_dicts({1:2}, {3:4}) == {1:2, 3:4}

    Later arguments win when a key appears more than once:

    >>> assert merge_dicts({1:2}, {1:4}) == {1:4}

    :rtype: dict
    """
    merged = dict()
    for mapping in args:
        merged.update(mapping)
    return merged
94
95
def get_default_addr():
    """
    Return '::' if an IPv6 socket can be bound to ::1, else '0.0.0.0'.

    The probe runs once; its outcome is stored on the function object as
    `get_default_addr.result` and returned directly on later calls.
    """
    def is_ipv6_enabled():
        # A missing AF_INET6 constant (AttributeError) or a failure to
        # create/bind the socket both count as "IPv6 not available".
        try:
            with contextlib.closing(socket.socket(socket.AF_INET6)) as probe:
                probe.bind(("::1", 0))
        except (AttributeError, socket.error):
            return False
        return True

    if not hasattr(get_default_addr, 'result'):
        get_default_addr.result = '::' if is_ipv6_enabled() else '0.0.0.0'
    return get_default_addr.result
112
113
class ServerConfigException(Exception):
    """Raised when a configured certificate or private key is missing or invalid."""
116
def verify_cacrt(cert_fname):
    """Basic validation of a ca cert

    Checks that a file name is given, that the file exists and that it can
    be loaded as a PEM certificate. An expired certificate is only logged
    as a warning.

    :param cert_fname: Name of the certificate file

    :raises ServerConfigException: An error with a message
    """
    if not cert_fname:
        raise ServerConfigException("CA cert not configured")
    if not os.path.isfile(cert_fname):
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    # Imported only after the cheap checks above have passed.
    from OpenSSL import crypto
    try:
        with open(cert_fname) as cert_file:
            pem_data = cert_file.read()
            cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
            expired = cert.has_expired()
        if expired:
            logger.warning(
                'Certificate {} has expired'.format(cert_fname))
    except (ValueError, crypto.Error) as err:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(err)))
135
136
def verify_tls_files(cert_fname, pkey_fname):
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: Name of the certificate private key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    # Existence, PEM format and expiry of the certificate itself.
    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            pkey.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        # This context is only used to compare key and certificate; it is
        # discarded when the function returns.
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))
    except SSL.Error as e:
        # The SSL.Context methods report failures as SSL.Error, which the
        # crypto.Error handler above does not cover. Convert it so callers
        # get the documented exception type instead of a raw OpenSSL error.
        raise ServerConfigException(
            'Invalid cert/key pair {} / {}: {}'.format(
                cert_fname, pkey_fname, str(e)))