]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_util.py
04bcc8485b2661819d9a4a83d1f2686570e242b1
[ceph.git] / ceph / src / pybind / mgr / mgr_util.py
import os

if 'UNITTEST' in os.environ:
    import tests

import contextlib
import datetime
import errno
import logging
import socket
import sys
import time
import urllib
import urllib.parse
from functools import wraps
from threading import Lock, Condition, Event
from typing import no_type_check
from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

import cephfs

from ceph.deployment.utils import wrap_ipv6

# Generic type variable used by the helpers below (merge_dicts, _pairwise, ...)
T = TypeVar('T')

if TYPE_CHECKING:
    # type-check-only import; avoids a runtime dependency/cycle on mgr_module
    from mgr_module import MgrModule

# Any mgr module subclass; the bound is a string so the import above can stay
# type-check-only
Module_T = TypeVar('Module_T', bound="MgrModule")

# ANSI color indices; colorize() adds 30 to form the SGR foreground code
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    GRAY
) = range(8)

# ANSI/SGR escape sequences used by colorize() and bold()
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"        # bold + color, takes the color code via %
COLOR_DARK_SEQ = "\033[0;%dm"   # normal weight + color
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"

logger = logging.getLogger(__name__)
52
53
class CephfsConnectionException(Exception):
    """Error talking to CephFS, carrying an errno-style code and a message.

    :param error_code: errno-style error code (negative by convention here)
    :param error_message: human-readable description of the failure
    """
    def __init__(self, error_code: int, error_message: str):
        # call Exception.__init__ so e.args is populated and the exception
        # pickles/reprs correctly (the original skipped this)
        super().__init__(error_code, error_message)
        self.errno = error_code
        self.error_str = error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """Return (errno, "", error_str)."""
        return self.errno, "", self.error_str

    def __str__(self) -> str:
        return "{0} ({1})".format(self.errno, self.error_str)
64
class RTimer(Timer):
    """
    recurring timer variant of Timer

    threading.Timer fires its callback once; this subclass re-fires
    `function` every `interval` seconds until cancel() is called.
    """
    @no_type_check  # relies on private attributes of threading.Timer
    def run(self):
        try:
            # finished is set by Timer.cancel(); wait() returns after
            # `interval` seconds (or immediately once finished is set),
            # then the callback runs again
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)
            self.finished.set()
        except Exception as e:
            # log before propagating -- this runs on the timer thread, so an
            # unlogged exception would otherwise vanish silently
            logger.error("task exception: %s", e)
            raise
79
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """
    Acquire `lock` (retrying with a per-attempt timeout), run the managed
    block, and always release the lock on exit.

    Logs a "possible deadlock" message once if acquisition takes longer than
    WARN_AFTER seconds.

    :param lock: the lock to acquire
    :param timeout: per-attempt acquire timeout in seconds
    """
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            try:
                yield
            finally:
                # release even when the managed block raises -- the original
                # leaked the lock on any exception from the body
                lock.release()
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
96
97
class CephfsConnectionPool(object):
    """Pool of libcephfs mounts, keyed by filesystem name.

    Connections are created on demand (up to MAX_CONCURRENT_CONNECTIONS per
    filesystem), shared between concurrent operations, and reaped by a
    background timer once idle for CONNECTION_IDLE_INTERVAL seconds.
    """

    class Connection(object):
        """A single libcephfs mount to one filesystem, shared by many ops."""

        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            self.mgr = mgr
            self.fs_name = fs_name
            # number of operations currently sharing this mount
            self.ops_in_progress = 0
            self.last_used = time.time()
            # remember the fs id so a later "fs rm/new" recycling the same
            # name can be detected (see is_connection_valid())
            self.fs_id = self.get_fs_id()

        def get_fs_id(self) -> int:
            """Return the current id of this connection's filesystem.

            :raises CephfsConnectionException: -ENOENT if the fs is gone
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))

        def get_fs_handle(self) -> "cephfs.LibCephFS":
            """Mark the connection as used and hand out the libcephfs handle."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify: Callable) -> None:
            """Return a handle; invoke `notify` once no ops remain in flight."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            """Tear the connection down, optionally waiting for in-flight ops."""
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                # the fs behind this connection is gone -- skip the orderly
                # shutdown and abort the connection instead
                self.abort()

        def is_connection_valid(self) -> bool:
            """True if the fs still exists with the id seen at connect time."""
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except CephfsConnectionException:
                # narrowed from a bare `except:` -- get_fs_id() signals a
                # missing fs with this exception; the filesystem does not
                # exist now, so the connection is not valid.
                pass
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout: float) -> bool:
            """True if no ops are in flight and the mount is unused for `timeout` seconds."""
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self) -> None:
            """Mount the filesystem as root via libcephfs."""
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            logger.debug("CephFS initializing...")
            self.fs.init()
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self) -> None:
            """Orderly shutdown of the mount; unregisters the client address."""
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                logger.debug("disconnect: ({0})".format(e))
                raise

        def abort(self) -> None:
            """Abort the mount without an orderly shutdown."""
            assert self.fs
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
    MAX_CONCURRENT_CONNECTIONS = 5   # max number of concurrent connections per volume

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        # fs_name -> list of pooled connections for that filesystem
        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        # periodically reap idle connections
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self) -> None:
        """Timer callback: drop connections idle past CONNECTION_IDLE_INTERVAL."""
        with self.lock:
            logger.info("scanning for idle connections..")
            idle_conns = []
            for fs_name, connections in self.connections.items():
                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
                for connection in connections:
                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
                        idle_conns.append((fs_name, connection))
            logger.info(f'cleaning up connections: {idle_conns}')
            for idle_conn in idle_conns:
                self._del_connection(idle_conn[0], idle_conn[1])

    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        """Get a libcephfs handle for `fs_name`, reusing or creating a mount.

        Prefers a free, still-valid connection; otherwise spawns a new mount
        (up to MAX_CONCURRENT_CONNECTIONS) or shares the least-loaded one.

        :raises CephfsConnectionException: if the fs is unknown or the mount fails
        """
        with self.lock:
            try:
                min_shared = 0
                shared_connection = None
                connections = self.connections.setdefault(fs_name, [])
                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
                if connections:
                    min_shared = connections[0].ops_in_progress
                    shared_connection = connections[0]
                for connection in list(connections):
                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
                    if connection.ops_in_progress == 0:
                        if connection.is_connection_valid():
                            logger.debug(f'[get] connection ({connection}) can be reused')
                            return connection.get_fs_handle()
                        else:
                            # filesystem id changed beneath us (or the filesystem does not exist).
                            # this is possible if the filesystem got removed (and recreated with
                            # same name) via "ceph fs rm/new" mon command.
                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
                            # note -- this will mutate @connections too
                            self._del_connection(fs_name, connection)
                    else:
                        if connection.ops_in_progress < min_shared:
                            min_shared = connection.ops_in_progress
                            shared_connection = connection
                # when we end up here, there are no "free" connections. so either spin up a new
                # one or share it.
                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
                    connection.connect()
                    self.connections[fs_name].append(connection)
                    return connection.get_fs_handle()
                else:
                    assert shared_connection is not None
                    logger.debug(f'[get] using shared connection ({shared_connection})')
                    return shared_connection.get_fs_handle()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise CephfsConnectionException(
                        -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                raise CephfsConnectionException(-e.args[0], e.args[1])

    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
        """Return a handle previously obtained from get_fs_handle()."""
        with self.lock:
            connections = self.connections.get(fs_name, [])
            for connection in connections:
                if connection.fs == fs_handle:
                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
                    # notify_all() replaces the deprecated notifyAll() alias
                    connection.put_fs_handle(notify=lambda: self.cond.notify_all())

    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
        # caller must hold self.lock
        self.connections[fs_name].remove(connection)
        connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
        # caller must hold self.lock
        for connection in list(self.connections.get(fs_name, [])):
            self._del_connection(fs_name, connection, wait)
        # drop the (now empty) key so del_all_connections()'s emptiness
        # assertion holds; the original left empty lists behind, which made
        # that assertion fail for any fs that ever had a connection
        if not self.connections.get(fs_name):
            self.connections.pop(fs_name, None)

    def del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Drop all connections for `fs_name`, optionally waiting for in-flight ops."""
        with self.lock:
            self._del_connections(fs_name, wait)

    def del_all_connections(self) -> None:
        """Drop every pooled connection, waiting for in-flight ops to finish."""
        with self.lock:
            for fs_name in list(self.connections.keys()):
                logger.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_connections(fs_name, wait=True)
                logger.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since its
            # guarded on shutdown.
            assert len(self.connections) == 0
287
288
class CephfsClient(Generic[Module_T]):
    """Base helper for mgr modules that access CephFS via a connection pool."""

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.stopping = Event()
        self.connection_pool = CephfsConnectionPool(self.mgr)

    def is_stopping(self) -> bool:
        """True once shutdown() has been requested."""
        return self.stopping.is_set()

    def shutdown(self) -> None:
        """Flag shutdown and tear down all pooled libcephfs mounts."""
        logger.info("shutting down")
        # first, note that we're shutting down
        self.stopping.set()
        # second, delete all libcephfs handles from connection pool
        self.connection_pool.del_all_connections()

    def get_fs(self, fs_name: str) -> Optional[Dict[str, Any]]:
        """Return the FSMap entry for `fs_name`, or None if unknown.

        NOTE: this returns the raw FSMap dictionary for the filesystem --
        not a cephfs.LibCephFS handle as the original annotation claimed.
        """
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                return fs
        return None

    def get_mds_names(self, fs_name: str) -> List[str]:
        """Return the names of all MDS daemons serving `fs_name`."""
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        """Return the metadata pool of `fs_name`, or None if the fs is unknown."""
        fs = self.get_fs(fs_name)
        if fs:
            return fs['mdsmap']['metadata_pool']
        return None

    def get_all_filesystems(self) -> List[str]:
        """Return the names of all filesystems in the FSMap."""
        fs_map = self.mgr.get('fs_map')
        return [fs['mdsmap']['fs_name']
                for fs in fs_map['filesystems'] or []]
331
332
333
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """
    Context manager yielding a shared libcephfs handle for `fs_name`.

    The handle is borrowed from the client's connection pool on entry and
    always returned to it on exit, even if the body raises.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :raises CephfsConnectionException: -ESHUTDOWN if the client is stopping
    """
    if fsc.is_stopping():
        raise CephfsConnectionException(-errno.ESHUTDOWN,
                                        "shutdown in progress")

    handle = fsc.connection_pool.get_fs_handle(fs_name)
    try:
        yield handle
    finally:
        fsc.connection_pool.put_fs_handle(fs_name, handle)
353
354
def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Wrap `msg` in ANSI escape sequences rendering it in the given color.

    :param color: one of the module color indices (BLACK..GRAY)
    :param dark: use the normal-weight (dark) variant instead of bold
    """
    prefix = (COLOR_DARK_SEQ if dark else COLOR_SEQ) % (30 + color)
    return "".join([prefix, msg, RESET_SEQ])
361
362
def bold(msg: str) -> str:
    """
    Wrap `msg` in ANSI escape sequences rendering it bold.
    """
    return "{}{}{}".format(BOLD_SEQ, msg, RESET_SEQ)
368
369
def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a number without units, so as to fit into `width` characters, substituting
    an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.
    """

    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    unit = 0
    # pick the smallest unit whose integer part fits in width-1 characters
    # (the last column is reserved for the unit suffix)
    while len("%s" % (int(n) // (factor**unit))) > width - 1:
        unit += 1

    if unit > 0:
        # scale, then truncate (not round) the decimal string to fit the width
        truncated_float = ("%f" % (n / (float(factor) ** unit)))[0:width - 1]
        if truncated_float[-1] == '.':
            # never end on a bare dot -- drop it and left-pad with a space
            truncated_float = " " + truncated_float[0:-1]
    else:
        # NOTE: .format() builds the %-format string (e.g. "%3d"), which is
        # then applied to n with the % operator
        truncated_float = "%{wid}d".format(wid=width - 1) % n
    formatted = "%s%s" % (truncated_float, units[unit])

    if colored:
        if n == 0:
            color = BLACK, False
        else:
            color = YELLOW, False
        # colorize the digits; the unit suffix (last char) is always bold yellow
        return bold(colorize(formatted[0:-1], color[0], color[1])) \
            + bold(colorize(formatted[-1], YELLOW, False))
    else:
        return formatted
401
402
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """Format a dimensionless quantity with decimal (base-1000) unit suffixes."""
    return format_units(n, width, colored=colored, decimal=True)
405
406
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """Format a byte count / rate with binary (base-1024) unit suffixes."""
    return format_units(n, width, colored=colored, decimal=False)
409
410
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """
    Merge any number of dicts into a new dict; later arguments win.

    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    You can also overwrite keys:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :rtype: dict[str, Any]
    """
    return {key: value for d in args for key, value in d.items()}
426
427
def get_default_addr():
    # type: () -> str
    """Return '::' when IPv6 binding works on this host, else '0.0.0.0'.

    The probe is done once; the result is memoized on the function object.
    """
    def is_ipv6_enabled() -> bool:
        # probe by actually binding an IPv6 socket to the loopback address
        try:
            sock = socket.socket(socket.AF_INET6)
            with contextlib.closing(sock):
                sock.bind(("::1", 0))
            return True
        except (AttributeError, socket.error):
            return False

    cached = getattr(get_default_addr, 'result', None)
    if cached is None:
        cached = '::' if is_ipv6_enabled() else '0.0.0.0'
        get_default_addr.result = cached  # type: ignore
    return cached
445
446
def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
    """
    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
    automatically.

    >>> build_url('example.com', 'https', 443)
    'https://example.com:443'

    >>> build_url(host='example.com', port=443)
    '//example.com:443'

    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'

    >>> build_url('example.com', 'https', 443, path='/metrics')
    'https://example.com:443/metrics'


    :param scheme: The scheme, e.g. http, https or ftp.
    :type scheme: str
    :param host: Consisting of either a registered name (including but not limited to
                 a hostname) or an IP address.
    :type host: str
    :type port: int
    :rtype: str
    """
    netloc = wrap_ipv6(host)
    if port:
        netloc += ':{}'.format(port)
    # NOTE: requires `import urllib.parse` at file level -- a bare
    # `import urllib` does not guarantee the `parse` submodule is loaded
    pr = urllib.parse.ParseResult(
        scheme=scheme if scheme else '',
        netloc=netloc,
        path=path,
        params='',
        query='',
        fragment='')
    return pr.geturl()
484
485
class ServerConfigException(Exception):
    """Raised when server TLS configuration (certificates/keys) is missing or invalid."""
488
489
def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Returns self-signed PEM certificates valid for 10 years.

    The optional dname parameter provides complete control of the cert/key
    creation by supporting all valid RDNs via a dictionary. However, if dname
    is not provided the default O and CN settings will be applied.

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs

    """

    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

    # generate a 2048-bit RSA key pair
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    # build the certificate subject from the supplied (or default) RDNs
    req = crypto.X509Req()
    subject = req.get_subject()

    if not dname:
        dname = {"O": organisation, "CN": common_name}
    elif not all(field in valid_RDN_list for field in dname):
        # dname received, but it contains unsupported RDNs
        raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))

    for rdn, value in dname.items():
        setattr(subject, rdn, value)

    # create a self-signed cert: issuer == subject, random serial, 10y validity
    cert = crypto.X509()
    cert.set_subject(req.get_subject())
    cert.set_serial_number(int(uuid4()))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(key)
    cert.sign(key, 'sha512')

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)

    return cert_pem.decode('utf-8'), key_pem.decode('utf-8')
548
549
def verify_cacrt_content(crt):
    # type: (str) -> None
    """Parse a PEM certificate string; raise ServerConfigException if invalid.

    Logs a warning (but does not raise) when the certificate has expired.
    """
    from OpenSSL import crypto
    try:
        x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid certificate: {}'.format(str(e)))
    if x509.has_expired():
        logger.warning('Certificate has expired: {}'.format(crt))
560
561
def verify_cacrt(cert_fname):
    # type: (str) -> None
    """Basic validation of a ca cert"""

    # guard clauses: a configured path that actually exists on disk
    if not cert_fname:
        raise ServerConfigException("CA cert not configured")
    if not os.path.isfile(cert_fname):
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    try:
        with open(cert_fname) as fp:
            content = fp.read()
        verify_cacrt_content(content)
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
577
578
def verify_tls(crt, key):
    # type: (str, str) -> None
    """Validate a PEM certificate/private-key pair.

    :param crt: certificate in PEM format
    :param key: private key in PEM format
    :raises ServerConfigException: if either the certificate or key is invalid
    """
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL
    try:
        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        _key.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(e)))
    try:
        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    except (ValueError, crypto.Error) as e:
        # crypto.Error added: load_certificate raises crypto.Error (not
        # ValueError) on malformed PEM, which previously escaped unhandled;
        # now consistent with verify_cacrt_content() above
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(e))
        )

    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate(_crt)
        context.use_privatekey(_key)
        context.check_privatekey()
    except crypto.Error as e:
        # mismatch is only warned about, not fatal (preserves prior behavior)
        logger.warning(
            'Private key and certificate do not match up: {}'.format(str(e)))
605
606
def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: name of the certificate public key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as fp:
            private_key = crypto.load_privatekey(crypto.FILETYPE_PEM, fp.read())
        private_key.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        # loading both files into one context verifies they match up
        tls_context = SSL.Context(SSL.TLSv1_METHOD)
        tls_context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        tls_context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        tls_context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))
650
651
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Return the value of the newest sample in *rates*, or 0.0 if there is none.

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The last derivative or 0.0 if none exists
    :rtype: float

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    if rates:
        return rates[-1][1]
    return 0.0
673
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Rates from time series data

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    []
    >>> get_time_series_rates([[0, 1], [1, 3]])
    [(1, 2.0)]
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [(1, 2.0)]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    samples = _filter_time_series(data)
    if not samples:
        return []
    # one rate per consecutive pair, stamped with the later sample's time
    return [(curr[0], _derivative(prev, curr) if prev is not None else 0.0)
            for prev, curr in _pairwise(samples)]
701
702
def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Filters time series data

    Filters out samples with the same timestamp in given time series data.
    It also enforces the list to contain at least two samples.

    All filtered values will be shown in the debug log. If values were filtered it's a bug in the
    time series data collector, please report it.

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: Filtered time series data [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> _filter_time_series([])
    []
    >>> _filter_time_series([[1, 42]])
    []
    >>> _filter_time_series([[10, 2], [10, 3]])
    Duplicate timestamp in time series data: [10, 2], [10, 3]
    []
    >>> _filter_time_series([[0, 1], [1, 2]])
    [[0, 1], [1, 2]]
    >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [[0, 1], [1, 3]]
    >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
    """
    kept = []
    # walk consecutive pairs; keep a sample only if the next one has a
    # different timestamp (so of a duplicate run, only the last survives)
    for current, following in zip(data, data[1:]):
        if current[0] == following[0]:  # Same timestamp
            logger.debug("Duplicate timestamp in time series data: %s, %s", current, following)
            continue
        kept.append(current)
    if not kept:
        return []
    # the final sample always survives (it had no follower to clash with)
    kept.append(data[-1])
    return kept
746
747
748 def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
749 """ Derivative between two time series data points
750
751 :param p1: Time series data [time in seconds, value]
752 :type p1: tuple[int, float]
753 :param p2: Time series data [time in seconds, value]
754 :type p2: tuple[int, float]
755
756 :return: Derivative between both points
757 :rtype: float
758
759 >>> _derivative([0, 0], [2, 1])
760 0.5
761 >>> _derivative([0, 1], [2, 0])
762 -0.5
763 >>> _derivative([0, 0], [3, 1])
764 0.3333333333333333
765 """
766 return (p2[1] - p1[1]) / float(p2[0] - p1[0])
767
768
769 def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
770 it = iter(iterable)
771 a = next(it, None)
772
773 for b in it:
774 yield (a, b)
775 a = b
776
777
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """Render a timedelta as a short single-unit string ('90s', '4w', '2y', ...).

    Uses total_seconds() for the s/m/h ranges: the original read `n.seconds`,
    which wraps every 24 hours, so e.g. timedelta(hours=30) rendered as '6h'
    instead of '30h'.
    """
    total_seconds = int(n.total_seconds())
    if n < datetime.timedelta(seconds=120):
        return str(total_seconds) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(total_seconds // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        return str(total_seconds // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(n.days) + 'd'
    if n < datetime.timedelta(days=7*12):
        return str(n.days // 7) + 'w'
    if n < datetime.timedelta(days=365*2):
        return str(n.days // 30) + 'M'
    return str(n.days // 365) + 'y'
792
793
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for methods of the Module class. Logs the name of the given
    function f with the time it takes to execute it.

    Unless `skip_attribute` is set, the last measured duration is stored on
    the wrapper as `_execution_duration`.
    """
    def outer(f: Callable[..., T]) -> Callable[..., T]:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            # args[0] is the Module instance; its logger is used for output
            self = args[0]
            started_at = time.time()
            self.log.debug('Starting method {}.'.format(f.__name__))
            result = f(*args, **kwargs)
            elapsed = time.time() - started_at
            if not skip_attribute:
                wrapper._execution_duration = elapsed  # type: ignore
            self.log.debug('Method {} ran {:.3f} seconds.'.format(f.__name__, elapsed))
            return result
        return wrapper
    return outer