# ceph/src/pybind/mgr/mgr_util.py (ceph source, reef 18.2.0)
import os

if 'UNITTEST' in os.environ:
    import tests

import bcrypt
import cephfs
import contextlib
import datetime
import errno
import socket
import time

import logging
import sys
from threading import Lock, Condition, Event
from typing import no_type_check, NewType
import urllib
from functools import wraps

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator

from ceph.deployment.utils import wrap_ipv6

T = TypeVar('T')

if TYPE_CHECKING:
    from mgr_module import MgrModule

ConfEntity = NewType('ConfEntity', str)

Module_T = TypeVar('Module_T', bound="MgrModule")

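# ANSI terminal color handling: colorize() adds one of the indices below to 30
# to pick a foreground color, and the *_SEQ strings are the escape sequences
# used by colorize() and bold().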
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    GRAY
) = range(8)

RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"

logger = logging.getLogger(__name__)


class PortAlreadyInUse(Exception):
    pass


class CephfsConnectionException(Exception):
    def __init__(self, error_code: int, error_message: str):
        self.errno = error_code
        self.error_str = error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        return self.errno, "", self.error_str

    def __str__(self) -> str:
        return "{0} ({1})".format(self.errno, self.error_str)


class RTimer(Timer):
    """
    recurring timer variant of Timer
    """
    @no_type_check
    def run(self):
        try:
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)
            self.finished.set()
        except Exception as e:
            logger.error("task exception: %s", e)
            raise

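# Context manager: repeatedly try to acquire `lock` with the given per-attempt
# timeout, logging a "possible deadlock" hint once WARN_AFTER seconds have
# elapsed, and release the lock again once the managed block completes.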
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            yield
            lock.release()
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True

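# Pool of libcephfs handles shared by mgr modules. Connections are keyed by
# filesystem name, revalidated against the current fs_map before reuse, shared
# between callers once the per-volume cap is reached, and idle connections are
# reaped periodically by an RTimer task.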
class CephfsConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self) -> int:
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))

        def get_fs_handle(self) -> "cephfs.LibCephFS":
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify: Callable) -> None:
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self) -> bool:
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except:
                # the filesystem does not exist now -- connection is not valid.
                pass
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout: float) -> bool:
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self) -> None:
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            self.fs.conf_set("client_quota", "false")
            logger.debug("CephFS initializing...")
            self.fs.init()
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self) -> None:
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                logger.debug("disconnect: ({0})".format(e))
                raise

        def abort(self) -> None:
            assert self.fs
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
    MAX_CONCURRENT_CONNECTIONS = 5  # max number of concurrent connections per volume

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self) -> None:
        with self.lock:
            logger.info("scanning for idle connections..")
            idle_conns = []
            for fs_name, connections in self.connections.items():
                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
                for connection in connections:
                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
                        idle_conns.append((fs_name, connection))
            logger.info(f'cleaning up connections: {idle_conns}')
            for idle_conn in idle_conns:
                self._del_connection(idle_conn[0], idle_conn[1])

    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        with self.lock:
            try:
                min_shared = 0
                shared_connection = None
                connections = self.connections.setdefault(fs_name, [])
                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
                if connections:
                    min_shared = connections[0].ops_in_progress
                    shared_connection = connections[0]
                for connection in list(connections):
                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
                    if connection.ops_in_progress == 0:
                        if connection.is_connection_valid():
                            logger.debug(f'[get] connection ({connection}) can be reused')
                            return connection.get_fs_handle()
                        else:
                            # filesystem id changed beneath us (or the filesystem does not exist).
                            # this is possible if the filesystem got removed (and recreated with
                            # same name) via "ceph fs rm/new" mon command.
                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
                            # note -- this will mutate @connections too
                            self._del_connection(fs_name, connection)
                    else:
                        if connection.ops_in_progress < min_shared:
                            min_shared = connection.ops_in_progress
                            shared_connection = connection
                # when we end up here, there are no "free" connections. so either spin up a new
                # one or share it.
                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
                    connection.connect()
                    self.connections[fs_name].append(connection)
                    return connection.get_fs_handle()
                else:
                    assert shared_connection is not None
                    logger.debug(f'[get] using shared connection ({shared_connection})')
                    return shared_connection.get_fs_handle()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise CephfsConnectionException(
                        -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                raise CephfsConnectionException(-e.args[0], e.args[1])

    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
        with self.lock:
            connections = self.connections.get(fs_name, [])
            for connection in connections:
                if connection.fs == fs_handle:
                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
                    connection.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
        self.connections[fs_name].remove(connection)
        connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
        for connection in list(self.connections.get(fs_name, [])):
            self._del_connection(fs_name, connection, wait)

    def del_connections(self, fs_name: str, wait: bool = False) -> None:
        with self.lock:
            self._del_connections(fs_name, wait)

    def del_all_connections(self) -> None:
        with self.lock:
            for fs_name in list(self.connections.keys()):
                logger.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_connections(fs_name, wait=True)
                logger.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0


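# Owns a CephfsConnectionPool and adds fs_map based lookups (filesystem entry,
# MDS names, metadata pool, list of filesystems) on top of it.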
class CephfsClient(Generic[Module_T]):
    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connection_pool = CephfsConnectionPool(self.mgr)

    def shutdown(self) -> None:
        logger.info("shutting down")
        # delete all libcephfs handles from connection pool
        self.connection_pool.del_all_connections()

    def get_fs(self, fs_name: str) -> Optional[Dict[str, Any]]:
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                return fs
        return None

    def get_mds_names(self, fs_name: str) -> List[str]:
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        fs = self.get_fs(fs_name)
        if fs:
            return fs['mdsmap']['metadata_pool']
        return None

    def get_all_filesystems(self) -> List[str]:
        fs_list: List[str] = []
        fs_map = self.mgr.get('fs_map')
        if fs_map['filesystems']:
            for fs in fs_map['filesystems']:
                fs_list.append(fs['mdsmap']['fs_name'])
        return fs_list


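# Typical usage sketch (the volume name "a" is just a placeholder):
#
#   fsc = CephfsClient(mgr)
#   with open_filesystem(fsc, "a") as fs_handle:
#       fs_handle.stat("/")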
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """
    Open a volume with shared access.
    This API is to be used as a context manager.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :return: yields a fs handle (ceph filesystem handle)
    """
    fs_handle = fsc.connection_pool.get_fs_handle(fs_name)
    try:
        yield fs_handle
    finally:
        fsc.connection_pool.put_fs_handle(fs_name, fs_handle)


def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Decorate `msg` with escape sequences to give the requested color
    """
    return (COLOR_DARK_SEQ if dark else COLOR_SEQ) % (30 + color) \
        + msg + RESET_SEQ


def bold(msg: str) -> str:
    """
    Decorate `msg` with escape sequences to make it appear bold
    """
    return BOLD_SEQ + msg + RESET_SEQ


def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a number without units, so as to fit into `width` characters, substituting
    an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.
    """

    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    unit = 0
    while len("%s" % (int(n) // (factor**unit))) > width - 1:
        unit += 1

    if unit > 0:
        truncated_float = ("%f" % (n / (float(factor) ** unit)))[0:width - 1]
        if truncated_float[-1] == '.':
            truncated_float = " " + truncated_float[0:-1]
    else:
        truncated_float = "%{wid}d".format(wid=width - 1) % n
    formatted = "%s%s" % (truncated_float, units[unit])

    if colored:
        if n == 0:
            color = BLACK, False
        else:
            color = YELLOW, False
        return bold(colorize(formatted[0:-1], color[0], color[1])) \
            + bold(colorize(formatted[-1], YELLOW, False))
    else:
        return formatted


def format_dimless(n: int, width: int, colored: bool = False) -> str:
    return format_units(n, width, colored, decimal=True)


def format_bytes(n: int, width: int, colored: bool = False) -> str:
    return format_units(n, width, colored, decimal=False)


def test_port_allocation(addr: str, port: int) -> None:
    """Check whether the port is available on the given address.

    :raises PortAlreadyInUse: in case the port is already in use
    :raises Exception: any generic error other than port already in use
    If no exception is raised, the port can be assumed available.
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind((addr, port))
        sock.close()
    except socket.error as e:
        if e.errno == errno.EADDRINUSE:
            raise PortAlreadyInUse
        else:
            raise e


def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """
    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    You can also overwrite keys:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :rtype: dict[str, Any]
    """
    ret = {}
    for arg in args:
        ret.update(arg)
    return ret


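# Determine the default bind address: '::' when an IPv6 socket can be bound,
# otherwise '0.0.0.0'. The probe result is cached as an attribute on the
# function object, so the socket test runs at most once per process.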
def get_default_addr():
    # type: () -> str
    def is_ipv6_enabled() -> bool:
        try:
            sock = socket.socket(socket.AF_INET6)
            with contextlib.closing(sock):
                sock.bind(("::1", 0))
                return True
        except (AttributeError, socket.error):
            return False

    try:
        return get_default_addr.result  # type: ignore
    except AttributeError:
        result = '::' if is_ipv6_enabled() else '0.0.0.0'
        get_default_addr.result = result  # type: ignore
        return result


def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
    """
    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
    automatically.

    >>> build_url('example.com', 'https', 443)
    'https://example.com:443'

    >>> build_url(host='example.com', port=443)
    '//example.com:443'

    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'

    >>> build_url('example.com', 'https', 443, path='/metrics')
    'https://example.com:443/metrics'

    :param scheme: The scheme, e.g. http, https or ftp.
    :type scheme: str
    :param host: Consisting of either a registered name (including but not limited to
        a hostname) or an IP address.
    :type host: str
    :type port: int
    :rtype: str
    """
    netloc = wrap_ipv6(host)
    if port:
        netloc += ':{}'.format(port)
    pr = urllib.parse.ParseResult(
        scheme=scheme if scheme else '',
        netloc=netloc,
        path=path,
        params='',
        query='',
        fragment='')
    return pr.geturl()


class ServerConfigException(Exception):
    pass


def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Returns self-signed PEM certificates valid for 10 years.

    The optional dname parameter provides complete control of the cert/key
    creation by supporting all valid RDNs via a dictionary. However, if dname
    is not provided the default O and CN settings will be applied.

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs

    """

    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

    # create a key pair
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    # Create a "subject" object
    req = crypto.X509Req()
    subj = req.get_subject()

    if dname:
        # dname received, so check it contains valid RDNs
        if not all(field in valid_RDN_list for field in dname):
            raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))
    else:
        dname = {"O": organisation, "CN": common_name}

    # populate the subject with the dname settings
    for k, v in dname.items():
        setattr(subj, k, v)

    # create a self-signed cert
    cert = crypto.X509()
    cert.set_subject(req.get_subject())
    cert.set_serial_number(int(uuid4()))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pkey)
    cert.sign(pkey, 'sha512')

    cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)

    return cert.decode('utf-8'), pkey.decode('utf-8')


def verify_cacrt_content(crt):
    # type: (str) -> None
    from OpenSSL import crypto
    try:
        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
        if x509.has_expired():
            org, cn = get_cert_issuer_info(crt)
            no_after = x509.get_notAfter()
            end_date = None
            if no_after is not None:
                end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ')
            msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}'
            logger.warning(msg)
            raise ServerConfigException(msg)
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(f'Invalid certificate: {e}')


def verify_cacrt(cert_fname):
    # type: (str) -> None
    """Basic validation of a ca cert"""

    if not cert_fname:
        raise ServerConfigException("CA cert not configured")
    if not os.path.isfile(cert_fname):
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    try:
        with open(cert_fname) as f:
            verify_cacrt_content(f.read())
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(e)))


def get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]:
    """Return the organisation (O) and common name (CN) of a certificate's issuer"""

    from OpenSSL import crypto
    try:
        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
        (org_name, cn) = (None, None)
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
        components = cert.get_issuer().get_components()
        for c in components:
            if c[0].decode() == 'O':  # org comp
                org_name = c[1].decode()
            elif c[0].decode() == 'CN':  # common name comp
                cn = c[1].decode()
        return (org_name, cn)
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(f'Invalid certificate key: {e}')


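# Validate an in-memory certificate/key pair: the certificate content is checked
# first, then the private key is parsed and checked for consistency, and finally
# both are loaded into an SSL context. A key/certificate mismatch is only logged;
# other failures raise ServerConfigException.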
def verify_tls(crt, key):
    # type: (str, str) -> None
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL
    try:
        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        _key.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(e)))
    try:
        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(e))
        )

    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate(_crt)
        context.use_privatekey(_key)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning('Private key and certificate do not match up: {}'.format(str(e)))
    except SSL.Error as e:
        raise ServerConfigException(f'Invalid cert/key pair: {e}')


def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Perform some validations on the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: Name of the private key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            pkey.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))


def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Get most recent rate from rates

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The last derivative or 0.0 if none exists
    :rtype: float

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    if not rates:
        return 0.0
    return rates[-1][1]


def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Rates from time series data

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    []
    >>> get_time_series_rates([[0, 1], [1, 3]])
    [(1, 2.0)]
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [(1, 2.0)]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    data = _filter_time_series(data)
    if not data:
        return []
    return [(data2[0], _derivative(data1, data2) if data1 is not None else 0.0) for data1, data2 in
            _pairwise(data)]


def name_to_config_section(name: str) -> ConfEntity:
    """
    Map from daemon names to ceph entity names (as seen in config)
    """
    daemon_type = name.split('.', 1)[0]
    if daemon_type in ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi']:
        return ConfEntity('client.' + name)
    elif daemon_type in ['mon', 'osd', 'mds', 'mgr', 'client']:
        return ConfEntity(name)
    else:
        return ConfEntity('mon')


def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Filters time series data

    Filters out samples with the same timestamp in given time series data.
    It also enforces the list to contain at least two samples.

    All filtered values will be shown in the debug log. If values were filtered it's a bug in the
    time series data collector, please report it.

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: Filtered time series data [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> _filter_time_series([])
    []
    >>> _filter_time_series([[1, 42]])
    []
    >>> _filter_time_series([[10, 2], [10, 3]])
    Duplicate timestamp in time series data: [10, 2], [10, 3]
    []
    >>> _filter_time_series([[0, 1], [1, 2]])
    [[0, 1], [1, 2]]
    >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [[0, 1], [1, 3]]
    >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
    """
    filtered = []
    for i in range(len(data) - 1):
        if data[i][0] == data[i + 1][0]:  # Same timestamp
            logger.debug("Duplicate timestamp in time series data: %s, %s", data[i], data[i + 1])
            continue
        filtered.append(data[i])
    if not filtered:
        return []
    filtered.append(data[-1])
    return filtered


def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
    """ Derivative between two time series data points

    :param p1: Time series data [time in seconds, value]
    :type p1: tuple[int, float]
    :param p2: Time series data [time in seconds, value]
    :type p2: tuple[int, float]

    :return: Derivative between both points
    :rtype: float

    >>> _derivative([0, 0], [2, 1])
    0.5
    >>> _derivative([0, 1], [2, 0])
    -0.5
    >>> _derivative([0, 0], [3, 1])
    0.3333333333333333
    """
    return (p2[1] - p1[1]) / float(p2[0] - p1[0])


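# Yield consecutive (previous, current) pairs from an iterable, e.g.
# [1, 2, 3] -> (1, 2), (2, 3); an empty or single-element iterable yields nothing.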
def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
    it = iter(iterable)
    a = next(it, None)

    for b in it:
        yield (a, b)
        a = b


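# Render a timedelta using the largest sensible unit: seconds below 2 minutes,
# minutes below 2 hours, hours below 2 days, then days, weeks, 30-day months
# and 365-day years.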
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    if n < datetime.timedelta(seconds=120):
        return str(int(n.total_seconds())) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(int(n.total_seconds()) // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        return str(int(n.total_seconds()) // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(int(n.total_seconds()) // (3600*24)) + 'd'
    if n < datetime.timedelta(days=7*12):
        return str(int(n.total_seconds()) // (3600*24*7)) + 'w'
    if n < datetime.timedelta(days=365*2):
        return str(int(n.total_seconds()) // (3600*24*30)) + 'M'
    return str(int(n.total_seconds()) // (3600*24*365)) + 'y'


def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for methods of the Module class. Logs the name of the given
    function f with the time it takes to execute it.
    """
    def outer(f: Callable[..., T]) -> Callable[..., T]:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            self = args[0]
            t = time.time()
            self.log.debug('Starting method {}.'.format(f.__name__))
            result = f(*args, **kwargs)
            duration = time.time() - t
            if not skip_attribute:
                wrapper._execution_duration = duration  # type: ignore
            self.log.debug('Method {} ran {:.3f} seconds.'.format(f.__name__, duration))
            return result
        return wrapper
    return outer


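# Hash a password with bcrypt. If salt_password is given it is used as the salt
# (it must be a valid bcrypt salt/hash string); otherwise a fresh salt is
# generated. Returns None when no password is supplied.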
def password_hash(password: Optional[str], salt_password: Optional[str] = None) -> Optional[str]:
    if not password:
        return None
    if not salt_password:
        salt = bcrypt.gensalt()
    else:
        salt = salt_password.encode('utf8')
    return bcrypt.hashpw(password.encode('utf8'), salt).decode('utf8')