3 if 'UNITTEST' in os
.environ
:
14 from threading
import Lock
, Condition
, Event
15 from typing
import no_type_check
17 from functools
import wraps
18 if sys
.version_info
>= (3, 3):
19 from threading
import Timer
21 from threading
import _Timer
as Timer
23 from typing
import Tuple
, Any
, Callable
, Optional
, Dict
, TYPE_CHECKING
, TypeVar
, List
, Iterable
, Generator
, Generic
, Iterator
25 from ceph
.deployment
.utils
import wrap_ipv6
30 from mgr_module
import MgrModule
32 Module_T
= TypeVar('Module_T', bound
="MgrModule")
46 COLOR_SEQ
= "\033[1;%dm"
47 COLOR_DARK_SEQ
= "\033[0;%dm"
49 UNDERLINE_SEQ
= "\033[4m"
51 logger
= logging
.getLogger(__name__
)
class CephfsConnectionException(Exception):
    """Error raised when a cephfs connection cannot be established or used.

    Carries an errno-style code (conventionally negative) together with a
    human-readable message, matching the mgr command-reply convention.
    """

    def __init__(self, error_code: int, error_message: str):
        # Stored under the names the rest of the module reads back.
        self.errno = error_code
        self.error_str = error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """Return the (retcode, stdout, stderr) triple used for mgr replies."""
        return self.errno, "", self.error_str

    def __str__(self) -> str:
        template = "{0} ({1})"
        return template.format(self.errno, self.error_str)
67 recurring timer variant of Timer
72 while not self
.finished
.is_set():
73 self
.finished
.wait(self
.interval
)
74 self
.function(*self
.args
, **self
.kwargs
)
76 except Exception as e
:
77 logger
.error("task exception: %s", e
)
80 @contextlib.contextmanager
81 def lock_timeout_log(lock
: Lock
, timeout
: int = 5) -> Iterator
[None]:
86 logger
.debug("locking {} with {} timeout".format(lock
, timeout
))
87 if lock
.acquire(timeout
=timeout
):
88 logger
.debug("locked {}".format(lock
))
93 if not warned
and now
- start
> WARN_AFTER
:
94 logger
.info("possible deadlock acquiring {}".format(lock
))
98 class CephfsConnectionPool(object):
99 class Connection(object):
100 def __init__(self
, mgr
: Module_T
, fs_name
: str):
101 self
.fs
: Optional
["cephfs.LibCephFS"] = None
103 self
.fs_name
= fs_name
104 self
.ops_in_progress
= 0
105 self
.last_used
= time
.time()
106 self
.fs_id
= self
.get_fs_id()
108 def get_fs_id(self
) -> int:
109 fs_map
= self
.mgr
.get('fs_map')
110 for fs
in fs_map
['filesystems']:
111 if fs
['mdsmap']['fs_name'] == self
.fs_name
:
113 raise CephfsConnectionException(
114 -errno
.ENOENT
, "FS '{0}' not found".format(self
.fs_name
))
116 def get_fs_handle(self
) -> "cephfs.LibCephFS":
117 self
.last_used
= time
.time()
118 self
.ops_in_progress
+= 1
121 def put_fs_handle(self
, notify
: Callable
) -> None:
122 assert self
.ops_in_progress
> 0
123 self
.ops_in_progress
-= 1
124 if self
.ops_in_progress
== 0:
127 def del_fs_handle(self
, waiter
: Optional
[Callable
]) -> None:
129 while self
.ops_in_progress
!= 0:
131 if self
.is_connection_valid():
136 def is_connection_valid(self
) -> bool:
139 fs_id
= self
.get_fs_id()
141 # the filesystem does not exist now -- connection is not valid.
143 logger
.debug("self.fs_id={0}, fs_id={1}".format(self
.fs_id
, fs_id
))
144 return self
.fs_id
== fs_id
def is_connection_idle(self, timeout: float) -> bool:
    """Return True when this connection has no in-flight operations and has
    been unused for at least *timeout* seconds."""
    if self.ops_in_progress != 0:
        return False
    idle_for = time.time() - self.last_used
    return idle_for >= timeout
149 def connect(self
) -> None:
150 assert self
.ops_in_progress
== 0
151 logger
.debug("Connecting to cephfs '{0}'".format(self
.fs_name
))
152 self
.fs
= cephfs
.LibCephFS(rados_inst
=self
.mgr
.rados
)
153 logger
.debug("Setting user ID and group ID of CephFS mount as root...")
154 self
.fs
.conf_set("client_mount_uid", "0")
155 self
.fs
.conf_set("client_mount_gid", "0")
156 self
.fs
.conf_set("client_check_pool_perm", "false")
157 self
.fs
.conf_set("client_quota", "false")
158 logger
.debug("CephFS initializing...")
160 logger
.debug("CephFS mounting...")
161 self
.fs
.mount(filesystem_name
=self
.fs_name
.encode('utf-8'))
162 logger
.debug("Connection to cephfs '{0}' complete".format(self
.fs_name
))
163 self
.mgr
._ceph
_register
_client
(self
.fs
.get_addrs())
165 def disconnect(self
) -> None:
168 assert self
.ops_in_progress
== 0
169 logger
.info("disconnecting from cephfs '{0}'".format(self
.fs_name
))
170 addrs
= self
.fs
.get_addrs()
172 self
.mgr
._ceph
_unregister
_client
(addrs
)
174 except Exception as e
:
175 logger
.debug("disconnect: ({0})".format(e
))
178 def abort(self
) -> None:
180 assert self
.ops_in_progress
== 0
181 logger
.info("aborting connection from cephfs '{0}'".format(self
.fs_name
))
183 logger
.info("abort done from cephfs '{0}'".format(self
.fs_name
))
186 # TODO: make this configurable
187 TIMER_TASK_RUN_INTERVAL
= 30.0 # seconds
188 CONNECTION_IDLE_INTERVAL
= 60.0 # seconds
189 MAX_CONCURRENT_CONNECTIONS
= 5 # max number of concurrent connections per volume
191 def __init__(self
, mgr
: Module_T
):
193 self
.connections
: Dict
[str, List
[CephfsConnectionPool
.Connection
]] = {}
195 self
.cond
= Condition(self
.lock
)
196 self
.timer_task
= RTimer(CephfsConnectionPool
.TIMER_TASK_RUN_INTERVAL
,
197 self
.cleanup_connections
)
198 self
.timer_task
.start()
200 def cleanup_connections(self
) -> None:
202 logger
.info("scanning for idle connections..")
204 for fs_name
, connections
in self
.connections
.items():
205 logger
.debug(f
'fs_name ({fs_name}) connections ({connections})')
206 for connection
in connections
:
207 if connection
.is_connection_idle(CephfsConnectionPool
.CONNECTION_IDLE_INTERVAL
):
208 idle_conns
.append((fs_name
, connection
))
209 logger
.info(f
'cleaning up connections: {idle_conns}')
210 for idle_conn
in idle_conns
:
211 self
._del
_connection
(idle_conn
[0], idle_conn
[1])
213 def get_fs_handle(self
, fs_name
: str) -> "cephfs.LibCephFS":
217 shared_connection
= None
218 connections
= self
.connections
.setdefault(fs_name
, [])
219 logger
.debug(f
'[get] volume: ({fs_name}) connection: ({connections})')
221 min_shared
= connections
[0].ops_in_progress
222 shared_connection
= connections
[0]
223 for connection
in list(connections
):
224 logger
.debug(f
'[get] connection: {connection} usage: {connection.ops_in_progress}')
225 if connection
.ops_in_progress
== 0:
226 if connection
.is_connection_valid():
227 logger
.debug(f
'[get] connection ({connection}) can be reused')
228 return connection
.get_fs_handle()
230 # filesystem id changed beneath us (or the filesystem does not exist).
231 # this is possible if the filesystem got removed (and recreated with
232 # same name) via "ceph fs rm/new" mon command.
233 logger
.warning(f
'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
234 # note -- this will mutate @connections too
235 self
._del
_connection
(fs_name
, connection
)
237 if connection
.ops_in_progress
< min_shared
:
238 min_shared
= connection
.ops_in_progress
239 shared_connection
= connection
240 # when we end up here, there are no "free" connections. so either spin up a new
242 if len(connections
) < CephfsConnectionPool
.MAX_CONCURRENT_CONNECTIONS
:
243 logger
.debug('[get] spawning new connection since no connection is unused and we still have room for more')
244 connection
= CephfsConnectionPool
.Connection(self
.mgr
, fs_name
)
246 self
.connections
[fs_name
].append(connection
)
247 return connection
.get_fs_handle()
249 assert shared_connection
is not None
250 logger
.debug(f
'[get] using shared connection ({shared_connection})')
251 return shared_connection
.get_fs_handle()
252 except cephfs
.Error
as e
:
253 # try to provide a better error string if possible
254 if e
.args
[0] == errno
.ENOENT
:
255 raise CephfsConnectionException(
256 -errno
.ENOENT
, "FS '{0}' not found".format(fs_name
))
257 raise CephfsConnectionException(-e
.args
[0], e
.args
[1])
259 def put_fs_handle(self
, fs_name
: str, fs_handle
: cephfs
.LibCephFS
) -> None:
261 connections
= self
.connections
.get(fs_name
, [])
262 for connection
in connections
:
263 if connection
.fs
== fs_handle
:
264 logger
.debug(f
'[put] connection: {connection} usage: {connection.ops_in_progress}')
265 connection
.put_fs_handle(notify
=lambda: self
.cond
.notifyAll())
def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
    # Remove the connection from the per-volume list first so no new user
    # can pick it up, then tear down its libcephfs handle. When *wait* is
    # set, the teardown blocks on the pool condition variable until the
    # connection's in-flight operations drain.
    self.connections[fs_name].remove(connection)
    waiter = (lambda: self.cond.wait()) if wait else None
    connection.del_fs_handle(waiter=waiter)
def _del_connections(self, fs_name: str, wait: bool = False) -> None:
    """Tear down every pooled connection for *fs_name*.

    Iterates over a snapshot copy because _del_connection mutates the
    underlying list while we iterate.
    """
    current = self.connections.get(fs_name, [])
    for conn in list(current):
        self._del_connection(fs_name, conn, wait)
275 def del_connections(self
, fs_name
: str, wait
: bool = False) -> None:
277 self
._del
_connections
(fs_name
, wait
)
279 def del_all_connections(self
) -> None:
281 for fs_name
in list(self
.connections
.keys()):
282 logger
.info("waiting for pending ops for '{}'".format(fs_name
))
283 self
._del
_connections
(fs_name
, wait
=True)
284 logger
.info("pending ops completed for '{}'".format(fs_name
))
285 # no new connections should have been initialized since its
286 # guarded on shutdown.
287 assert len(self
.connections
) == 0
290 class CephfsClient(Generic
[Module_T
]):
291 def __init__(self
, mgr
: Module_T
):
293 self
.stopping
= Event()
294 self
.connection_pool
= CephfsConnectionPool(self
.mgr
)
def is_stopping(self) -> bool:
    """Report whether a shutdown has been requested on this client."""
    stop_requested = self.stopping.is_set()
    return stop_requested
299 def shutdown(self
) -> None:
300 logger
.info("shutting down")
301 # first, note that we're shutting down
303 # second, delete all libcephfs handles from connection pool
304 self
.connection_pool
.del_all_connections()
306 def get_fs(self
, fs_name
: str) -> Optional
["cephfs.LibCephFS"]:
307 fs_map
= self
.mgr
.get('fs_map')
308 for fs
in fs_map
['filesystems']:
309 if fs
['mdsmap']['fs_name'] == fs_name
:
313 def get_mds_names(self
, fs_name
: str) -> List
[str]:
314 fs
= self
.get_fs(fs_name
)
317 return [mds
['name'] for mds
in fs
['mdsmap']['info'].values()]
319 def get_metadata_pool(self
, fs_name
: str) -> Optional
[str]:
320 fs
= self
.get_fs(fs_name
)
322 return fs
['mdsmap']['metadata_pool']
325 def get_all_filesystems(self
) -> List
[str]:
326 fs_list
: List
[str] = []
327 fs_map
= self
.mgr
.get('fs_map')
328 if fs_map
['filesystems']:
329 for fs
in fs_map
['filesystems']:
330 fs_list
.append(fs
['mdsmap']['fs_name'])
335 @contextlib.contextmanager
336 def open_filesystem(fsc
: CephfsClient
, fs_name
: str) -> Generator
["cephfs.LibCephFS", None, None]:
338 Open a volume with shared access.
339 This API is to be used as a context manager.
341 :param fsc: cephfs client instance
342 :param fs_name: fs name
343 :return: yields a fs handle (ceph filesystem handle)
345 if fsc
.is_stopping():
346 raise CephfsConnectionException(-errno
.ESHUTDOWN
,
347 "shutdown in progress")
349 fs_handle
= fsc
.connection_pool
.get_fs_handle(fs_name
)
353 fsc
.connection_pool
.put_fs_handle(fs_name
, fs_handle
)
356 def colorize(msg
: str, color
: int, dark
: bool = False) -> str:
358 Decorate `msg` with escape sequences to give the requested color
360 return (COLOR_DARK_SEQ
if dark
else COLOR_SEQ
) % (30 + color
) \
def bold(msg: str) -> str:
    """
    Decorate `msg` with escape sequences to make it appear bold
    """
    return "".join((BOLD_SEQ, msg, RESET_SEQ))
371 def format_units(n
: int, width
: int, colored
: bool, decimal
: bool) -> str:
373 Format a number without units, so as to fit into `width` characters, substituting
374 an appropriate unit suffix.
376 Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.
379 factor
= 1000 if decimal
else 1024
380 units
= [' ', 'k', 'M', 'G', 'T', 'P', 'E']
382 while len("%s" % (int(n
) // (factor
**unit
))) > width
- 1:
386 truncated_float
= ("%f" % (n
/ (float(factor
) ** unit
)))[0:width
- 1]
387 if truncated_float
[-1] == '.':
388 truncated_float
= " " + truncated_float
[0:-1]
390 truncated_float
= "%{wid}d".format(wid
=width
- 1) % n
391 formatted
= "%s%s" % (truncated_float
, units
[unit
])
397 color
= YELLOW
, False
398 return bold(colorize(formatted
[0:-1], color
[0], color
[1])) \
399 + bold(colorize(formatted
[-1], YELLOW
, False))
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """Format a dimensionless quantity with decimal (base-1000) unit prefixes."""
    return format_units(n, width, colored, True)
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """Format a byte count with binary (base-1024) unit prefixes."""
    return format_units(n, width, colored, False)
412 def merge_dicts(*args
: Dict
[T
, Any
]) -> Dict
[T
, Any
]:
414 >>> merge_dicts({1:2}, {3:4})
417 You can also overwrite keys:
418 >>> merge_dicts({1:2}, {1:4})
421 :rtype: dict[str, Any]
429 def get_default_addr():
431 def is_ipv6_enabled() -> bool:
433 sock
= socket
.socket(socket
.AF_INET6
)
434 with contextlib
.closing(sock
):
435 sock
.bind(("::1", 0))
437 except (AttributeError, socket
.error
):
441 return get_default_addr
.result
# type: ignore
442 except AttributeError:
443 result
= '::' if is_ipv6_enabled() else '0.0.0.0'
444 get_default_addr
.result
= result
# type: ignore
448 def build_url(host
: str, scheme
: Optional
[str] = None, port
: Optional
[int] = None, path
: str = '') -> str:
450 Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
453 >>> build_url('example.com', 'https', 443)
454 'https://example.com:443'
456 >>> build_url(host='example.com', port=443)
459 >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
460 'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'
462 >>> build_url('example.com', 'https', 443, path='/metrics')
463 'https://example.com:443/metrics'
466 :param scheme: The scheme, e.g. http, https or ftp.
468 :param host: Consisting of either a registered name (including but not limited to
469 a hostname) or an IP address.
474 netloc
= wrap_ipv6(host
)
476 netloc
+= ':{}'.format(port
)
477 pr
= urllib
.parse
.ParseResult(
478 scheme
=scheme
if scheme
else '',
487 class ServerConfigException(Exception):
491 def create_self_signed_cert(organisation
: str = 'Ceph',
492 common_name
: str = 'mgr',
493 dname
: Optional
[Dict
[str, str]] = None) -> Tuple
[str, str]:
494 """Returns self-signed PEM certificates valid for 10 years.
496 The optional dname parameter provides complete control of the cert/key
497 creation by supporting all valid RDNs via a dictionary. However, if dname
498 is not provided the default O and CN settings will be applied.
500 :param organisation: String representing the Organisation(O) RDN (default='Ceph')
501 :param common_name: String representing the Common Name(CN) RDN (default='mgr')
502 :param dname: Optional dictionary containing RDNs to use for crt/key generation
504 :return: ssl crt and key in utf-8 format
506 :raises ValueError: if the dname parameter received contains invalid RDNs
510 from OpenSSL
import crypto
511 from uuid
import uuid4
513 # RDN = Relative Distinguished Name
514 valid_RDN_list
= ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']
518 pkey
.generate_key(crypto
.TYPE_RSA
, 2048)
520 # Create a "subject" object
521 req
= crypto
.X509Req()
522 subj
= req
.get_subject()
525 # dname received, so check it contains valid RDNs
526 if not all(field
in valid_RDN_list
for field
in dname
):
527 raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list
)))
529 dname
= {"O": organisation
, "CN": common_name
}
531 # populate the subject with the dname settings
532 for k
, v
in dname
.items():
535 # create a self-signed cert
537 cert
.set_subject(req
.get_subject())
538 cert
.set_serial_number(int(uuid4()))
539 cert
.gmtime_adj_notBefore(0)
540 cert
.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
541 cert
.set_issuer(cert
.get_subject())
542 cert
.set_pubkey(pkey
)
543 cert
.sign(pkey
, 'sha512')
545 cert
= crypto
.dump_certificate(crypto
.FILETYPE_PEM
, cert
)
546 pkey
= crypto
.dump_privatekey(crypto
.FILETYPE_PEM
, pkey
)
548 return cert
.decode('utf-8'), pkey
.decode('utf-8')
551 def verify_cacrt_content(crt
):
552 # type: (str) -> None
553 from OpenSSL
import crypto
555 x509
= crypto
.load_certificate(crypto
.FILETYPE_PEM
, crt
)
556 if x509
.has_expired():
557 logger
.warning('Certificate has expired: {}'.format(crt
))
558 except (ValueError, crypto
.Error
) as e
:
559 raise ServerConfigException(
560 'Invalid certificate: {}'.format(str(e
)))
563 def verify_cacrt(cert_fname
):
564 # type: (str) -> None
565 """Basic validation of a ca cert"""
568 raise ServerConfigException("CA cert not configured")
569 if not os
.path
.isfile(cert_fname
):
570 raise ServerConfigException("Certificate {} does not exist".format(cert_fname
))
573 with
open(cert_fname
) as f
:
574 verify_cacrt_content(f
.read())
575 except ValueError as e
:
576 raise ServerConfigException(
577 'Invalid certificate {}: {}'.format(cert_fname
, str(e
)))
580 def verify_tls(crt
, key
):
581 # type: (str, str) -> None
582 verify_cacrt_content(crt
)
584 from OpenSSL
import crypto
, SSL
586 _key
= crypto
.load_privatekey(crypto
.FILETYPE_PEM
, key
)
588 except (ValueError, crypto
.Error
) as e
:
589 raise ServerConfigException(
590 'Invalid private key: {}'.format(str(e
)))
592 _crt
= crypto
.load_certificate(crypto
.FILETYPE_PEM
, crt
)
593 except ValueError as e
:
594 raise ServerConfigException(
595 'Invalid certificate key: {}'.format(str(e
))
599 context
= SSL
.Context(SSL
.TLSv1_METHOD
)
600 context
.use_certificate(_crt
)
601 context
.use_privatekey(_key
)
602 context
.check_privatekey()
603 except crypto
.Error
as e
:
605 'Private key and certificate do not match up: {}'.format(str(e
)))
608 def verify_tls_files(cert_fname
, pkey_fname
):
609 # type: (str, str) -> None
610 """Basic checks for TLS certificate and key files
612 Do some validations to the private key and certificate:
613 - Check the type and format
614 - Check the certificate expiration date
615 - Check the consistency of the private key
616 - Check that the private key and certificate match up
618 :param cert_fname: Name of the certificate file
619 :param pkey_fname: name of the certificate public key file
621 :raises ServerConfigException: An error with a message
625 if not cert_fname
or not pkey_fname
:
626 raise ServerConfigException('no certificate configured')
628 verify_cacrt(cert_fname
)
630 if not os
.path
.isfile(pkey_fname
):
631 raise ServerConfigException('private key %s does not exist' % pkey_fname
)
633 from OpenSSL
import crypto
, SSL
636 with
open(pkey_fname
) as f
:
637 pkey
= crypto
.load_privatekey(crypto
.FILETYPE_PEM
, f
.read())
639 except (ValueError, crypto
.Error
) as e
:
640 raise ServerConfigException(
641 'Invalid private key {}: {}'.format(pkey_fname
, str(e
)))
643 context
= SSL
.Context(SSL
.TLSv1_METHOD
)
644 context
.use_certificate_file(cert_fname
, crypto
.FILETYPE_PEM
)
645 context
.use_privatekey_file(pkey_fname
, crypto
.FILETYPE_PEM
)
646 context
.check_privatekey()
647 except crypto
.Error
as e
:
649 'Private key {} and certificate {} do not match up: {}'.format(
650 pkey_fname
, cert_fname
, str(e
)))
653 def get_most_recent_rate(rates
: Optional
[List
[Tuple
[float, float]]]) -> float:
654 """ Get most recent rate from rates
656 :param rates: The derivative between all time series data points [time in seconds, value]
657 :type rates: list[tuple[int, float]]
659 :return: The last derivative or 0.0 if none exists
662 >>> get_most_recent_rate(None)
664 >>> get_most_recent_rate([])
666 >>> get_most_recent_rate([(1, -2.0)])
668 >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
675 def get_time_series_rates(data
: List
[Tuple
[float, float]]) -> List
[Tuple
[float, float]]:
676 """ Rates from time series data
678 :param data: Time series data [time in seconds, value]
679 :type data: list[tuple[int, float]]
681 :return: The derivative between all time series data points [time in seconds, value]
682 :rtype: list[tuple[int, float]]
684 >>> logger.debug = lambda s,x,y: print(s % (x,y))
685 >>> get_time_series_rates([])
687 >>> get_time_series_rates([[0, 1], [1, 3]])
689 >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
690 Duplicate timestamp in time series data: [0, 2], [0, 3]
691 Duplicate timestamp in time series data: [0, 3], [0, 1]
692 Duplicate timestamp in time series data: [1, 2], [1, 3]
694 >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
695 [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
697 data
= _filter_time_series(data
)
700 return [(data2
[0], _derivative(data1
, data2
) if data1
is not None else 0.0) for data1
, data2
in
704 def _filter_time_series(data
: List
[Tuple
[float, float]]) -> List
[Tuple
[float, float]]:
705 """ Filters time series data
707 Filters out samples with the same timestamp in given time series data.
708 It also enforces the list to contain at least two samples.
710 All filtered values will be shown in the debug log. If values were filtered it's a bug in the
711 time series data collector, please report it.
713 :param data: Time series data [time in seconds, value]
714 :type data: list[tuple[int, float]]
716 :return: Filtered time series data [time in seconds, value]
717 :rtype: list[tuple[int, float]]
719 >>> logger.debug = lambda s,x,y: print(s % (x,y))
720 >>> _filter_time_series([])
722 >>> _filter_time_series([[1, 42]])
724 >>> _filter_time_series([[10, 2], [10, 3]])
725 Duplicate timestamp in time series data: [10, 2], [10, 3]
727 >>> _filter_time_series([[0, 1], [1, 2]])
729 >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
730 Duplicate timestamp in time series data: [0, 2], [0, 3]
731 Duplicate timestamp in time series data: [0, 3], [0, 1]
732 Duplicate timestamp in time series data: [1, 2], [1, 3]
734 >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
735 [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
738 for i
in range(len(data
) - 1):
739 if data
[i
][0] == data
[i
+ 1][0]: # Same timestamp
740 logger
.debug("Duplicate timestamp in time series data: %s, %s", data
[i
], data
[i
+ 1])
742 filtered
.append(data
[i
])
745 filtered
.append(data
[-1])
def _derivative(p1: Tuple[float, float], p2: Tuple[float, float]) -> float:
    """ Derivative between two time series data points

    :param p1: Time series data [time in seconds, value]
    :type p1: tuple[int, float]
    :param p2: Time series data [time in seconds, value]
    :type p2: tuple[int, float]

    :return: Derivative between both points

    >>> _derivative([0, 0], [2, 1])
    0.5
    >>> _derivative([0, 1], [2, 0])
    -0.5
    >>> _derivative([0, 0], [3, 1])
    0.3333333333333333
    """
    rise = p2[1] - p1[1]
    run = float(p2[0] - p1[0])
    return rise / run
770 def _pairwise(iterable
: Iterable
[T
]) -> Generator
[Tuple
[Optional
[T
], T
], None, None]:
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """Render a timedelta as a short human-readable string (e.g. '42s', '3h', '2w').

    Picks the coarsest unit that keeps the number small: seconds below two
    minutes, minutes below two hours, hours below two days, then days,
    weeks, months (30 days) and years (365 days).

    :param n: the duration to render
    :return: value plus single-character unit suffix
    """
    # Bug fix: the original used n.seconds in the s/m/h branches, which
    # ignores the .days component of the timedelta — a 30-hour span rendered
    # as '6h'. total_seconds() accounts for the whole duration.
    total = int(n.total_seconds())
    if n < datetime.timedelta(seconds=120):
        return str(total) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(total // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        return str(total // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(n.days) + 'd'
    if n < datetime.timedelta(days=7 * 12):
        return str(n.days // 7) + 'w'
    if n < datetime.timedelta(days=365 * 2):
        return str(n.days // 30) + 'M'
    return str(n.days // 365) + 'y'
795 def profile_method(skip_attribute
: bool = False) -> Callable
[[Callable
[..., T
]], Callable
[..., T
]]:
797 Decorator for methods of the Module class. Logs the name of the given
798 function f with the time it takes to execute it.
800 def outer(f
: Callable
[..., T
]) -> Callable
[..., T
]:
802 def wrapper(*args
: Any
, **kwargs
: Any
) -> T
:
805 self
.log
.debug('Starting method {}.'.format(f
.__name
__))
806 result
= f(*args
, **kwargs
)
807 duration
= time
.time() - t
808 if not skip_attribute
:
809 wrapper
._execution
_duration
= duration
# type: ignore
810 self
.log
.debug('Method {} ran {:.3f} seconds.'.format(f
.__name
__, duration
))