]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_util.py
import ceph pacific 16.2.5
[ceph.git] / ceph / src / pybind / mgr / mgr_util.py
CommitLineData
f67539c2
TL
1import os
2
3if 'UNITTEST' in os.environ:
4 import tests
5
6import cephfs
494da23a 7import contextlib
9f95a23c 8import datetime
f67539c2 9import errno
494da23a 10import socket
f6b5b4d7 11import time
f67539c2
TL
12import logging
13import sys
14from threading import Lock, Condition, Event
15from typing import no_type_check
f6b5b4d7 16from functools import wraps
f67539c2
TL
17if sys.version_info >= (3, 3):
18 from threading import Timer
19else:
20 from threading import _Timer as Timer
21
22from typing import Tuple, Any, Callable, Optional, Dict, TYPE_CHECKING, TypeVar, List, Iterable, Generator, Generic, Iterator
23T = TypeVar('T')
11fdf7f2 24
f67539c2
TL
25if TYPE_CHECKING:
26 from mgr_module import MgrModule
27
28Module_T = TypeVar('Module_T', bound="MgrModule")
9f95a23c 29
11fdf7f2
TL
# Color indices 0..7; colorize() adds 30 to the index to build the escape code.
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, GRAY = range(8)

# Terminal escape sequences. The two COLOR_* templates expect an integer
# color code via %-formatting.
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
BOLD_SEQ = "\033[1m"
UNDERLINE_SEQ = "\033[4m"
46
eafe8130
TL
47logger = logging.getLogger(__name__)
48
11fdf7f2 49
f67539c2
TL
class CephfsConnectionException(Exception):
    """
    Error raised when a CephFS connection cannot be established or used.

    :param error_code: numeric error code, stored as ``errno``
    :param error_message: human readable description, stored as ``error_str``
    """

    def __init__(self, error_code: int, error_message: str):
        # Forward both values to Exception so that ``args`` is populated
        # (needed for a meaningful repr() and for pickling/copying).
        super().__init__(error_code, error_message)
        self.errno = error_code
        self.error_str = error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """
        :return: ``(errno, "", error_str)``; the middle element is always
                 the empty string
        """
        return self.errno, "", self.error_str

    def __str__(self) -> str:
        return "{0} ({1})".format(self.errno, self.error_str)
60
class RTimer(Timer):
    """
    recurring timer variant of Timer

    Calls ``function(*args, **kwargs)`` every ``interval`` seconds until
    cancel() is called. An exception raised by the function is logged and
    re-raised, which ends the timer thread.
    """
    @no_type_check
    def run(self):
        try:
            # Event.wait() returns True as soon as cancel() sets the event
            # and False when the interval elapsed. Testing its return value
            # ensures the function is not invoked once more after cancel().
            while not self.finished.wait(self.interval):
                self.function(*self.args, **self.kwargs)
        except Exception as e:
            logger.error("task exception: %s", e)
            raise
75
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    """
    Context manager that acquires `lock`, retrying every `timeout` seconds.

    Each attempt is logged at debug level. Once more than WARN_AFTER (30)
    seconds have passed without getting the lock, a single "possible
    deadlock" message is logged at info level; the retries continue.

    :param lock: the lock to acquire for the duration of the ``with`` body
    :param timeout: seconds to wait in each acquire attempt
    """
    start = time.time()
    WARN_AFTER = 30
    warned = False
    while True:
        logger.debug("locking {} with {} timeout".format(lock, timeout))
        if lock.acquire(timeout=timeout):
            logger.debug("locked {}".format(lock))
            try:
                yield
            finally:
                # release even when the body of the ``with`` block raised,
                # otherwise the lock would stay held forever
                lock.release()
            break
        now = time.time()
        if not warned and now - start > WARN_AFTER:
            logger.info("possible deadlock acquiring {}".format(lock))
            warned = True
92
93
class CephfsConnectionPool(object):
    """
    Pool of libcephfs connections keyed by filesystem name.

    All public methods take ``self.lock`` before touching ``connections``.
    A recurring timer task, started from the constructor, tears down
    connections that have been idle for CONNECTION_IDLE_INTERVAL seconds.
    """

    class Connection(object):
        """A single libcephfs handle plus its usage bookkeeping."""

        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            self.mgr = mgr
            self.fs_name = fs_name
            # number of handles handed out and not yet returned
            self.ops_in_progress = 0
            self.last_used = time.time()
            # id of the filesystem at creation time; compared later on to
            # detect a filesystem that was removed or recreated
            self.fs_id = self.get_fs_id()

        def get_fs_id(self) -> int:
            """
            Look up the id of `fs_name` in the current fs_map.

            :raises CephfsConnectionException: (-ENOENT) if no filesystem
                    with that name is present in the fs_map
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))

        def get_fs_handle(self) -> "cephfs.LibCephFS":
            """Hand out the handle and account for one more in-flight op."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify: Callable) -> None:
            """
            Return a handle; `notify` is called when no ops remain in flight.
            """
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            """
            Tear the connection down.

            :param waiter: when given, it is called repeatedly until no ops
                           are in flight before the connection is closed
            """
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self) -> bool:
            """
            :return: True if the filesystem still has the id that was
                     recorded when this connection object was created
            """
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                # (KeyboardInterrupt/SystemExit are deliberately not caught)
                pass
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout: float) -> bool:
            """
            :return: True if no op is in flight and the last handle was
                     handed out at least `timeout` seconds ago
            """
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self) -> None:
            """Create, configure, initialize and mount the libcephfs handle."""
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            logger.debug("CephFS initializing...")
            self.fs.init()
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self) -> None:
            """
            Shut the handle down and unregister the client addresses.

            Any exception is logged at debug level and re-raised.
            """
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                # fetch the addresses before shutdown() is called
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                logger.debug("disconnect: ({0})".format(e))
                raise

        def abort(self) -> None:
            """Abort the connection; used when the filesystem is not valid anymore."""
            assert self.fs
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.connections: Dict[str, CephfsConnectionPool.Connection] = {}
        self.lock = Lock()
        # shares self.lock: waiters in del_all_handles() are woken up from
        # put_fs_handle()
        self.cond = Condition(self.lock)
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self) -> None:
        """Timer task: close every connection that is currently idle."""
        with self.lock:
            logger.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                logger.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        """
        Return a handle for `fs_name`, connecting first if necessary.

        A pooled connection is reused as long as it is still valid;
        otherwise it is dropped and a new connection is made.

        :raises CephfsConnectionException: if libcephfs reports an error
        """
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        logger.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = CephfsConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise CephfsConnectionException(
                        -errno.ENOENT, "FS '{0}' not found".format(fs_name))
                raise CephfsConnectionException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name: str) -> None:
        """Return a handle previously obtained through get_fs_handle()."""
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                # notify_all() is the non-deprecated spelling of notifyAll()
                conn.put_fs_handle(notify=self.cond.notify_all)

    def _del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
        """
        Remove `fs_name` from the pool and close its connection.

        The caller must hold ``self.lock``.

        :param wait: wait on ``self.cond`` until no ops are in flight
        """
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=self.cond.wait if wait else None)

    def del_fs_handle(self, fs_name: str, wait: bool = False) -> None:
        """Locked variant of _del_fs_handle()."""
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self) -> None:
        """Close every pooled connection, waiting for pending ops of each."""
        with self.lock:
            # iterate over a copy of the keys: _del_fs_handle() pops entries
            for fs_name in list(self.connections.keys()):
                logger.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                logger.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since its
            # guarded on shutdown.
            assert len(self.connections) == 0
252
253
class CephfsClient(Generic[Module_T]):
    """
    Helper owning a CephfsConnectionPool and offering fs_map lookups.

    :param mgr: the mgr module whose ``get('fs_map')`` is queried
    """

    def __init__(self, mgr: Module_T):
        self.mgr = mgr
        self.stopping = Event()
        self.connection_pool = CephfsConnectionPool(self.mgr)

    def is_stopping(self) -> bool:
        """:return: True once shutdown() has been called"""
        return self.stopping.is_set()

    def shutdown(self) -> None:
        """Flag the client as stopping and close all pooled connections."""
        logger.info("shutting down")
        # first, note that we're shutting down
        self.stopping.set()
        # second, delete all libcephfs handles from connection pool
        self.connection_pool.del_all_handles()

    def get_fs(self, fs_name: str) -> Optional[Dict[str, Any]]:
        """
        :return: the entry of fs_map['filesystems'] whose mdsmap carries
                 `fs_name` (an fs_map record, not a libcephfs handle), or
                 None if there is no such filesystem
        """
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                return fs
        return None

    def get_mds_names(self, fs_name: str) -> List[str]:
        """
        :return: the 'name' of every entry in the filesystem's mdsmap 'info';
                 an empty list if the filesystem is unknown
        """
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        """
        :return: the mdsmap 'metadata_pool' value of the filesystem, or None
                 if the filesystem is unknown
        """
        fs = self.get_fs(fs_name)
        if fs:
            return fs['mdsmap']['metadata_pool']
        return None
288
289
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """
    Context manager giving shared access to a volume.

    The handle is taken from the client's connection pool and is handed
    back to the pool when the ``with`` block is left, whether or not the
    block raised.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :return: yields a fs handle (ceph filesystem handle)
    :raises CephfsConnectionException: (-ESHUTDOWN) if the client is stopping
    """
    if fsc.is_stopping():
        raise CephfsConnectionException(-errno.ESHUTDOWN,
                                        "shutdown in progress")

    handle = fsc.connection_pool.get_fs_handle(fs_name)
    try:
        yield handle
    finally:
        fsc.connection_pool.put_fs_handle(fs_name)
309
310
def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Wrap `msg` in the escape sequences that render it in `color`.

    :param msg: text to decorate
    :param color: color index (BLACK .. GRAY); 30 is added to form the code
    :param dark: use COLOR_DARK_SEQ instead of COLOR_SEQ
    :return: escape prefix + `msg` + RESET_SEQ
    """
    if dark:
        template = COLOR_DARK_SEQ
    else:
        template = COLOR_SEQ
    prefix = template % (30 + color)
    return prefix + msg + RESET_SEQ
317
318
def bold(msg: str) -> str:
    """
    Wrap `msg` in the escape sequences that render it bold.

    :return: BOLD_SEQ + `msg` + RESET_SEQ
    """
    return "".join((BOLD_SEQ, msg, RESET_SEQ))
324
325
def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a unit-less number so as to fit into `width` characters,
    substituting an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.

    The last character of the result is always the unit suffix (a blank if
    no scaling was needed). Values too large for the biggest unit ('E') are
    rendered with their complete integer part, so the result is then wider
    than `width` instead of being wrong. Negative values get no special
    treatment.

    :param n: the number to format
    :param width: desired width of the result, including the unit suffix
    :param colored: decorate the result with terminal escape sequences
    :param decimal: scale by 1000 (True) or by 1024 (False)
    """

    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    # One column is reserved for the suffix. Keep at least one column for
    # digits: with width <= 1 no number could ever fit and the search for a
    # unit below would never terminate.
    digits = max(width - 1, 1)
    max_unit = len(units) - 1

    unit = 0
    # stop at the largest known unit instead of running off the end of `units`
    while unit < max_unit and len("%s" % (int(n) // (factor ** unit))) > digits:
        unit += 1

    if unit > 0:
        scaled = "%f" % (n / (float(factor) ** unit))
        # never cut into the integer part: dropping integer digits would
        # silently display a different number
        keep = max(digits, scaled.index('.'))
        truncated_float = scaled[0:keep]
        if truncated_float[-1] == '.':
            # a dangling decimal point is replaced by left padding
            truncated_float = " " + truncated_float[0:-1]
    else:
        truncated_float = "%{wid}d".format(wid=digits) % n
    formatted = "%s%s" % (truncated_float, units[unit])

    if colored:
        if n == 0:
            color = BLACK, False
        else:
            color = YELLOW, False
        return bold(colorize(formatted[0:-1], color[0], color[1])) \
            + bold(colorize(formatted[-1], YELLOW, False))
    else:
        return formatted
357
358
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """
    Format a dimensionless number: format_units() with decimal scaling.
    """
    use_decimal = True
    return format_units(n, width, colored, use_decimal)
361
362
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """
    Format a byte quantity: format_units() with base 2 (1024) scaling.
    """
    use_decimal = False
    return format_units(n, width, colored, use_decimal)
81eedcae
TL
365
366
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """
    Combine all given dictionaries into a new one, left to right.

    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    A key that occurs more than once takes the value of the last dictionary:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :return: a new dictionary; the arguments are not modified
    """
    merged: Dict[T, Any] = {}
    for mapping in args:
        merged.update(mapping)
    return merged
494da23a
TL
382
383
def get_default_addr():
    # type: () -> str
    """
    Return the wildcard address to listen on: '::' if an IPv6 socket can be
    bound to the loopback address, '0.0.0.0' otherwise.

    The probe runs only once; its outcome is cached in the ``result``
    attribute of this function.
    """
    def is_ipv6_enabled() -> bool:
        try:
            probe = socket.socket(socket.AF_INET6)
            with contextlib.closing(probe):
                probe.bind(("::1", 0))
        except (AttributeError, socket.error):
            return False
        return True

    if not hasattr(get_default_addr, 'result'):
        if is_ipv6_enabled():
            address = '::'
        else:
            address = '0.0.0.0'
        get_default_addr.result = address  # type: ignore
        return address
    return get_default_addr.result  # type: ignore
401
eafe8130
TL
402
class ServerConfigException(Exception):
    """Raised by the certificate/key verification helpers of this module."""
405
9f95a23c 406
f67539c2
TL
def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Returns self-signed PEM certificates valid for 10 years.

    A 2048 bit RSA key pair is generated and the certificate is signed with
    it using sha512.

    The optional dname parameter provides complete control of the cert/key
    creation by supporting all valid RDNs via a dictionary. However, if dname
    is not provided the default O and CN settings will be applied.

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs

    """

    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']
    ten_years = 10 * 365 * 24 * 60 * 60  # in seconds

    # create a key pair
    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    # Create a "subject" object
    req = crypto.X509Req()
    subj = req.get_subject()

    if not dname:
        # nothing (or an empty dict) received, fall back to O and CN
        dname = {"O": organisation, "CN": common_name}
    elif any(field not in valid_RDN_list for field in dname):
        # dname received, but it contains something that is not a valid RDN
        raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))

    # populate the subject with the dname settings
    for rdn, value in dname.items():
        setattr(subj, rdn, value)

    # create a self-signed cert
    x509 = crypto.X509()
    x509.set_subject(req.get_subject())
    x509.set_serial_number(int(uuid4()))
    x509.gmtime_adj_notBefore(0)
    x509.gmtime_adj_notAfter(ten_years)
    # self-signed: the issuer is the subject itself
    x509.set_issuer(x509.get_subject())
    x509.set_pubkey(key_pair)
    x509.sign(key_pair, 'sha512')

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, x509)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair)

    return cert_pem.decode('utf-8'), key_pem.decode('utf-8')
465
466
def verify_cacrt_content(crt):
    # type: (str) -> None
    """
    Check that `crt` can be loaded as a PEM certificate.

    An expired certificate is only reported with a warning in the log.

    :param crt: certificate in PEM format
    :raises ServerConfigException: if the certificate cannot be loaded
    """
    from OpenSSL import crypto
    try:
        certificate = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
        expired = certificate.has_expired()
        if expired:
            logger.warning('Certificate has expired: {}'.format(crt))
    except (ValueError, crypto.Error) as e:
        message = 'Invalid certificate: {}'.format(str(e))
        raise ServerConfigException(message)
477
478
def verify_cacrt(cert_fname):
    # type: (str) -> None
    """Basic validation of a ca cert

    :param cert_fname: path of the certificate file
    :raises ServerConfigException: if no path is given, the file does not
            exist, or its content is not a valid certificate
    """

    if not cert_fname:
        raise ServerConfigException("CA cert not configured")

    cert_exists = os.path.isfile(cert_fname)
    if not cert_exists:
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    try:
        with open(cert_fname) as cert_file:
            verify_cacrt_content(cert_file.read())
    except ValueError as e:
        message = 'Invalid certificate {}: {}'.format(cert_fname, str(e))
        raise ServerConfigException(message)
494
495
9f95a23c
TL
def verify_tls(crt, key):
    # type: (str, str) -> None
    """
    Basic checks for a TLS certificate and private key given as PEM text.

    - the certificate must be loadable (an expired one is only logged)
    - the private key must be loadable and consistent
    - certificate and private key are loaded into a TLS context to check
      that they match up

    :param crt: certificate in PEM format
    :param key: private key in PEM format

    :raises ServerConfigException: if the certificate or the key is invalid,
            or if the TLS context rejects the pair
    """
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL
    try:
        _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
        _key.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(e)))
    try:
        _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(e))
        )

    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate(_crt)
        context.use_privatekey(_key)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key and certificate do not match up: {}'.format(str(e)))
    except SSL.Error as e:
        # The SSL.Context methods report failures (e.g. a key that does not
        # belong to the certificate) as SSL.Error, which is unrelated to
        # crypto.Error. Convert it so callers only have to deal with
        # ServerConfigException.
        raise ServerConfigException(
            'Invalid cert/key pair: {}'.format(str(e)))
522
523
def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: Name of the certificate private key file

    :raises ServerConfigException: An error with a message

    """

    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    try:
        with open(pkey_fname) as f:
            pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
            pkey.check()
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    try:
        context = SSL.Context(SSL.TLSv1_METHOD)
        context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
        context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
        context.check_privatekey()
    except crypto.Error as e:
        logger.warning(
            'Private key {} and certificate {} do not match up: {}'.format(
                pkey_fname, cert_fname, str(e)))
    except SSL.Error as e:
        # The SSL.Context methods report failures (e.g. a key that does not
        # belong to the certificate) as SSL.Error, which is unrelated to
        # crypto.Error. Convert it to the exception type documented above.
        raise ServerConfigException(
            'Invalid cert/key pair {}, {}: {}'.format(
                cert_fname, pkey_fname, str(e)))
9f95a23c 567
f67539c2
TL
568
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Pick the newest rate out of a list of rates

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The value of the last entry, or 0.0 if `rates` is None or empty
    :rtype: float

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    if rates:
        _, newest_value = rates[-1][0], rates[-1][1]
        return newest_value
    return 0.0
590
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Rates from time series data

    The data is filtered first (samples sharing a timestamp are dropped),
    then the derivative of each pair of neighbouring samples is computed and
    reported with the timestamp of the later sample.

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    []
    >>> get_time_series_rates([[0, 1], [1, 3]])
    [(1, 2.0)]
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    [(1, 2.0)]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    data = _filter_time_series(data)
    if not data:
        return []

    rates = []
    for previous, current in _pairwise(data):
        if previous is None:
            rate = 0.0
        else:
            rate = _derivative(previous, current)
        rates.append((current[0], rate))
    return rates
618
f67539c2
TL
619
620def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
9f95a23c
TL
621 """ Filters time series data
622
623 Filters out samples with the same timestamp in given time series data.
624 It also enforces the list to contain at least two samples.
625
626 All filtered values will be shown in the debug log. If values were filtered it's a bug in the
627 time series data collector, please report it.
628
629 :param data: Time series data [time in seconds, value]
630 :type data: list[tuple[int, float]]
631
632 :return: Filtered time series data [time in seconds, value]
633 :rtype: list[tuple[int, float]]
634
635 >>> logger.debug = lambda s,x,y: print(s % (x,y))
636 >>> _filter_time_series([])
637 []
638 >>> _filter_time_series([[1, 42]])
639 []
640 >>> _filter_time_series([[10, 2], [10, 3]])
641 Duplicate timestamp in time series data: [10, 2], [10, 3]
642 []
643 >>> _filter_time_series([[0, 1], [1, 2]])
644 [[0, 1], [1, 2]]
645 >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
646 Duplicate timestamp in time series data: [0, 2], [0, 3]
647 Duplicate timestamp in time series data: [0, 3], [0, 1]
648 Duplicate timestamp in time series data: [1, 2], [1, 3]
649 [[0, 1], [1, 3]]
650 >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
651 [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
652 """
653 filtered = []
654 for i in range(len(data) - 1):
655 if data[i][0] == data[i + 1][0]: # Same timestamp
656 logger.debug("Duplicate timestamp in time series data: %s, %s", data[i], data[i + 1])
657 continue
658 filtered.append(data[i])
659 if not filtered:
660 return []
661 filtered.append(data[-1])
662 return filtered
663
f67539c2
TL
664
665def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
9f95a23c
TL
666 """ Derivative between two time series data points
667
668 :param p1: Time series data [time in seconds, value]
669 :type p1: tuple[int, float]
670 :param p2: Time series data [time in seconds, value]
671 :type p2: tuple[int, float]
672
673 :return: Derivative between both points
674 :rtype: float
675
676 >>> _derivative([0, 0], [2, 1])
677 0.5
678 >>> _derivative([0, 1], [2, 0])
679 -0.5
680 >>> _derivative([0, 0], [3, 1])
681 0.3333333333333333
682 """
683 return (p2[1] - p1[1]) / float(p2[0] - p1[0])
684
f67539c2
TL
685
686def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
9f95a23c
TL
687 it = iter(iterable)
688 a = next(it, None)
689
690 for b in it:
691 yield (a, b)
692 a = b
693
f67539c2
TL
694
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """
    Render a timedelta as a short string with one number and one unit.

    The unit is chosen by magnitude: seconds below 2 minutes, minutes below
    2 hours, hours below 2 days, days below 2 weeks, weeks below 12 weeks,
    months (of 30 days) below 2 years and years (of 365 days) above that.
    The number is always rounded down.

    :param n: the time span to render
    :return: e.g. '90s', '30h', '3w'
    """
    # timedelta.seconds only holds the part that is left after whole days
    # have been taken out (0..86399), so it must not be used for spans of a
    # day or more, nor for negative spans; total_seconds() covers it all.
    seconds = int(n.total_seconds())
    if n < datetime.timedelta(seconds=120):
        return str(seconds) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(seconds // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        return str(seconds // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(n.days) + 'd'
    if n < datetime.timedelta(days=7*12):
        return str(n.days // 7) + 'w'
    if n < datetime.timedelta(days=365*2):
        return str(n.days // 30) + 'M'
    return str(n.days // 365) + 'y'
f6b5b4d7
TL
709
710
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator factory for methods of the Module class: the decorated method
    logs (debug level, through ``self.log``) when it starts and how long it
    ran.

    :param skip_attribute: unless set, the duration of the latest call is
                           also stored in the ``_execution_duration``
                           attribute of the decorated function
    """
    def outer(f: Callable[..., T]) -> Callable[..., T]:
        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> T:
            # the first positional argument is the instance the method is
            # bound to; it provides the logger
            instance = args[0]
            started = time.time()
            instance.log.debug('Starting method {}.'.format(f.__name__))
            outcome = f(*args, **kwargs)
            elapsed = time.time() - started
            if not skip_attribute:
                wrapper._execution_duration = elapsed  # type: ignore
            instance.log.debug('Method {} ran {:.3f} seconds.'.format(f.__name__, elapsed))
            return outcome
        return wrapper
    return outer
729 return outer