]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_util.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / mgr_util.py
1 import os
2
3 if 'UNITTEST' in os.environ:
4 import tests
5
6 import bcrypt
7 import cephfs
8 import contextlib
9 import datetime
10 import errno
11 import socket
12 import time
13 import logging
14 import sys
15 from threading import Lock, Condition, Event
16 from typing import no_type_check, NewType
17 import urllib
18 from functools import wraps
19 if sys.version_info >= (3, 3):
20 from threading import Timer
21 else:
22 from threading import _Timer as Timer
23
24 from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
25
26 from ceph.deployment.utils import wrap_ipv6
27
28 T = TypeVar('T')
29
30 if TYPE_CHECKING:
31 from mgr_module import MgrModule
32
33 ConfEntity = NewType('ConfEntity', str)
34
35 Module_T = TypeVar('Module_T', bound="MgrModule")
36
# Color indices 0..7.  colorize() adds 30 to an index before substituting it
# into COLOR_SEQ / COLOR_DARK_SEQ.
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    GRAY
) = range(8)

# Terminal escape sequences used by colorize() and bold().
RESET_SEQ = "\033[0m"  # appended after decorated text
COLOR_SEQ = "\033[1;%dm"  # %d receives 30 + color index
COLOR_DARK_SEQ = "\033[0;%dm"  # variant selected when dark=True
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"

# Module-level logger shared by the helpers in this file.
logger = logging.getLogger(__name__)
55
56
class PortAlreadyInUse(Exception):
    """Signals that a requested port is already bound by someone else."""
59
60
class CephfsConnectionException(Exception):
    """Connection error that carries an errno-style code and a message."""

    def __init__(self, error_code: int, error_message: str):
        self.errno, self.error_str = error_code, error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """Return the error as ``(errno, "", error_str)``."""
        empty_output = ""
        return (self.errno, empty_output, self.error_str)

    def __str__(self) -> str:
        code, text = self.errno, self.error_str
        return "{0} ({1})".format(code, text)
71
class RTimer(Timer):
    """
    recurring timer variant of Timer

    Calls ``function(*args, **kwargs)`` every ``interval`` seconds until
    ``cancel()`` is called.  An exception raised by the function is logged
    and re-raised, which ends the timer thread.
    """
    @no_type_check
    def run(self):
        try:
            # Event.wait() returns True as soon as cancel() sets `finished`,
            # including when that happens in the middle of the wait.  Looping
            # on its result guarantees the callback is never invoked after
            # the timer has been cancelled.
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)
        except Exception as e:
            logger.error("task exception: %s", e)
            raise
86
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """
    Context manager that acquires `lock`, retrying every `timeout` seconds.

    Each attempt is logged at debug level.  Once more than 30 seconds have
    passed without success a single "possible deadlock" message is logged.
    The lock is released when the managed block exits, whether it finishes
    normally or raises.

    :param lock: the lock to acquire
    :param timeout: seconds to wait per acquire attempt
    """
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            try:
                yield
            finally:
                # an exception from the with-body is re-raised at the yield;
                # without the finally the lock would stay held forever
                lock.release()
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
103
104
class CephfsConnectionPool(object):
    """
    Pool of libcephfs connections, grouped by file system name.

    At most MAX_CONCURRENT_CONNECTIONS connections are kept per file system.
    While there is room a caller that finds no unused connection gets a new
    one; otherwise it shares the connection with the fewest operations in
    progress.  A recurring timer runs cleanup_connections() every
    TIMER_TASK_RUN_INTERVAL seconds to drop idle connections.

    All access to `connections` is serialized by `lock`; `cond` (built on the
    same lock) is notified when a connection's operation count drops to zero.
    """
    class Connection(object):
        """One libcephfs handle for a file system plus its usage bookkeeping."""

        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            self.mgr = mgr
            self.fs_name = fs_name
            # handles handed out by get_fs_handle() and not yet returned
            self.ops_in_progress = 0
            self.last_used = time.time()
            # id at creation time; is_connection_valid() compares against it
            self.fs_id = self.get_fs_id()

        def get_fs_id(self) -> int:
            """
            Return the id of the file system named `fs_name` from the fs_map.

            :raises CephfsConnectionException: with -ENOENT when the fs_map
                lists no file system of that name
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))

        def get_fs_handle(self) -> "cephfs.LibCephFS":
            """Record one more operation in progress and return the handle."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify: Callable) -> None:
            """
            Record that one operation finished.

            :param notify: called (without arguments) when no operations
                remain in progress
            """
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            """
            Tear the connection down.

            :param waiter: if given, called repeatedly until no operations
                are in progress before tearing down
            """
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            # a clean shutdown only if the file system is still the one we
            # connected to; otherwise abort the connection
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self) -> bool:
            """Return True if the file system still exists with the same id."""
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                # (Exception rather than a bare except, so KeyboardInterrupt
                # and SystemExit are not swallowed.)
                pass
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout: float) -> bool:
            """Return True if unused and last used at least `timeout` seconds ago."""
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self) -> None:
            """Create, configure, initialize and mount the libcephfs handle."""
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            self.fs.conf_set("client_quota", "false")
            logger.debug("CephFS initializing...")
            self.fs.init()
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(None, self.fs.get_addrs(), False)

        def disconnect(self) -> None:
            """Shut the handle down and unregister its addresses from the mgr."""
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                # addresses must be read before shutdown() to unregister them
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(None, addrs)
                self.fs = None
            except Exception as e:
                logger.debug("disconnect: ({0})".format(e))
                raise

        def abort(self) -> None:
            """Abort the connection via abort_conn() and drop the handle."""
            assert self.fs
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
    MAX_CONCURRENT_CONNECTIONS = 5  # max number of concurrent connections per volume

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self) -> None:
        """Delete every connection idle for CONNECTION_IDLE_INTERVAL or longer."""
        with self.lock:
            logger.info("scanning for idle connections..")
            idle_conns = []
            for fs_name, connections in self.connections.items():
                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
                for connection in connections:
                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
                        idle_conns.append((fs_name, connection))
            logger.info(f'cleaning up connections: {idle_conns}')
            # deletion mutates the per-fs lists, so it is done after the scan
            for idle_conn in idle_conns:
                self._del_connection(idle_conn[0], idle_conn[1])

    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        """
        Return a libcephfs handle for `fs_name`; give it back with
        put_fs_handle().

        An unused, still valid connection is reused; unused connections whose
        file system changed are deleted.  Otherwise a new connection is made
        while fewer than MAX_CONCURRENT_CONNECTIONS exist, else the connection
        with the fewest operations in progress is shared.

        :raises CephfsConnectionException: when a cephfs.Error occurs, or
            when the file system is not present in the fs_map
        """
        with self.lock:
            try:
                min_shared = 0
                shared_connection = None
                connections = self.connections.setdefault(fs_name, [])
                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
                if connections:
                    min_shared = connections[0].ops_in_progress
                    shared_connection = connections[0]
                for connection in list(connections):
                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
                    if connection.ops_in_progress == 0:
                        if connection.is_connection_valid():
                            logger.debug(f'[get] connection ({connection}) can be reused')
                            return connection.get_fs_handle()
                        else:
                            # filesystem id changed beneath us (or the filesystem does not exist).
                            # this is possible if the filesystem got removed (and recreated with
                            # same name) via "ceph fs rm/new" mon command.
                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
                            # note -- this will mutate @connections too
                            self._del_connection(fs_name, connection)
                    else:
                        if connection.ops_in_progress < min_shared:
                            min_shared = connection.ops_in_progress
                            shared_connection = connection
                # when we end up here, there are no "free" connections. so either spin up a new
                # one or share it.
                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
                    connection.connect()
                    self.connections[fs_name].append(connection)
                    return connection.get_fs_handle()
                else:
                    assert shared_connection is not None
                    logger.debug(f'[get] using shared connection ({shared_connection})')
                    return shared_connection.get_fs_handle()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise CephfsConnectionException(
                        -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                raise CephfsConnectionException(-e.args[0], e.args[1])

    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
        """Return a handle obtained from get_fs_handle() to the pool."""
        with self.lock:
            connections = self.connections.get(fs_name, [])
            for connection in connections:
                if connection.fs == fs_handle:
                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
                    # notify_all() is the non-deprecated spelling of notifyAll()
                    connection.put_fs_handle(notify=self.cond.notify_all)

    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
        """
        Remove `connection` from the pool and tear it down.

        Must be called with `self.lock` held.

        :param wait: when True, first wait until the connection has no
            operations in progress
        """
        if wait:
            # Drain while the connection is still listed: put_fs_handle()
            # finds connections through self.connections, so removing it
            # before waiting would leave ops_in_progress above zero forever.
            while connection.ops_in_progress != 0:
                self.cond.wait()
            if connection not in self.connections.get(fs_name, []):
                # cond.wait() released the lock; somebody else already
                # deleted this connection in the meantime
                return
        self.connections[fs_name].remove(connection)
        connection.del_fs_handle(waiter=None)

    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Delete all connections of `fs_name`; `self.lock` must be held."""
        for connection in list(self.connections.get(fs_name, [])):
            self._del_connection(fs_name, connection, wait)

    def del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Delete all connections of `fs_name`, optionally waiting for pending ops."""
        with self.lock:
            self._del_connections(fs_name, wait)

    def del_all_connections(self) -> None:
        """Wait for pending operations and delete every connection in the pool."""
        with self.lock:
            for fs_name in list(self.connections.keys()):
                logger.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_connections(fs_name, wait=True)
                logger.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since its
            # guarded on shutdown.  Deleting connections only empties the
            # per-fs lists (the keys stay behind), so check the lists rather
            # than the number of keys, then drop the empty entries.
            assert not any(self.connections.values())
            self.connections.clear()
295
296
class CephfsClient(Generic[Module_T]):
    """
    Helper that owns a CephfsConnectionPool for a mgr module and answers
    simple questions about file systems from the mgr's fs_map.
    """
    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connection_pool = CephfsConnectionPool(self.mgr)

    def shutdown(self) -> None:
        """Delete all libcephfs handles held by the connection pool."""
        logger.info("shutting down")
        self.connection_pool.del_all_connections()

    def get_fs(self, fs_name: str) -> Optional[Dict[str, Any]]:
        """
        Return the fs_map entry of the file system named `fs_name`.

        :return: the matching element of ``fs_map['filesystems']`` or None
        """
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                return fs
        return None

    def get_mds_names(self, fs_name: str) -> List[str]:
        """Return the names of the MDS daemons listed in the fs's mdsmap info."""
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        """Return the ``metadata_pool`` value of the fs's mdsmap, or None."""
        fs = self.get_fs(fs_name)
        if fs:
            return fs['mdsmap']['metadata_pool']
        return None

    def get_all_filesystems(self) -> List[str]:
        """Return the names of all file systems in the fs_map."""
        fs_map = self.mgr.get('fs_map')
        # `or []` keeps the original tolerance for a falsy 'filesystems' value
        return [fs['mdsmap']['fs_name'] for fs in (fs_map['filesystems'] or [])]
333
334
335
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """
    Context manager giving shared access to a volume.

    A handle is taken from the client's connection pool on entry and handed
    back to the pool on exit, even if the managed block raises.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :return: yields a fs handle (ceph filesystem handle)
    """
    pool = fsc.connection_pool
    handle = pool.get_fs_handle(fs_name)
    try:
        yield handle
    finally:
        pool.put_fs_handle(fs_name, handle)
351
352
def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Wrap `msg` in the escape sequence for `color`, followed by a reset.

    :param color: one of the color indices (BLACK .. GRAY)
    :param dark: use the COLOR_DARK_SEQ variant instead of COLOR_SEQ
    """
    if dark:
        template = COLOR_DARK_SEQ
    else:
        template = COLOR_SEQ
    prefix = template % (30 + color)
    return prefix + msg + RESET_SEQ
359
360
def bold(msg: str) -> str:
    """
    Wrap `msg` in the bold escape sequence, followed by a reset.
    """
    decorated = BOLD_SEQ + msg
    return decorated + RESET_SEQ
366
367
def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a number without units, so as to fit into `width` characters, substituting
    an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.

    One column is always reserved for the suffix.  Values too large even for
    the biggest suffix ('E') are shown with their full integer part and
    therefore exceed `width` instead of raising.  A `width` below 2 is
    treated as 2 (one digit plus the suffix).

    :param n: the number to format
    :param width: target width of the result, suffix included
    :param colored: decorate the result with terminal escape sequences
    :param decimal: scale by 1000 (True) or 1024 (False)
    """

    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    # columns available for the number itself; at least one, otherwise no
    # amount of scaling could ever make the value fit
    digits = max(width - 1, 1)
    unit = 0
    # stop at the last suffix so that units[unit] below cannot overrun
    while unit < len(units) - 1 and len("%s" % (int(n) // (factor**unit))) > digits:
        unit += 1

    if unit > 0:
        scaled = "%f" % (n / (float(factor) ** unit))
        # never cut into the integer part: a truncated integer would be a
        # different (wrong) number
        truncated_float = scaled[0:max(digits, scaled.index('.'))]
        if truncated_float[-1] == '.':
            # a trailing dot carries no information; pad on the left instead
            truncated_float = " " + truncated_float[0:-1]
    else:
        truncated_float = "%{wid}d".format(wid=digits) % n
    formatted = "%s%s" % (truncated_float, units[unit])

    if colored:
        if n == 0:
            color = BLACK, False
        else:
            color = YELLOW, False
        return bold(colorize(formatted[0:-1], color[0], color[1])) \
            + bold(colorize(formatted[-1], YELLOW, False))
    else:
        return formatted
399
400
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """Format a dimensionless number using decimal (factor 1000) suffixes."""
    use_decimal = True
    return format_units(n, width, colored, decimal=use_decimal)
403
404
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """Format a byte count using base 2 (factor 1024) suffixes."""
    use_decimal = False
    return format_units(n, width, colored, decimal=use_decimal)
407
408
def test_port_allocation(addr: str, port: int) -> None:
    """Checks if the port is available

    Binds a temporary IPv4 TCP socket to ``(addr, port)``.  The socket is
    closed again whether or not the bind succeeds.

    :raises PortAlreadyInUse: in case port is already in use
    :raises Exception: any generic error other than port already in use
    If no exception is raised, the port can be assumed available
    """
    try:
        # the with-block closes the socket even when bind() fails
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind((addr, port))
    except socket.error as e:
        if e.errno == errno.EADDRINUSE:
            raise PortAlreadyInUse
        # bare raise keeps the original traceback
        raise
424
425
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """
    Combine all given dicts into one new dict, left to right.

    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    Later arguments win when a key occurs more than once:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :rtype: dict[str, Any]
    """
    merged = dict()
    for mapping in args:
        merged.update(mapping)
    return merged
441
442
def get_default_addr() -> str:
    """
    Return '::' when an IPv6 socket can be bound to ::1, else '0.0.0.0'.

    The answer is computed once and cached on the function object as
    ``get_default_addr.result``.
    """
    def is_ipv6_enabled() -> bool:
        # AttributeError covers a socket module without AF_INET6
        try:
            probe = socket.socket(socket.AF_INET6)
            with contextlib.closing(probe):
                probe.bind(("::1", 0))
        except (AttributeError, socket.error):
            return False
        return True

    if not hasattr(get_default_addr, 'result'):
        get_default_addr.result = '::' if is_ipv6_enabled() else '0.0.0.0'  # type: ignore
    return get_default_addr.result  # type: ignore
460
461
def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
    """
    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
    automatically.

    >>> build_url('example.com', 'https', 443)
    'https://example.com:443'

    >>> build_url(host='example.com', port=443)
    '//example.com:443'

    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'

    >>> build_url('example.com', 'https', 443, path='/metrics')
    'https://example.com:443/metrics'


    :param scheme: The scheme, e.g. http, https or ftp.
    :type scheme: str
    :param host: Consisting of either a registered name (including but not limited to
                 a hostname) or an IP address.
    :type host: str
    :param port: Port to append to the host; omitted when None or 0.
    :type port: int
    :param path: Path component of the URL.
    :type path: str
    :rtype: str
    """
    # A plain `import urllib` does not guarantee that the `parse` submodule
    # is loaded; import it explicitly so this works regardless of what else
    # has been imported.
    import urllib.parse

    netloc = wrap_ipv6(host)
    if port:
        netloc += ':{}'.format(port)
    pr = urllib.parse.ParseResult(
        scheme=scheme if scheme else '',
        netloc=netloc,
        path=path,
        params='',
        query='',
        fragment='')
    return pr.geturl()
499
500
class ServerConfigException(Exception):
    """Raised when a certificate or private key fails validation."""
503
504
def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Create a self-signed certificate and its RSA key, valid for 10 years.

    When `dname` is given it fully determines the subject and may use any of
    the supported RDNs; otherwise the subject is built from `organisation`
    (O) and `common_name` (CN).

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs

    """

    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

    # 2048 bit RSA key pair
    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    # the request object only serves as a holder for the subject name
    req = crypto.X509Req()
    subject = req.get_subject()

    if not dname:
        # nothing supplied: fall back to the O / CN arguments
        dname = {"O": organisation, "CN": common_name}
    elif any(field not in valid_RDN_list for field in dname):
        raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))

    # copy every RDN onto the subject
    for rdn, value in dname.items():
        setattr(subject, rdn, value)

    # assemble the certificate; issuer == subject makes it self-signed
    ten_years = 10 * 365 * 24 * 60 * 60
    x509 = crypto.X509()
    x509.set_subject(req.get_subject())
    x509.set_serial_number(int(uuid4()))
    x509.gmtime_adj_notBefore(0)
    x509.gmtime_adj_notAfter(ten_years)
    x509.set_issuer(x509.get_subject())
    x509.set_pubkey(key_pair)
    x509.sign(key_pair, 'sha512')

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, x509)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair)

    return cert_pem.decode('utf-8'), key_pem.decode('utf-8')
563
564
def verify_cacrt_content(crt: str) -> None:
    """
    Check that `crt` is a loadable PEM certificate which has not expired.

    :raises ServerConfigException: if the certificate cannot be loaded or
        has expired (the expiry is also logged as a warning)
    """
    from OpenSSL import crypto
    try:
        pem = crt.encode("ascii") if isinstance(crt, str) else crt
        certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
        if not certificate.has_expired():
            return
        org, cn = get_cert_issuer_info(crt)
        not_after = certificate.get_notAfter()
        if not_after is None:
            end_date = None
        else:
            end_date = datetime.datetime.strptime(not_after.decode('ascii'), '%Y%m%d%H%M%SZ')
        msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}'
        logger.warning(msg)
        raise ServerConfigException(msg)
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(f'Invalid certificate: {e}')
582
583
def verify_cacrt(cert_fname: str) -> None:
    """
    Validate the CA certificate stored in the file `cert_fname`.

    :raises ServerConfigException: if no file name is given, the file does
        not exist, or its content fails verify_cacrt_content()
    """

    if not cert_fname:
        raise ServerConfigException("CA cert not configured")

    cert_file_exists = os.path.isfile(cert_fname)
    if not cert_file_exists:
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    try:
        with open(cert_fname) as cert_file:
            content = cert_file.read()
            verify_cacrt_content(content)
    except ValueError as err:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(err)))
599
def get_cert_issuer_info(crt: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Return the issuer's organisation (O) and common name (CN) of a certificate.

    :param crt: PEM encoded certificate (str or bytes)
    :return: tuple ``(organisation, common_name)``; an entry is None when the
        issuer has no such component
    :raises ServerConfigException: if the certificate cannot be loaded
    """

    from OpenSSL import crypto
    try:
        crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
        org_name: Optional[str] = None
        cn: Optional[str] = None
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
        for name, value in cert.get_issuer().get_components():
            field = name.decode()
            if field == 'O':  # org comp
                org_name = value.decode()
            elif field == 'CN':  # common name comp
                cn = value.decode()
        return (org_name, cn)
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(f'Invalid certificate key: {e}') from e
617
def verify_tls(crt: str, key: str) -> None:
    """
    Validate a PEM certificate together with its PEM private key.

    The certificate is checked with verify_cacrt_content(), the key is loaded
    and checked, and both are then installed into an SSL context to check
    that they belong together.

    :raises ServerConfigException: on an invalid certificate, an invalid
        private key, or an SSL error while pairing the two
    """
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL

    # the private key on its own
    try:
        private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        private_key.check()
    except (ValueError, crypto.Error) as err:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(err)))

    # the certificate on its own
    try:
        pem = crt.encode("ascii") if isinstance(crt, str) else crt
        certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem)
    except ValueError as err:
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(err))
        )

    # key and certificate as a pair
    try:
        ssl_context = SSL.Context(SSL.TLSv1_METHOD)
        ssl_context.use_certificate(certificate)
        ssl_context.use_privatekey(private_key)
        ssl_context.check_privatekey()
    except crypto.Error as err:
        # only logged, not raised
        logger.warning('Private key and certificate do not match up: {}'.format(str(err)))
    except SSL.Error as err:
        raise ServerConfigException(f'Invalid cert/key pair: {err}')
646
647
648
def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: Name of the certificate private key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            pkey.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))
    except SSL.Error as e:
        # same handling as verify_tls(): report SSL-level failures through
        # the documented exception type instead of leaking SSL.Error
        raise ServerConfigException(f'Invalid cert/key pair: {e}')
692
693
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Pick the value of the newest entry in rates

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The value of the last entry, or 0.0 when rates is None or empty
    :rtype: float

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    return rates[-1][1] if rates else 0.0
715
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Turn time series samples into rates of change

    Samples are filtered with _filter_time_series() first; each remaining
    pair of neighbours yields one entry stamped with the later sample's time.

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    []
    >>> get_time_series_rates([[0, 1], [1, 3]])
    [(1, 2.0)]
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [(1, 2.0)]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    samples = _filter_time_series(data)
    rates = []
    for earlier, later in _pairwise(samples):
        slope = 0.0 if earlier is None else _derivative(earlier, later)
        rates.append((later[0], slope))
    return rates
743
def name_to_config_section(name: str) -> ConfEntity:
    """
    Translate a daemon name into the entity name used in the config.

    The daemon type is the part of `name` before the first dot.  Names of the
    listed client-side daemon types get a 'client.' prefix, names of the core
    daemon types are used unchanged, anything else maps to 'mon'.
    """
    daemon_type = name.partition('.')[0]
    if daemon_type in ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi'):
        return ConfEntity('client.' + name)
    if daemon_type in ('mon', 'osd', 'mds', 'mgr', 'client'):
        return ConfEntity(name)
    return ConfEntity('mon')
755
756
757 def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
758 """ Filters time series data
759
760 Filters out samples with the same timestamp in given time series data.
761 It also enforces the list to contain at least two samples.
762
763 All filtered values will be shown in the debug log. If values were filtered it's a bug in the
764 time series data collector, please report it.
765
766 :param data: Time series data [time in seconds, value]
767 :type data: list[tuple[int, float]]
768
769 :return: Filtered time series data [time in seconds, value]
770 :rtype: list[tuple[int, float]]
771
772 >>> logger.debug = lambda s,x,y: print(s % (x,y))
773 >>> _filter_time_series([])
774 []
775 >>> _filter_time_series([[1, 42]])
776 []
777 >>> _filter_time_series([[10, 2], [10, 3]])
778 Duplicate timestamp in time series data: [10, 2], [10, 3]
779 []
780 >>> _filter_time_series([[0, 1], [1, 2]])
781 [[0, 1], [1, 2]]
782 >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
783 Duplicate timestamp in time series data: [0, 2], [0, 3]
784 Duplicate timestamp in time series data: [0, 3], [0, 1]
785 Duplicate timestamp in time series data: [1, 2], [1, 3]
786 [[0, 1], [1, 3]]
787 >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
788 [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
789 """
790 filtered = []
791 for i in range(len(data) - 1):
792 if data[i][0] == data[i + 1][0]: # Same timestamp
793 logger.debug("Duplicate timestamp in time series data: %s, %s", data[i], data[i + 1])
794 continue
795 filtered.append(data[i])
796 if not filtered:
797 return []
798 filtered.append(data[-1])
799 return filtered
800
801
802 def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
803 """ Derivative between two time series data points
804
805 :param p1: Time series data [time in seconds, value]
806 :type p1: tuple[int, float]
807 :param p2: Time series data [time in seconds, value]
808 :type p2: tuple[int, float]
809
810 :return: Derivative between both points
811 :rtype: float
812
813 >>> _derivative([0, 0], [2, 1])
814 0.5
815 >>> _derivative([0, 1], [2, 0])
816 -0.5
817 >>> _derivative([0, 0], [3, 1])
818 0.3333333333333333
819 """
820 return (p2[1] - p1[1]) / float(p2[0] - p1[0])
821
822
def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
    """Yield each pair of neighbouring items: (s0, s1), (s1, s2), ..."""
    items = iter(iterable)
    previous = next(items, None)

    for current in items:
        pair = (previous, current)
        yield pair
        previous = current
830
831
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """
    Render a timedelta as a whole number plus a one-letter unit
    (s, m, h, d, w, M or y), choosing the unit from the size of `n`.
    """
    total_seconds = int(n.total_seconds())
    # (exclusive upper bound, seconds per unit, suffix), smallest unit first
    scale = (
        (datetime.timedelta(seconds=120), 1, 's'),
        (datetime.timedelta(minutes=120), 60, 'm'),
        (datetime.timedelta(hours=48), 3600, 'h'),
        (datetime.timedelta(days=14), 3600*24, 'd'),
        (datetime.timedelta(days=7*12), 3600*24*7, 'w'),
        (datetime.timedelta(days=365*2), 3600*24*30, 'M'),
    )
    for limit, seconds_per_unit, suffix in scale:
        if n < limit:
            return str(total_seconds // seconds_per_unit) + suffix
    return str(total_seconds // (3600*24*365)) + 'y'
846
847
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator factory for methods of the Module class.

    The decorated method logs (via ``self.log.debug``) when it starts and how
    long it ran.  Unless `skip_attribute` is set, the duration of the latest
    call is also stored on the wrapper as ``_execution_duration``.
    """
    def decorate(f: Callable[..., T]) -> Callable[..., T]:
        method_name = f.__name__

        @wraps(f)
        def timed(*args: Any, **kwargs: Any) -> T:
            # the first positional argument is the instance providing `log`
            instance = args[0]
            started = time.time()
            instance.log.debug('Starting method {}.'.format(method_name))
            outcome = f(*args, **kwargs)
            elapsed = time.time() - started
            if not skip_attribute:
                timed._execution_duration = elapsed  # type: ignore
            instance.log.debug('Method {} ran {:.3f} seconds.'.format(method_name, elapsed))
            return outcome
        return timed
    return decorate
867
868
def password_hash(password: Optional[str], salt_password: Optional[str] = None) -> Optional[str]:
    """
    Hash `password` with bcrypt.

    :param password: the clear text password; None or '' yields None
    :param salt_password: salt to use (utf8 encoded before use); a fresh
        salt is generated when it is None or empty
    :return: the hash decoded as utf8, or None when no password was given
    """
    if not password:
        return None
    salt = salt_password.encode('utf8') if salt_password else bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode('utf8'), salt)
    return hashed.decode('utf8')