# ceph/src/pybind/mgr/mgr_util.py
# (from ceph.git, commit 1a86967cb4a32c797827367fac96132dd855be75)
1 import os
2
3 if 'UNITTEST' in os.environ:
4 import tests
5
6 import cephfs
7 import contextlib
8 import datetime
9 import errno
10 import socket
11 import time
12 import logging
13 import sys
14 from threading import Lock, Condition, Event
15 from typing import no_type_check
16 import urllib
17 from functools import wraps
18 if sys.version_info >= (3, 3):
19 from threading import Timer
20 else:
21 from threading import _Timer as Timer
22
23 from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
24
25 from ceph.deployment.utils import wrap_ipv6
26
# Generic type variable used by the helpers below (e.g. merge_dicts, _pairwise).
T = TypeVar('T')

if TYPE_CHECKING:
    # Imported only for static type checking to avoid a runtime import cycle
    # with mgr_module.
    from mgr_module import MgrModule

# TypeVar bound to MgrModule (forward reference) for mgr-aware helpers.
Module_T = TypeVar('Module_T', bound="MgrModule")

# ANSI color indices; colorize() adds 30 to get the foreground code.
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    GRAY
) = range(8)

# ANSI escape sequence templates used by colorize()/bold().
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"

logger = logging.getLogger(__name__)
52
53
class CephfsConnectionException(Exception):
    """Raised when a CephFS connection cannot be established or used.

    Carries an errno-style error code (typically negative) plus a
    human-readable message.
    """

    def __init__(self, error_code: int, error_message: str):
        self.errno = error_code
        self.error_str = error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """Return a 3-tuple of (errno, empty string, error message)."""
        return (self.errno, "", self.error_str)

    def __str__(self) -> str:
        return f"{self.errno} ({self.error_str})"
64
class RTimer(Timer):
    """
    recurring timer variant of Timer

    Unlike threading.Timer, the callable is invoked repeatedly every
    ``interval`` seconds until ``cancel()`` is called (which sets the
    ``finished`` event inherited from Timer).
    """
    @no_type_check
    def run(self):
        # Loop until cancel() sets the `finished` event. Note that the
        # function is called unconditionally after each wait(), so a cancel
        # that lands mid-wait still triggers one final invocation -- callers
        # should tolerate that.
        try:
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)
            self.finished.set()
        except Exception as e:
            # log before re-raising so the failure is visible even though the
            # exception kills the timer thread
            logger.error("task exception: %s", e)
            raise
79
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """Acquire ``lock``, retrying every ``timeout`` seconds until it succeeds.

    Logs an informational "possible deadlock" message once after 30 seconds
    of unsuccessful attempts. The lock is always released when the ``with``
    body exits, including via an exception (the original code leaked the
    lock if the body raised).

    :param lock: the lock to acquire
    :param timeout: seconds to wait per acquisition attempt
    """
    # getLogger(__name__) returns the same module logger used elsewhere here
    log = logging.getLogger(__name__)
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        log.debug("locking %s with %s timeout", lock, timeout)
        if lock.acquire(timeout=timeout):
            log.debug("locked %s", lock)
            try:
                yield
            finally:
                # release even when the with-body raises
                lock.release()
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            log.info("possible deadlock acquiring %s", lock)
            warned = True
96
97
class CephfsConnectionPool(object):
    """Pool of libcephfs mounts, keyed by filesystem name.

    Connections are shared between callers (reference counted via
    ``ops_in_progress``) and reaped by a recurring timer task once they have
    been idle for CONNECTION_IDLE_INTERVAL seconds.
    """

    class Connection(object):
        """A single libcephfs mount for one filesystem."""

        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            self.mgr = mgr
            self.fs_name = fs_name
            # number of callers currently holding this handle
            self.ops_in_progress = 0
            self.last_used = time.time()
            # FSCID at creation time; used to detect fs removal/recreation
            self.fs_id = self.get_fs_id()

        def get_fs_id(self) -> int:
            """Return the current FSCID for this connection's filesystem.

            :raises CephfsConnectionException: if the filesystem is not
                present in the FSMap (e.g. it has been removed).
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))

        def get_fs_handle(self) -> "cephfs.LibCephFS":
            """Bump the usage count and hand out the libcephfs handle."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify: Callable) -> None:
            """Drop one usage; invoke ``notify`` when the handle becomes free."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            """Tear down the connection; if ``waiter`` is given, call it
            repeatedly until all in-flight operations have completed."""
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self) -> bool:
            """True if the filesystem still exists with the same FSCID."""
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                # (was a bare `except:` which also swallowed BaseException)
                pass
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout: float) -> bool:
            """True if unused and untouched for at least ``timeout`` seconds."""
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self) -> None:
            """Mount the filesystem and register the client with the mgr."""
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            self.fs.conf_set("client_quota", "false")
            logger.debug("CephFS initializing...")
            self.fs.init()
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self) -> None:
            """Cleanly shut down the mount and unregister the client."""
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                logger.debug("disconnect: ({0})".format(e))
                raise

        def abort(self) -> None:
            """Forcibly abort the mount (used when the fs no longer exists)."""
            assert self.fs
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
    MAX_CONCURRENT_CONNECTIONS = 5   # max number of concurrent connections per volume

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self) -> None:
        """Timer callback: close connections idle for CONNECTION_IDLE_INTERVAL."""
        with self.lock:
            logger.info("scanning for idle connections..")
            idle_conns = []
            for fs_name, connections in self.connections.items():
                logger.debug(f'fs_name ({fs_name}) connections ({connections})')
                for connection in connections:
                    if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
                        idle_conns.append((fs_name, connection))
            logger.info(f'cleaning up connections: {idle_conns}')
            for idle_conn in idle_conns:
                self._del_connection(idle_conn[0], idle_conn[1])

    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        """Return a libcephfs handle for ``fs_name``.

        Prefers an unused, still-valid pooled connection; otherwise spins up
        a new one (up to MAX_CONCURRENT_CONNECTIONS) or shares the least
        loaded existing connection.

        :raises CephfsConnectionException: if the filesystem does not exist
            or the connect attempt fails.
        """
        with self.lock:
            try:
                min_shared = 0
                shared_connection = None
                connections = self.connections.setdefault(fs_name, [])
                logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
                if connections:
                    min_shared = connections[0].ops_in_progress
                    shared_connection = connections[0]
                for connection in list(connections):
                    logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
                    if connection.ops_in_progress == 0:
                        if connection.is_connection_valid():
                            logger.debug(f'[get] connection ({connection}) can be reused')
                            return connection.get_fs_handle()
                        else:
                            # filesystem id changed beneath us (or the filesystem does not exist).
                            # this is possible if the filesystem got removed (and recreated with
                            # same name) via "ceph fs rm/new" mon command.
                            logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
                            # note -- this will mutate @connections too
                            self._del_connection(fs_name, connection)
                    else:
                        if connection.ops_in_progress < min_shared:
                            min_shared = connection.ops_in_progress
                            shared_connection = connection
                # when we end up here, there are no "free" connections. so either spin up a new
                # one or share it.
                if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
                    logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
                    connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
                    connection.connect()
                    self.connections[fs_name].append(connection)
                    return connection.get_fs_handle()
                else:
                    assert shared_connection is not None
                    logger.debug(f'[get] using shared connection ({shared_connection})')
                    return shared_connection.get_fs_handle()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise CephfsConnectionException(
                        -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                raise CephfsConnectionException(-e.args[0], e.args[1])

    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
        """Return a handle obtained from get_fs_handle() to the pool."""
        with self.lock:
            connections = self.connections.get(fs_name, [])
            for connection in connections:
                if connection.fs == fs_handle:
                    logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
                    # notify_all (not the deprecated notifyAll): threads in
                    # _del_connection(wait=True) may be blocked on this condition.
                    connection.put_fs_handle(notify=lambda: self.cond.notify_all())

    def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
        # caller must hold self.lock
        self.connections[fs_name].remove(connection)
        connection.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def _del_connections(self, fs_name: str, wait: bool = False) -> None:
        # caller must hold self.lock
        for connection in list(self.connections.get(fs_name, [])):
            self._del_connection(fs_name, connection, wait)

    def del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Close all pooled connections for ``fs_name``."""
        with self.lock:
            self._del_connections(fs_name, wait)

    def del_all_connections(self) -> None:
        """Close every pooled connection, waiting for in-flight operations."""
        with self.lock:
            for fs_name in list(self.connections.keys()):
                logger.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_connections(fs_name, wait=True)
                logger.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since its
            # guarded on shutdown.
            assert len(self.connections) == 0
288
289
class CephfsClient(Generic[Module_T]):
    """Owns a CephfsConnectionPool and answers FSMap queries for a mgr module."""

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.stopping = Event()
        self.connection_pool = CephfsConnectionPool(self.mgr)

    def is_stopping(self) -> bool:
        """True once shutdown() has been called."""
        return self.stopping.is_set()

    def shutdown(self) -> None:
        """Stop handing out connections and close all pooled ones."""
        logger.info("shutting down")
        # first, note that we're shutting down
        self.stopping.set()
        # second, delete all libcephfs handles from connection pool
        self.connection_pool.del_all_connections()

    def get_fs(self, fs_name: str) -> Optional[Dict[str, Any]]:
        """Return the FSMap entry (a dict) for the named filesystem, or None.

        Note: this returns the raw FSMap dictionary, not a libcephfs handle
        (the original return annotation of ``cephfs.LibCephFS`` was wrong).
        """
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                return fs
        return None

    def get_mds_names(self, fs_name: str) -> List[str]:
        """Return the MDS daemon names serving ``fs_name`` ([] if unknown)."""
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        """Return the metadata pool of ``fs_name``, or None if unknown."""
        fs = self.get_fs(fs_name)
        if fs:
            return fs['mdsmap']['metadata_pool']
        return None

    def get_all_filesystems(self) -> List[str]:
        """Return the names of all filesystems in the FSMap."""
        fs_map = self.mgr.get('fs_map')
        if not fs_map['filesystems']:
            return []
        return [fs['mdsmap']['fs_name'] for fs in fs_map['filesystems']]
332
333
334
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """Yield a shared cephfs handle for ``fs_name`` from the client's pool.

    Intended for use as a context manager; the handle is always returned to
    the connection pool when the block exits.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :raises CephfsConnectionException: if the client is shutting down
    :return: yields a fs handle (ceph filesystem handle)
    """
    if fsc.is_stopping():
        raise CephfsConnectionException(-errno.ESHUTDOWN,
                                        "shutdown in progress")

    pool = fsc.connection_pool
    handle = pool.get_fs_handle(fs_name)
    try:
        yield handle
    finally:
        pool.put_fs_handle(fs_name, handle)
354
355
def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Wrap `msg` in ANSI escape sequences for the given color index
    (dark selects the non-bold variant).
    """
    seq = COLOR_DARK_SEQ if dark else COLOR_SEQ
    return seq % (30 + color) + msg + RESET_SEQ
362
363
def bold(msg: str) -> str:
    """
    Wrap `msg` in ANSI escape sequences so it renders bold.
    """
    return "".join((BOLD_SEQ, msg, RESET_SEQ))
369
370
def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a number without units, so as to fit into `width` characters,
    substituting an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for
    byte sizes/rates.
    """
    base = 1000 if decimal else 1024
    suffixes = [' ', 'k', 'M', 'G', 'T', 'P', 'E']

    # pick the smallest unit whose integer part fits into width - 1 chars
    exponent = 0
    while len(str(int(n) // (base ** exponent))) > width - 1:
        exponent += 1

    if exponent:
        body = ("%f" % (n / (float(base) ** exponent)))[:width - 1]
        if body.endswith('.'):
            # never end on a bare decimal point; pad on the left instead
            body = " " + body[:-1]
    else:
        body = ("%" + str(width - 1) + "d") % n
    formatted = body + suffixes[exponent]

    if not colored:
        return formatted
    # zero is rendered dim (BLACK), anything else YELLOW; suffix always YELLOW
    tone = BLACK if n == 0 else YELLOW
    return (bold(colorize(formatted[:-1], tone, False))
            + bold(colorize(formatted[-1], YELLOW, False)))
402
403
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """Format a dimensionless quantity using decimal (base-1000) prefixes."""
    return format_units(n, width, colored=colored, decimal=True)
406
407
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """Format a byte count using binary (base-1024) prefixes."""
    return format_units(n, width, colored=colored, decimal=False)
410
411
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """Merge the given dictionaries left to right; later keys win.

    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    You can also overwrite keys:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :rtype: dict[str, Any]
    """
    merged: Dict[T, Any] = {}
    for mapping in args:
        merged.update(mapping)
    return merged
427
428
def get_default_addr():
    # type: () -> str
    """Return '::' when this host can bind IPv6, else '0.0.0.0' (cached)."""
    def ipv6_supported() -> bool:
        # probe by binding an IPv6 socket on the loopback address
        try:
            probe = socket.socket(socket.AF_INET6)
            with contextlib.closing(probe):
                probe.bind(("::1", 0))
            return True
        except (AttributeError, socket.error):
            return False

    # memoize on the function object so the probe runs at most once
    if not hasattr(get_default_addr, 'result'):
        get_default_addr.result = (  # type: ignore
            '::' if ipv6_supported() else '0.0.0.0')
    return get_default_addr.result  # type: ignore
446
447
def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
    """
    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
    automatically.

    >>> build_url('example.com', 'https', 443)
    'https://example.com:443'

    >>> build_url(host='example.com', port=443)
    '//example.com:443'

    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'

    >>> build_url('example.com', 'https', 443, path='/metrics')
    'https://example.com:443/metrics'


    :param scheme: The scheme, e.g. http, https or ftp.
    :type scheme: str
    :param host: Consisting of either a registered name (including but not limited to
                 a hostname) or an IP address.
    :type host: str
    :type port: int
    :rtype: str
    """
    netloc = wrap_ipv6(host)
    if port:
        netloc = '{}:{}'.format(netloc, port)
    result = urllib.parse.ParseResult(
        scheme=scheme or '',
        netloc=netloc,
        path=path,
        params='',
        query='',
        fragment='')
    return result.geturl()
485
486
class ServerConfigException(Exception):
    """Raised when TLS/SSL server configuration (certificate/key) is missing
    or invalid."""
489
490
def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Returns self-signed PEM certificates valid for 10 years.

    The optional dname parameter provides complete control of the cert/key
    creation by supporting all valid RDNs via a dictionary. However, if dname
    is not provided the default O and CN settings will be applied.

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs

    """

    # imported lazily so modules that never create certs don't need pyOpenSSL
    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

    # create a key pair (2048-bit RSA)
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    # Create a "subject" object
    req = crypto.X509Req()
    subj = req.get_subject()

    if dname:
        # dname received, so check it contains valid RDNs
        if not all(field in valid_RDN_list for field in dname):
            raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))
    else:
        dname = {"O": organisation, "CN": common_name}

    # populate the subject with the dname settings
    for k, v in dname.items():
        setattr(subj, k, v)

    # create a self-signed cert
    cert = crypto.X509()
    cert.set_subject(req.get_subject())
    # random serial number derived from a uuid4
    cert.set_serial_number(int(uuid4()))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
    # issuer == subject -> self-signed
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pkey)
    cert.sign(pkey, 'sha512')

    # serialize both objects to PEM text
    cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)

    return cert.decode('utf-8'), pkey.decode('utf-8')
549
550
def verify_cacrt_content(crt):
    # type: (str) -> None
    """Check that ``crt`` is parseable PEM certificate data.

    Expiry is only logged as a warning, not treated as an error.

    :raises ServerConfigException: when the certificate cannot be parsed.
    """
    from OpenSSL import crypto
    try:
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
        if cert.has_expired():
            logger.warning('Certificate has expired: {}'.format(crt))
    except (ValueError, crypto.Error) as err:
        raise ServerConfigException(
            'Invalid certificate: {}'.format(str(err)))
561
562
def verify_cacrt(cert_fname):
    # type: (str) -> None
    """Basic validation of a ca cert"""
    if not cert_fname:
        raise ServerConfigException("CA cert not configured")
    if not os.path.isfile(cert_fname):
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    try:
        with open(cert_fname) as crt_file:
            content = crt_file.read()
        verify_cacrt_content(content)
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
578
579
def verify_tls(crt, key):
    # type: (str, str) -> None
    """Verify that ``crt``/``key`` form a parseable TLS pair.

    A key/certificate mismatch is only logged as a warning, not raised.

    :raises ServerConfigException: when the certificate or private key is
        unparseable or the key is inconsistent.
    """
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL
    try:
        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        _key.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(e)))
    try:
        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    except (ValueError, crypto.Error) as e:
        # pyOpenSSL raises crypto.Error for bad PEM data; previously only
        # ValueError was caught here, letting crypto.Error escape unhandled
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(e))
        )

    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate(_crt)
        context.use_privatekey(_key)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key and certificate do not match up: {}'.format(str(e)))
606
607
def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: name of the certificate public key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    # reuse the CA-cert checks: file exists, parseable PEM, expiry warning
    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
        # check() validates the key's internal consistency
        pkey.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        # verifies that the key matches the certificate loaded above
        context.check_privatekey()
    except crypto.Error as e:
        # a mismatch is deliberately non-fatal: only a warning is logged
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))
651
652
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Get most recent rate from rates

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The last derivative or 0.0 if none exists
    :rtype: float

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    if rates:
        return rates[-1][1]
    return 0.0
674
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Rates from time series data

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    []
    >>> get_time_series_rates([[0, 1], [1, 3]])
    [(1, 2.0)]
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [(1, 2.0)]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    data = _filter_time_series(data)
    if not data:
        return []
    rates = []
    for prev, cur in _pairwise(data):
        rate = _derivative(prev, cur) if prev is not None else 0.0
        rates.append((cur[0], rate))
    return rates
702
703
704 def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
705 """ Filters time series data
706
707 Filters out samples with the same timestamp in given time series data.
708 It also enforces the list to contain at least two samples.
709
710 All filtered values will be shown in the debug log. If values were filtered it's a bug in the
711 time series data collector, please report it.
712
713 :param data: Time series data [time in seconds, value]
714 :type data: list[tuple[int, float]]
715
716 :return: Filtered time series data [time in seconds, value]
717 :rtype: list[tuple[int, float]]
718
719 >>> logger.debug = lambda s,x,y: print(s % (x,y))
720 >>> _filter_time_series([])
721 []
722 >>> _filter_time_series([[1, 42]])
723 []
724 >>> _filter_time_series([[10, 2], [10, 3]])
725 Duplicate timestamp in time series data: [10, 2], [10, 3]
726 []
727 >>> _filter_time_series([[0, 1], [1, 2]])
728 [[0, 1], [1, 2]]
729 >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
730 Duplicate timestamp in time series data: [0, 2], [0, 3]
731 Duplicate timestamp in time series data: [0, 3], [0, 1]
732 Duplicate timestamp in time series data: [1, 2], [1, 3]
733 [[0, 1], [1, 3]]
734 >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
735 [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
736 """
737 filtered = []
738 for i in range(len(data) - 1):
739 if data[i][0] == data[i + 1][0]: # Same timestamp
740 logger.debug("Duplicate timestamp in time series data: %s, %s", data[i], data[i + 1])
741 continue
742 filtered.append(data[i])
743 if not filtered:
744 return []
745 filtered.append(data[-1])
746 return filtered
747
748
749 def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
750 """ Derivative between two time series data points
751
752 :param p1: Time series data [time in seconds, value]
753 :type p1: tuple[int, float]
754 :param p2: Time series data [time in seconds, value]
755 :type p2: tuple[int, float]
756
757 :return: Derivative between both points
758 :rtype: float
759
760 >>> _derivative([0, 0], [2, 1])
761 0.5
762 >>> _derivative([0, 1], [2, 0])
763 -0.5
764 >>> _derivative([0, 0], [3, 1])
765 0.3333333333333333
766 """
767 return (p2[1] - p1[1]) / float(p2[0] - p1[0])
768
769
def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
    """Yield consecutive (previous, current) pairs from ``iterable``.

    Yields nothing for inputs with fewer than two items.
    """
    iterator = iter(iterable)
    previous = next(iterator, None)
    for current in iterator:
        yield previous, current
        previous = current
777
778
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """Render a timedelta as a short single-unit string (e.g. '90s', '30h', '2w').

    Buckets: seconds (< 2 min), minutes (< 2 h), hours (< 48 h), days (< 2 w),
    weeks (< 12 w), months (< 2 y), then years.
    """
    total_seconds = int(n.total_seconds())
    if n < datetime.timedelta(seconds=120):
        return str(total_seconds) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(total_seconds // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        # was `n.seconds // 3600`: timedelta.seconds wraps at one day
        # (always < 86400), so e.g. 30 hours rendered as '6h'
        return str(total_seconds // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(n.days) + 'd'
    if n < datetime.timedelta(days=7*12):
        return str(n.days // 7) + 'w'
    if n < datetime.timedelta(days=365*2):
        return str(n.days // 30) + 'M'
    return str(n.days // 365) + 'y'
793
794
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for methods of the Module class. Logs the name of the given
    function f with the time it takes to execute it.
    """
    def outer(f: Callable[..., T]) -> Callable[..., T]:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            self = args[0]
            start = time.time()
            self.log.debug('Starting method {}.'.format(f.__name__))
            result = f(*args, **kwargs)
            elapsed = time.time() - start
            if not skip_attribute:
                # expose the last run's duration on the wrapper itself
                wrapper._execution_duration = elapsed  # type: ignore
            self.log.debug('Method {} ran {:.3f} seconds.'.format(f.__name__, elapsed))
            return result
        return wrapper
    return outer