]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_util.py
42e00da2aa08a20997e13e2dcbdb87b22fb28e8c
18 COLOR_SEQ
= "\033[1;%dm"
19 COLOR_DARK_SEQ
= "\033[0;%dm"
21 UNDERLINE_SEQ
= "\033[4m"
23 logger
= logging
.getLogger(__name__
)
26 def colorize(msg
, color
, dark
=False):
28 Decorate `msg` with escape sequences to give the requested color
30 return (COLOR_DARK_SEQ
if dark
else COLOR_SEQ
) % (30 + color
) \
36 Decorate `msg` with escape sequences to make it appear bold
38 return BOLD_SEQ
+ msg
+ RESET_SEQ
41 def format_units(n
, width
, colored
, decimal
):
43 Format a number without units, so as to fit into `width` characters, substituting
44 an appropriate unit suffix.
46 Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.
49 factor
= 1000 if decimal
else 1024
50 units
= [' ', 'k', 'M', 'G', 'T', 'P', 'E']
52 while len("%s" % (int(n
) // (factor
**unit
))) > width
- 1:
56 truncated_float
= ("%f" % (n
/ (float(factor
) ** unit
)))[0:width
- 1]
57 if truncated_float
[-1] == '.':
58 truncated_float
= " " + truncated_float
[0:-1]
60 truncated_float
= "%{wid}d".format(wid
=width
- 1) % n
61 formatted
= "%s%s" % (truncated_float
, units
[unit
])
68 return bold(colorize(formatted
[0:-1], color
[0], color
[1])) \
69 + bold(colorize(formatted
[-1], BLACK
, False))
74 def format_dimless(n
, width
, colored
=True):
75 return format_units(n
, width
, colored
, decimal
=True)
78 def format_bytes(n
, width
, colored
=True):
79 return format_units(n
, width
, colored
, decimal
=False)
82 def merge_dicts(*args
):
83 # type: (dict) -> dict
85 >>> assert merge_dicts({1:2}, {3:4}) == {1:2, 3:4}
86 You can also overwrite keys:
87 >>> assert merge_dicts({1:2}, {1:4}) == {1:4}
88 :rtype: dict[str, Any]
96 def get_default_addr():
97 def is_ipv6_enabled():
99 sock
= socket
.socket(socket
.AF_INET6
)
100 with contextlib
.closing(sock
):
101 sock
.bind(("::1", 0))
103 except (AttributeError, socket
.error
) as e
:
107 return get_default_addr
.result
108 except AttributeError:
109 result
= '::' if is_ipv6_enabled() else '0.0.0.0'
110 get_default_addr
.result
= result
114 class ServerConfigException(Exception):
117 def verify_cacrt(cert_fname
):
118 """Basic validation of a ca cert"""
121 raise ServerConfigException("CA cert not configured")
122 if not os
.path
.isfile(cert_fname
):
123 raise ServerConfigException("Certificate {} does not exist".format(cert_fname
))
125 from OpenSSL
import crypto
127 with
open(cert_fname
) as f
:
128 x509
= crypto
.load_certificate(crypto
.FILETYPE_PEM
, f
.read())
129 if x509
.has_expired():
131 'Certificate {} has expired'.format(cert_fname
))
132 except (ValueError, crypto
.Error
) as e
:
133 raise ServerConfigException(
134 'Invalid certificate {}: {}'.format(cert_fname
, str(e
)))
137 def verify_tls_files(cert_fname
, pkey_fname
):
138 """Basic checks for TLS certificate and key files
140 Do some validations to the private key and certificate:
141 - Check the type and format
142 - Check the certificate expiration date
143 - Check the consistency of the private key
144 - Check that the private key and certificate match up
146 :param cert_fname: Name of the certificate file
147 :param pkey_fname: name of the certificate public key file
149 :raises ServerConfigException: An error with a message
153 if not cert_fname
or not pkey_fname
:
154 raise ServerConfigException('no certificate configured')
156 verify_cacrt(cert_fname
)
158 if not os
.path
.isfile(pkey_fname
):
159 raise ServerConfigException('private key %s does not exist' % pkey_fname
)
161 from OpenSSL
import crypto
, SSL
164 with
open(pkey_fname
) as f
:
165 pkey
= crypto
.load_privatekey(crypto
.FILETYPE_PEM
, f
.read())
167 except (ValueError, crypto
.Error
) as e
:
168 raise ServerConfigException(
169 'Invalid private key {}: {}'.format(pkey_fname
, str(e
)))
171 context
= SSL
.Context(SSL
.TLSv1_METHOD
)
172 context
.use_certificate_file(cert_fname
, crypto
.FILETYPE_PEM
)
173 context
.use_privatekey_file(pkey_fname
, crypto
.FILETYPE_PEM
)
174 context
.check_privatekey()
175 except crypto
.Error
as e
:
177 'Private key {} and certificate {} do not match up: {}'.format(
178 pkey_fname
, cert_fname
, str(e
)))