]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_util.py
import ceph quincy 17.2.1
[ceph.git] / ceph / src / pybind / mgr / mgr_util.py
CommitLineData
f67539c2
TL
1import os
2
3if 'UNITTEST' in os.environ:
4 import tests
5
6import cephfs
494da23a 7import contextlib
9f95a23c 8import datetime
f67539c2 9import errno
494da23a 10import socket
f6b5b4d7 11import time
f67539c2
TL
12import logging
13import sys
14from threading import Lock, Condition, Event
15from typing import no_type_check
522d829b 16import urllib
f6b5b4d7 17from functools import wraps
f67539c2
TL
18if sys.version_info >= (3, 3):
19 from threading import Timer
20else:
21 from threading import _Timer as Timer
22
23from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
522d829b
TL
24
25from ceph.deployment.utils import wrap_ipv6
26
f67539c2 27T = TypeVar('T')
11fdf7f2 28
f67539c2
TL
29if TYPE_CHECKING:
30 from mgr_module import MgrModule
31
32Module_T = TypeVar('Module_T', bound="MgrModule")
9f95a23c 33
11fdf7f2
TL
# Color indices. colorize() adds 30 to the index to build the escape
# sequence parameter.
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    GRAY
) = range(8)

# Terminal escape sequences used by colorize() and bold().
RESET_SEQ = "\033[0m"
# "%d" is filled in by colorize() with 30 + color index.
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"
50
eafe8130
TL
51logger = logging.getLogger(__name__)
52
11fdf7f2 53
f67539c2
TL
class CephfsConnectionException(Exception):
    """Error raised by the CephFS connection helpers in this module.

    Carries an errno-style code (``errno``) and a human readable message
    (``error_str``).
    """

    def __init__(self, error_code: int, error_message: str):
        self.errno, self.error_str = error_code, error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """Return ``(errno, "", error_str)``."""
        return (self.errno, "", self.error_str)

    def __str__(self) -> str:
        return f"{self.errno} ({self.error_str})"
64
class RTimer(Timer):
    """
    recurring timer variant of Timer

    Calls ``function(*args, **kwargs)`` every ``interval`` seconds until
    ``cancel()`` is called. Once cancelled, the function is not invoked
    again.
    """
    @no_type_check
    def run(self):
        try:
            # Event.wait() returns True as soon as cancel() sets `finished`,
            # so a cancellation during the interval ends the loop without
            # running the function one more time.
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)
        except Exception as e:
            logger.error("task exception: %s", e)
            raise
79
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """
    Context manager that acquires `lock`, retrying every `timeout` seconds.

    Each attempt is logged at debug level. If the lock is still not held
    after WARN_AFTER seconds, a single "possible deadlock" message is logged
    and the attempts continue. The lock is released when the managed block
    exits, including when it exits with an exception.

    :param lock: the lock to acquire
    :param timeout: seconds to wait in each acquire attempt
    """
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            try:
                yield
            finally:
                # release even if the managed block raised, otherwise the
                # lock would stay held forever
                lock.release()
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
96
97
class CephfsConnectionPool(object):
    """
    Pool of libcephfs connections, keyed by filesystem name.

    Up to MAX_CONCURRENT_CONNECTIONS connections are kept per filesystem;
    when none of them is unused and the limit is reached, the least used
    connection is shared. A recurring timer drops connections that have
    been idle for CONNECTION_IDLE_INTERVAL seconds.
    """

    class Connection(object):
        """A single libcephfs handle plus its usage bookkeeping."""

        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            self.mgr = mgr
            self.fs_name = fs_name
            # number of callers currently holding the handle
            self.ops_in_progress = 0
            self.last_used = time.time()
            # remembered so a removed/recreated filesystem can be detected
            self.fs_id = self.get_fs_id()

        def get_fs_id(self) -> int:
            """
            Look up the id of `fs_name` in the manager's fs_map.

            :raises CephfsConnectionException: (-ENOENT) if no such filesystem
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))

        def get_fs_handle(self) -> "cephfs.LibCephFS":
            """Hand out the handle and account for one more user."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify: Callable) -> None:
            """Account for one user less; call `notify` when none is left."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            """
            Tear the connection down.

            :param waiter: if given, called repeatedly until no operation is
                           in progress anymore
            """
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self) -> bool:
            """Return True if the filesystem still exists with the same id."""
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                pass
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout: float) -> bool:
            """Return True if unused for at least `timeout` seconds."""
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self) -> None:
            """Create, configure, initialize and mount the libcephfs handle."""
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            logger.debug("CephFS initializing...")
            self.fs.init()
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self) -> None:
            """Shut the handle down and unregister the client addresses."""
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                logger.debug("disconnect: ({0})".format(e))
                raise

        def abort(self) -> None:
            """Abort the connection (used when the filesystem is gone)."""
            assert self.fs
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
    MAX_CONCURRENT_CONNECTIONS = 5  # max number of concurrent connections per volume

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self) -> None:
        """Timer task: tear down every connection that has been idle."""
        with self.lock:
            logger.info("scanning for idle connections..")
            idle_conns = []
            for fs_name, connections in self.connections.items():
                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
                for connection in connections:
                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
                        idle_conns.append((fs_name, connection))
            logger.info(f'cleaning up connections: {idle_conns}')
            for idle_conn in idle_conns:
                self._del_connection(idle_conn[0], idle_conn[1])

    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        """
        Return a libcephfs handle for `fs_name`.

        An unused, still valid connection is reused if there is one;
        otherwise a new connection is created while fewer than
        MAX_CONCURRENT_CONNECTIONS exist, else the least used one is shared.
        Every handle must be given back with put_fs_handle().

        :raises CephfsConnectionException: if the filesystem does not exist
                                           or libcephfs reports an error
        """
        with self.lock:
            try:
                min_shared = 0
                shared_connection = None
                connections = self.connections.setdefault(fs_name, [])
                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
                if connections:
                    min_shared = connections[0].ops_in_progress
                    shared_connection = connections[0]
                # iterate over a copy: invalid connections are removed below
                for connection in list(connections):
                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
                    if connection.ops_in_progress == 0:
                        if connection.is_connection_valid():
                            logger.debug(f'[get] connection ({connection}) can be reused')
                            return connection.get_fs_handle()
                        else:
                            # filesystem id changed beneath us (or the filesystem does not exist).
                            # this is possible if the filesystem got removed (and recreated with
                            # same name) via "ceph fs rm/new" mon command.
                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
                            # note -- this will mutate @connections too
                            self._del_connection(fs_name, connection)
                    else:
                        if connection.ops_in_progress < min_shared:
                            min_shared = connection.ops_in_progress
                            shared_connection = connection
                # when we end up here, there are no "free" connections. so either spin up a new
                # one or share it.
                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
                    connection.connect()
                    self.connections[fs_name].append(connection)
                    return connection.get_fs_handle()
                else:
                    assert shared_connection is not None
                    logger.debug(f'[get] using shared connection ({shared_connection})')
                    return shared_connection.get_fs_handle()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise CephfsConnectionException(
                        -errno.ENOENT, "FS '{0}' not found".format(fs_name)) from e
                raise CephfsConnectionException(-e.args[0], e.args[1]) from e

    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
        """Give back a handle obtained from get_fs_handle()."""
        with self.lock:
            connections = self.connections.get(fs_name, [])
            for connection in connections:
                if connection.fs == fs_handle:
                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
                    # notify_all() is the non-deprecated spelling of notifyAll()
                    connection.put_fs_handle(notify=self.cond.notify_all)

    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
        """
        Remove `connection` from the pool and tear it down.

        Must be called with self.lock held.

        :param wait: block until no operation is in progress on it
        """
        if wait:
            # Drain the users *before* unlinking the connection:
            # put_fs_handle() finds connections through self.connections, so
            # a connection removed first could never be given back and this
            # wait would never finish.
            while connection.ops_in_progress != 0:
                self.cond.wait()
            # the lock was released while waiting; somebody else (e.g. the
            # idle cleanup) may have torn the connection down meanwhile.
            if connection not in self.connections.get(fs_name, []):
                return
        self.connections[fs_name].remove(connection)
        connection.del_fs_handle(waiter=None)

    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Tear down all connections of `fs_name` (self.lock must be held)."""
        for connection in list(self.connections.get(fs_name, [])):
            self._del_connection(fs_name, connection, wait)
        # drop the now empty entry, so that del_all_connections() really ends
        # up with an empty pool; get_fs_handle() recreates it on demand.
        if not self.connections.get(fs_name):
            self.connections.pop(fs_name, None)

    def del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Tear down all connections of `fs_name`."""
        with self.lock:
            self._del_connections(fs_name, wait)

    def del_all_connections(self) -> None:
        """Wait for pending operations and tear down every connection."""
        with self.lock:
            for fs_name in list(self.connections.keys()):
                logger.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_connections(fs_name, wait=True)
                logger.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since its
            # guarded on shutdown.
            assert len(self.connections) == 0
287
288
class CephfsClient(Generic[Module_T]):
    """
    Helper owning a CephfsConnectionPool plus fs_map lookup utilities.
    """

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.stopping = Event()
        self.connection_pool = CephfsConnectionPool(self.mgr)

    def is_stopping(self) -> bool:
        """Return True once shutdown() has been called."""
        return self.stopping.is_set()

    def shutdown(self) -> None:
        """Flag the client as stopping and drop all pooled connections."""
        logger.info("shutting down")
        # first, note that we're shutting down
        self.stopping.set()
        # second, delete all libcephfs handles from connection pool
        self.connection_pool.del_all_connections()

    def get_fs(self, fs_name: str) -> Optional[Dict[str, Any]]:
        """
        Return the fs_map entry of `fs_name`, or None if there is none.

        Note that this is the filesystem's entry of the manager's fs_map,
        not a libcephfs handle.
        """
        fs_map = self.mgr.get('fs_map')
        return next((fs for fs in fs_map['filesystems']
                     if fs['mdsmap']['fs_name'] == fs_name), None)

    def get_mds_names(self, fs_name: str) -> List[str]:
        """Return the MDS names of `fs_name` ([] if it does not exist)."""
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        """Return the mdsmap 'metadata_pool' of `fs_name`, None if unknown."""
        fs = self.get_fs(fs_name)
        if fs:
            return fs['mdsmap']['metadata_pool']
        return None

    def get_all_filesystems(self) -> List[str]:
        """Return the names of all filesystems in the fs_map."""
        fs_map = self.mgr.get('fs_map')
        # `or []` keeps tolerating a missing/empty 'filesystems' value
        return [fs['mdsmap']['fs_name'] for fs in (fs_map['filesystems'] or [])]
331
332
f67539c2
TL
333
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """
    Context manager giving shared access to a volume.

    The handle is taken from the client's connection pool and handed back
    to it when the managed block is left.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :return: yields a fs handle (ceph filesystem handle)
    :raises CephfsConnectionException: (-ESHUTDOWN) if the client is stopping
    """
    shutting_down = fsc.is_stopping()
    if shutting_down:
        raise CephfsConnectionException(-errno.ESHUTDOWN,
                                        "shutdown in progress")

    handle = fsc.connection_pool.get_fs_handle(fs_name)
    try:
        yield handle
    finally:
        fsc.connection_pool.put_fs_handle(fs_name, handle)
f67539c2
TL
353
354
def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Wrap `msg` in the escape sequences for `color`.

    :param color: one of the color indices (BLACK .. GRAY)
    :param dark: use COLOR_DARK_SEQ instead of COLOR_SEQ
    """
    template = COLOR_DARK_SEQ if dark else COLOR_SEQ
    prefix = template % (30 + color)
    return prefix + msg + RESET_SEQ
361
362
def bold(msg: str) -> str:
    """
    Wrap `msg` in the escape sequences that render it bold.
    """
    return "".join((BOLD_SEQ, msg, RESET_SEQ))
368
369
def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a number without units, so as to fit into `width` characters, substituting
    an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.

    If the value does not fit even with the largest known suffix ('E'), its
    whole integer part is emitted with that suffix, exceeding `width` rather
    than showing a truncated, wrong number.

    :param n: the number to format
    :param width: total width of the result, including the unit suffix
    :param colored: decorate the result with terminal escape sequences
    :param decimal: scale by 1000 (True) or by 1024 (False)
    :raises ValueError: if `width` is smaller than 2
    """
    if width < 2:
        # one character is always taken by the suffix; without room for a
        # digit no unit could ever make the value fit.
        raise ValueError("width must be at least 2 (one digit plus the unit suffix)")

    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    max_unit = len(units) - 1

    def fits(u: int) -> bool:
        return len("%s" % (int(n) // (factor ** u))) <= width - 1

    # bounded by the unit table, so the search always terminates
    unit = 0
    while unit < max_unit and not fits(unit):
        unit += 1

    if unit > 0:
        scaled = "%f" % (n / (float(factor) ** unit))
        if fits(unit):
            truncated_float = scaled[0:width - 1]
            if truncated_float[-1] == '.':
                truncated_float = " " + truncated_float[0:-1]
        else:
            # ran out of units: keep the integer part intact
            truncated_float = scaled.partition('.')[0]
    else:
        truncated_float = "%{wid}d".format(wid=width - 1) % n
    formatted = "%s%s" % (truncated_float, units[unit])

    if colored:
        if n == 0:
            color = BLACK, False
        else:
            color = YELLOW, False
        return bold(colorize(formatted[0:-1], color[0], color[1])) \
            + bold(colorize(formatted[-1], YELLOW, False))
    else:
        return formatted
401
402
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """
    Format a dimensionless number: format_units() scaling by 1000.
    """
    use_decimal = True
    return format_units(n, width, colored, use_decimal)
405
406
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """
    Format a byte size/rate: format_units() scaling by 1024.
    """
    use_decimal = False
    return format_units(n, width, colored, use_decimal)
81eedcae
TL
409
410
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """
    Merge the given dicts into a new one; later dicts win on equal keys.

    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    You can also overwrite keys:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :return: a new dict, the arguments are left untouched
    """
    merged = {}
    for mapping in args:
        merged.update(mapping)
    return merged
494da23a
TL
426
427
def get_default_addr():
    # type: () -> str
    """
    Return '::' if an IPv6 socket can be bound to ::1, '0.0.0.0' otherwise.

    The answer is computed once and cached in `get_default_addr.result`.
    """
    def is_ipv6_enabled() -> bool:
        try:
            with contextlib.closing(socket.socket(socket.AF_INET6)) as probe:
                probe.bind(("::1", 0))
        except (AttributeError, socket.error):
            return False
        return True

    if not hasattr(get_default_addr, 'result'):
        address = '::' if is_ipv6_enabled() else '0.0.0.0'
        get_default_addr.result = address  # type: ignore
        return address
    return get_default_addr.result  # type: ignore
445
eafe8130 446
522d829b
TL
def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
    """
    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
    automatically.

    >>> build_url('example.com', 'https', 443)
    'https://example.com:443'

    >>> build_url(host='example.com', port=443)
    '//example.com:443'

    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'

    >>> build_url('example.com', 'https', 443, path='/metrics')
    'https://example.com:443/metrics'


    :param scheme: The scheme, e.g. http, https or ftp.
    :type scheme: str
    :param host: Consisting of either a registered name (including but not limited to
                 a hostname) or an IP address.
    :type host: str
    :param port: Optional port; left out of the URL when None or 0.
    :type port: int
    :param path: Optional path component.
    :type path: str
    :rtype: str
    """
    # A bare `import urllib` does not guarantee that the `parse` submodule
    # is loaded, so import what is needed explicitly.
    from urllib.parse import ParseResult

    netloc = wrap_ipv6(host)
    if port:
        netloc += ':{}'.format(port)
    pr = ParseResult(
        scheme=scheme if scheme else '',
        netloc=netloc,
        path=path,
        params='',
        query='',
        fragment='')
    return pr.geturl()
484
485
eafe8130
TL
class ServerConfigException(Exception):
    """Raised by the verify_* helpers for a missing or invalid certificate or key."""
488
9f95a23c 489
f67539c2
TL
def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Create a self-signed PEM certificate (valid for 10 years) and its key.

    A 2048 bit RSA key is generated and used to sign the certificate with
    sha512. The subject is built either from `dname`, a dictionary of RDNs
    (Relative Distinguished Names), or - if `dname` is not given - from the
    `organisation` (O) and `common_name` (CN) defaults.

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs
    """

    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

    # key pair used both as the public key of the cert and to sign it
    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    # the request only serves as a holder of the subject
    request = crypto.X509Req()
    subject = request.get_subject()

    if not dname:
        dname = {"O": organisation, "CN": common_name}
    elif any(field not in valid_RDN_list for field in dname):
        raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))

    for rdn, value in dname.items():
        setattr(subject, rdn, value)

    ten_years = 10 * 365 * 24 * 60 * 60

    certificate = crypto.X509()
    certificate.set_subject(request.get_subject())
    certificate.set_serial_number(int(uuid4()))
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(ten_years)
    # self-signed: the issuer is the subject
    certificate.set_issuer(certificate.get_subject())
    certificate.set_pubkey(key_pair)
    certificate.sign(key_pair, 'sha512')

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, certificate)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair)

    return cert_pem.decode('utf-8'), key_pem.decode('utf-8')
548
549
def verify_cacrt_content(crt):
    # type: (str) -> None
    """Check that `crt` is a loadable PEM certificate.

    An expired certificate is only reported with a warning.

    :raises ServerConfigException: if the certificate cannot be loaded
    """
    from OpenSSL import crypto
    try:
        certificate = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
        expired = certificate.has_expired()
        if expired:
            logger.warning('Certificate has expired: {}'.format(crt))
    except (ValueError, crypto.Error) as e:
        reason = str(e)
        raise ServerConfigException(
            'Invalid certificate: {}'.format(reason))
560
561
def verify_cacrt(cert_fname):
    # type: (str) -> None
    """Basic validation of a ca cert file

    :param cert_fname: Name of the certificate file
    :raises ServerConfigException: if no file is configured, the file does
                                   not exist or its content is invalid
    """
    if not cert_fname:
        raise ServerConfigException("CA cert not configured")

    exists = os.path.isfile(cert_fname)
    if not exists:
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    try:
        with open(cert_fname) as cert_file:
            pem = cert_file.read()
        verify_cacrt_content(pem)
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
577
578
9f95a23c
TL
def verify_tls(crt, key):
    # type: (str, str) -> None
    """Basic checks for a PEM certificate and its private key

    - the certificate must be loadable (expiration is only warned about)
    - the private key must be loadable and consistent
    - a key that does not match the certificate is reported with a warning

    :param crt: certificate in PEM format
    :param key: private key in PEM format

    :raises ServerConfigException: if the certificate or the key is invalid
    """
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL
    try:
        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        _key.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(e)))
    try:
        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    except (ValueError, crypto.Error) as e:
        # same set of errors as handled in verify_cacrt_content()
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(e))
        )

    try:
        # the context only serves to check the pair against each other
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate(_crt)
        context.use_privatekey(_key)
        context.check_privatekey()
    except (crypto.Error, SSL.Error) as e:
        # SSL.Context reports its failures, a key/certificate mismatch
        # included, as SSL.Error, which is not a crypto.Error.
        logger.warning(
            'Private key and certificate do not match up: {}'.format(str(e)))
605
606
def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up (a mismatch is
      only reported with a warning)

    :param cert_fname: Name of the certificate file
    :param pkey_fname: name of the certificate private key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            pkey.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        # the context only serves to check the pair against each other
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        context.check_privatekey()
    except (crypto.Error, SSL.Error) as e:
        # SSL.Context reports its failures, a key/certificate mismatch
        # included, as SSL.Error, which is not a crypto.Error.
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))
9f95a23c 650
f67539c2
TL
651
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Pick the newest rate out of a list of rates

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The value of the last entry, or 0.0 for None / an empty list
    :rtype: float

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    return rates[-1][1] if rates else 0.0
673
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Compute the rates of change of time series data

    The data is filtered with _filter_time_series() first; each pair of
    consecutive samples then yields one (timestamp of the later sample,
    derivative) entry.

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    []
    >>> get_time_series_rates([[0, 1], [1, 3]])
    [(1, 2.0)]
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [(1, 2.0)]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    samples = _filter_time_series(data)
    if not samples:
        return []

    rates = []
    for previous, current in _pairwise(samples):
        slope = 0.0 if previous is None else _derivative(previous, current)
        rates.append((current[0], slope))
    return rates
701
f67539c2
TL
702
703def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
9f95a23c
TL
704 """ Filters time series data
705
706 Filters out samples with the same timestamp in given time series data.
707 It also enforces the list to contain at least two samples.
708
709 All filtered values will be shown in the debug log. If values were filtered it's a bug in the
710 time series data collector, please report it.
711
712 :param data: Time series data [time in seconds, value]
713 :type data: list[tuple[int, float]]
714
715 :return: Filtered time series data [time in seconds, value]
716 :rtype: list[tuple[int, float]]
717
718 >>> logger.debug = lambda s,x,y: print(s % (x,y))
719 >>> _filter_time_series([])
720 []
721 >>> _filter_time_series([[1, 42]])
722 []
723 >>> _filter_time_series([[10, 2], [10, 3]])
724 Duplicate timestamp in time series data: [10, 2], [10, 3]
725 []
726 >>> _filter_time_series([[0, 1], [1, 2]])
727 [[0, 1], [1, 2]]
728 >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
729 Duplicate timestamp in time series data: [0, 2], [0, 3]
730 Duplicate timestamp in time series data: [0, 3], [0, 1]
731 Duplicate timestamp in time series data: [1, 2], [1, 3]
732 [[0, 1], [1, 3]]
733 >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
734 [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
735 """
736 filtered = []
737 for i in range(len(data) - 1):
738 if data[i][0] == data[i + 1][0]: # Same timestamp
739 logger.debug("Duplicate timestamp in time series data: %s, %s", data[i], data[i + 1])
740 continue
741 filtered.append(data[i])
742 if not filtered:
743 return []
744 filtered.append(data[-1])
745 return filtered
746
f67539c2
TL
747
748def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
9f95a23c
TL
749 """ Derivative between two time series data points
750
751 :param p1: Time series data [time in seconds, value]
752 :type p1: tuple[int, float]
753 :param p2: Time series data [time in seconds, value]
754 :type p2: tuple[int, float]
755
756 :return: Derivative between both points
757 :rtype: float
758
759 >>> _derivative([0, 0], [2, 1])
760 0.5
761 >>> _derivative([0, 1], [2, 0])
762 -0.5
763 >>> _derivative([0, 0], [3, 1])
764 0.3333333333333333
765 """
766 return (p2[1] - p1[1]) / float(p2[0] - p1[0])
767
f67539c2
TL
768
769def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
9f95a23c
TL
770 it = iter(iterable)
771 a = next(it, None)
772
773 for b in it:
774 yield (a, b)
775 a = b
776
f67539c2
TL
777
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """
    Render a timedelta as a short string with a single unit.

    The unit is chosen by magnitude: seconds (below 2 minutes), minutes
    (below 2 hours), hours (below 2 days), days (below 2 weeks), weeks
    (below 12 weeks), months of 30 days (below 2 years), else years of
    365 days.

    >>> to_pretty_timedelta(datetime.timedelta(seconds=90))
    '90s'
    >>> to_pretty_timedelta(datetime.timedelta(hours=30))
    '30h'
    """
    # timedelta.seconds only holds the part below one day; the total is
    # needed, or e.g. 30 hours would be shown as "6h".
    total_seconds = int(n.total_seconds())
    if n < datetime.timedelta(seconds=120):
        return str(total_seconds) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(total_seconds // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        return str(total_seconds // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(n.days) + 'd'
    if n < datetime.timedelta(days=7*12):
        return str(n.days // 7) + 'w'
    if n < datetime.timedelta(days=365*2):
        return str(n.days // 30) + 'M'
    return str(n.days // 365) + 'y'
f6b5b4d7
TL
792
793
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator factory for methods of the Module class.

    The decorated method logs (debug level, through the `log` attribute of
    its first argument) when it starts and how long it ran. Unless
    `skip_attribute` is set, the duration of the last call is also stored
    in the `_execution_duration` attribute of the decorated function.
    """
    def outer(f: Callable[..., T]) -> Callable[..., T]:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            owner = args[0]
            started = time.time()
            owner.log.debug('Starting method {}.'.format(f.__name__))
            outcome = f(*args, **kwargs)
            elapsed = time.time() - started
            if not skip_attribute:
                wrapper._execution_duration = elapsed  # type: ignore
            owner.log.debug('Method {} ran {:.3f} seconds.'.format(f.__name__, elapsed))
            return outcome
        return wrapper
    return outer