3 if 'UNITTEST' in os
.environ
:
14 from threading
import Lock
, Condition
, Event
15 from typing
import no_type_check
17 from functools
import wraps
18 if sys
.version_info
>= (3, 3):
19 from threading
import Timer
21 from threading
import _Timer
as Timer
23 from typing
import Tuple
, Any
, Callable
, Optional
, Dict
, TYPE_CHECKING
, TypeVar
, List
, Iterable
, Generator
, Generic
, Iterator
25 from ceph
.deployment
.utils
import wrap_ipv6
30 from mgr_module
import MgrModule
# Type variable bound to MgrModule so helpers can be generic over the
# concrete manager-module subclass.
Module_T = TypeVar('Module_T', bound="MgrModule")

# ANSI terminal escape sequences used by colorize()/bold() below.
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
UNDERLINE_SEQ = "\033[4m"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class CephfsConnectionException(Exception):
    """Error raised for CephFS connection failures.

    Carries a numeric error code (``errno``) and a human-readable
    message (``error_str``).
    """

    def __init__(self, error_code: int, error_message: str):
        self.errno = error_code
        self.error_str = error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        # Middle element is intentionally empty; callers expect a
        # (code, "", message) 3-tuple.
        return (self.errno, "", self.error_str)

    def __str__(self) -> str:
        return f"{self.errno} ({self.error_str})"
    # recurring timer variant of Timer
    # NOTE(review): the enclosing `class RTimer` / `def run` headers and the
    # opening `try:` are elided from this excerpt.
        while not self.finished.is_set():
            # Sleep for `interval` seconds (waking early if finished is
            # set), then invoke the callback with its stored arguments.
            self.finished.wait(self.interval)
            self.function(*self.args, **self.kwargs)
    except Exception as e:
        # Never let a callback exception escape the timer thread; log it.
        logger.error("task exception: %s", e)
@contextlib.contextmanager
def lock_timeout_log(lock: Lock, timeout: int = 5) -> Iterator[None]:
    # Context manager: acquire `lock`, logging when acquisition stalls.
    # NOTE(review): the docstring, the retry loop that sets `warned`,
    # `start`, `now`, `WARN_AFTER`, and the yield/release lines are
    # elided from this excerpt.
    logger.debug("locking {} with {} timeout".format(lock, timeout))
    if lock.acquire(timeout=timeout):
        logger.debug("locked {}".format(lock))
    # Warn once if acquisition is taking suspiciously long.
    if not warned and now - start > WARN_AFTER:
        logger.info("possible deadlock acquiring {}".format(lock))
class CephfsConnectionPool(object):
    """Pool of libcephfs connections, keyed by filesystem name."""

    class Connection(object):
        """A single libcephfs mount plus usage bookkeeping."""

        def __init__(self, mgr: Module_T, fs_name: str):
            self.fs: Optional["cephfs.LibCephFS"] = None
            # NOTE(review): the assignment of self.mgr (used by get_fs_id
            # below) is elided from this excerpt.
            self.fs_name = fs_name
            self.ops_in_progress = 0      # number of in-flight operations
            self.last_used = time.time()  # drives idle-connection cleanup
            self.fs_id = self.get_fs_id()  # fscid captured at creation time
        def get_fs_id(self) -> int:
            """Look up the id of self.fs_name in the current FSMap.

            :raises CephfsConnectionException: -ENOENT when the filesystem
                is not present in the FSMap.
            """
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    # NOTE(review): the return of the matching
                    # filesystem's id is elided from this excerpt.
            raise CephfsConnectionException(
                -errno.ENOENT, "FS '{0}' not found".format(self.fs_name))
        def get_fs_handle(self) -> "cephfs.LibCephFS":
            """Mark the connection as used and hand out the mount handle."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            # NOTE(review): the connect-on-demand step and the
            # `return self.fs` are elided from this excerpt.
        def put_fs_handle(self, notify: Callable) -> None:
            """Return a previously handed-out handle; decrement op count."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                # NOTE(review): the call to notify() when the connection
                # becomes free is elided from this excerpt.
        def del_fs_handle(self, waiter: Optional[Callable]) -> None:
            """Tear down the connection, waiting for in-flight ops first."""
            while self.ops_in_progress != 0:
                # NOTE(review): the waiter() invocation inside this loop is
                # elided from this excerpt.
            if self.is_connection_valid():
                # NOTE(review): the disconnect/abort branches are elided
                # from this excerpt.
        def is_connection_valid(self) -> bool:
            """Whether the cached fscid still matches the live FSMap."""
            # NOTE(review): the fs_id initialisation and the try: header
            # are elided from this excerpt.
            fs_id = self.get_fs_id()
            # the filesystem does not exist now -- connection is not valid.
            logger.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id
146 def is_connection_idle(self
, timeout
: float) -> bool:
147 return (self
.ops_in_progress
== 0 and ((time
.time() - self
.last_used
) >= timeout
))
        def connect(self) -> None:
            """Create, configure and mount a libcephfs handle for self.fs_name."""
            assert self.ops_in_progress == 0
            logger.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            logger.debug("Setting user ID and group ID of CephFS mount as root...")
            # Mount as root so the mgr can operate on any path/pool.
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            self.fs.conf_set("client_check_pool_perm", "false")
            logger.debug("CephFS initializing...")
            # NOTE(review): the init call between these two log lines is
            # elided from this excerpt.
            logger.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            logger.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            # Register the client addrs with the mgr so the session shows up.
            self.mgr._ceph_register_client(self.fs.get_addrs())
        def disconnect(self) -> None:
            """Cleanly shut down the mount; errors are logged, not raised."""
            # NOTE(review): the try: header is elided from this excerpt.
            assert self.ops_in_progress == 0
            logger.info("disconnecting from cephfs '{0}'".format(self.fs_name))
            addrs = self.fs.get_addrs()
            # NOTE(review): the libcephfs shutdown call is elided from
            # this excerpt.
            self.mgr._ceph_unregister_client(addrs)
            except Exception as e:
                # best-effort teardown: swallow and log failures
                logger.debug("disconnect: ({0})".format(e))
        def abort(self) -> None:
            """Forcefully abort the connection (used when it is stale)."""
            assert self.ops_in_progress == 0
            logger.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            # NOTE(review): the libcephfs abort call between these log
            # lines is elided from this excerpt.
            logger.info("abort done from cephfs '{0}'".format(self.fs_name))
    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds -- how often the cleanup timer fires
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds -- idle time before a connection is reaped
    MAX_CONCURRENT_CONNECTIONS = 5  # max number of concurrent connections per volume
    def __init__(self, mgr: Module_T):
        # NOTE(review): the assignments of self.mgr and self.lock are
        # elided from this excerpt; self.lock backs the condition below.
        self.connections: Dict[str, List[CephfsConnectionPool.Connection]] = {}
        self.cond = Condition(self.lock)
        # Periodic background task that reaps idle connections.
        self.timer_task = RTimer(CephfsConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                 self.cleanup_connections)
        self.timer_task.start()
    def cleanup_connections(self) -> None:
        """Timer callback: drop connections idle past CONNECTION_IDLE_INTERVAL."""
        # NOTE(review): the lock acquisition and the `idle_conns`
        # initialisation are elided from this excerpt.
        logger.info("scanning for idle connections..")
        for fs_name, connections in self.connections.items():
            logger.debug(f'fs_name ({fs_name}) connections ({connections})')
            for connection in connections:
                if connection.is_connection_idle(CephfsConnectionPool.CONNECTION_IDLE_INTERVAL):
                    idle_conns.append((fs_name, connection))
        logger.info(f'cleaning up connections: {idle_conns}')
        # Collected first, then deleted, since deletion mutates the lists.
        for idle_conn in idle_conns:
            self._del_connection(idle_conn[0], idle_conn[1])
    def get_fs_handle(self, fs_name: str) -> "cephfs.LibCephFS":
        """Hand out a mount handle for `fs_name`, reusing, sharing, or
        spawning a pooled connection as needed.

        NOTE(review): the lock acquisition, try: header, and the guard
        around the non-empty `connections` case are elided from this
        excerpt.
        """
        shared_connection = None
        connections = self.connections.setdefault(fs_name, [])
        logger.debug(f'[get] volume: ({fs_name}) connection: ({connections})')
        # Seed the least-shared candidate with the first connection.
        min_shared = connections[0].ops_in_progress
        shared_connection = connections[0]
        # Iterate a copy: _del_connection below mutates `connections`.
        for connection in list(connections):
            logger.debug(f'[get] connection: {connection} usage: {connection.ops_in_progress}')
            if connection.ops_in_progress == 0:
                if connection.is_connection_valid():
                    logger.debug(f'[get] connection ({connection}) can be reused')
                    return connection.get_fs_handle()
                # filesystem id changed beneath us (or the filesystem does not exist).
                # this is possible if the filesystem got removed (and recreated with
                # same name) via "ceph fs rm/new" mon command.
                logger.warning(f'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
                # note -- this will mutate @connections too
                self._del_connection(fs_name, connection)
            # Track the busy connection with the fewest in-flight ops.
            if connection.ops_in_progress < min_shared:
                min_shared = connection.ops_in_progress
                shared_connection = connection
        # when we end up here, there are no "free" connections. so either spin up a new
        if len(connections) < CephfsConnectionPool.MAX_CONCURRENT_CONNECTIONS:
            logger.debug('[get] spawning new connection since no connection is unused and we still have room for more')
            connection = CephfsConnectionPool.Connection(self.mgr, fs_name)
            # NOTE(review): the connection.connect() call is elided from
            # this excerpt.
            self.connections[fs_name].append(connection)
            return connection.get_fs_handle()
        # At capacity: share the least-loaded existing connection.
        assert shared_connection is not None
        logger.debug(f'[get] using shared connection ({shared_connection})')
        return shared_connection.get_fs_handle()
        except cephfs.Error as e:
            # try to provide a better error string if possible
            if e.args[0] == errno.ENOENT:
                raise CephfsConnectionException(
                    -errno.ENOENT, "FS '{0}' not found".format(fs_name))
            raise CephfsConnectionException(-e.args[0], e.args[1])
    def put_fs_handle(self, fs_name: str, fs_handle: cephfs.LibCephFS) -> None:
        """Return a handle obtained via get_fs_handle() to the pool."""
        # NOTE(review): the lock acquisition is elided from this excerpt.
        connections = self.connections.get(fs_name, [])
        for connection in connections:
            if connection.fs == fs_handle:
                logger.debug(f'[put] connection: {connection} usage: {connection.ops_in_progress}')
                # NOTE(review): Condition.notifyAll() is a deprecated alias
                # of notify_all() -- consider updating.
                connection.put_fs_handle(notify=lambda: self.cond.notifyAll())
266 def _del_connection(self
, fs_name
: str, connection
: Connection
, wait
: bool = False) -> None:
267 self
.connections
[fs_name
].remove(connection
)
268 connection
.del_fs_handle(waiter
=None if not wait
else lambda: self
.cond
.wait())
270 def _del_connections(self
, fs_name
: str, wait
: bool = False) -> None:
271 for connection
in list(self
.connections
.get(fs_name
, [])):
272 self
._del
_connection
(fs_name
, connection
, wait
)
    def del_connections(self, fs_name: str, wait: bool = False) -> None:
        """Public wrapper around _del_connections for one filesystem."""
        # NOTE(review): the lock acquisition is elided from this excerpt.
        self._del_connections(fs_name, wait)
    def del_all_connections(self) -> None:
        """Drain and delete every connection in the pool (shutdown path)."""
        # NOTE(review): the lock acquisition is elided from this excerpt.
        for fs_name in list(self.connections.keys()):
            logger.info("waiting for pending ops for '{}'".format(fs_name))
            self._del_connections(fs_name, wait=True)
            logger.info("pending ops completed for '{}'".format(fs_name))
        # no new connections should have been initialized since its
        # guarded on shutdown.
        assert len(self.connections) == 0
class CephfsClient(Generic[Module_T]):
    """Base helper giving mgr modules pooled access to CephFS mounts."""

    def __init__(self, mgr: Module_T):
        # NOTE(review): the assignment of self.mgr is elided from this
        # excerpt; the pool below is built from it.
        self.stopping = Event()  # set once shutdown() begins
        self.connection_pool = CephfsConnectionPool(self.mgr)
295 def is_stopping(self
) -> bool:
296 return self
.stopping
.is_set()
    def shutdown(self) -> None:
        """Flag shutdown and drain the connection pool."""
        logger.info("shutting down")
        # first, note that we're shutting down
        # NOTE(review): the self.stopping.set() call is elided from this
        # excerpt.
        # second, delete all libcephfs handles from connection pool
        self.connection_pool.del_all_connections()
    def get_fs(self, fs_name: str) -> Optional["cephfs.LibCephFS"]:
        """Find the FSMap entry for `fs_name`, or None if absent."""
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                # NOTE(review): the returns (matching entry / fallback
                # None) are elided from this excerpt.
    def get_mds_names(self, fs_name: str) -> List[str]:
        """List the MDS daemon names serving `fs_name`."""
        fs = self.get_fs(fs_name)
        # NOTE(review): the guard for a missing filesystem is elided from
        # this excerpt.
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]
    def get_metadata_pool(self, fs_name: str) -> Optional[str]:
        """Return the metadata pool of `fs_name`, per its mdsmap entry."""
        fs = self.get_fs(fs_name)
        # NOTE(review): the guard for a missing filesystem (and the
        # fallback return) is elided from this excerpt.
        return fs['mdsmap']['metadata_pool']
    def get_all_filesystems(self) -> List[str]:
        """Collect the names of all filesystems in the current FSMap."""
        fs_list: List[str] = []
        fs_map = self.mgr.get('fs_map')
        if fs_map['filesystems']:
            for fs in fs_map['filesystems']:
                fs_list.append(fs['mdsmap']['fs_name'])
        # NOTE(review): the return of fs_list is elided from this excerpt.
@contextlib.contextmanager
def open_filesystem(fsc: CephfsClient, fs_name: str) -> Generator["cephfs.LibCephFS", None, None]:
    """
    Open a volume with shared access.
    This API is to be used as a context manager.

    :param fsc: cephfs client instance
    :param fs_name: fs name
    :return: yields a fs handle (ceph filesystem handle)
    """
    if fsc.is_stopping():
        raise CephfsConnectionException(-errno.ESHUTDOWN,
                                        "shutdown in progress")

    fs_handle = fsc.connection_pool.get_fs_handle(fs_name)
    # NOTE(review): the try/yield/finally frame around the hand-back is
    # elided from this excerpt.
    fsc.connection_pool.put_fs_handle(fs_name, fs_handle)
def colorize(msg: str, color: int, dark: bool = False) -> str:
    """
    Decorate `msg` with escape sequences to give the requested color
    """
    # NOTE(review): the continuation of this return (appending msg and
    # the reset sequence) is elided from this excerpt.
    return (COLOR_DARK_SEQ if dark else COLOR_SEQ) % (30 + color) \
def bold(msg: str) -> str:
    """Wrap *msg* in ANSI escape sequences so it renders bold."""
    return "{}{}{}".format(BOLD_SEQ, msg, RESET_SEQ)
def format_units(n: int, width: int, colored: bool, decimal: bool) -> str:
    """
    Format a number without units, so as to fit into `width` characters, substituting
    an appropriate unit suffix.

    Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.
    """
    factor = 1000 if decimal else 1024
    units = [' ', 'k', 'M', 'G', 'T', 'P', 'E']
    # NOTE(review): the `unit` counter initialisation/increment and the
    # scaled/unscaled branch headers are elided from this excerpt.
    while len("%s" % (int(n) // (factor**unit))) > width - 1:
    # Scale down and truncate to fit the width, dropping a trailing dot.
    truncated_float = ("%f" % (n / (float(factor) ** unit)))[0:width - 1]
    if truncated_float[-1] == '.':
        truncated_float = " " + truncated_float[0:-1]
    # Unscaled case: plain right-aligned integer.
    truncated_float = "%{wid}d".format(wid=width - 1) % n
    formatted = "%s%s" % (truncated_float, units[unit])
    # NOTE(review): the colored/uncolored branching and the n == 0 color
    # selection are elided from this excerpt.
    color = YELLOW, False
    return bold(colorize(formatted[0:-1], color[0], color[1])) \
        + bold(colorize(formatted[-1], YELLOW, False))
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """Format a dimensionless number with decimal (base-1000) suffixes."""
    return format_units(n, width, colored=colored, decimal=True)
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """Format a byte count with binary (base-1024) suffixes."""
    return format_units(n, width, colored=colored, decimal=False)
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
    """
    Merge the given dicts into a new dict, later arguments winning.

    >>> merge_dicts({1:2}, {3:4})
    {1: 2, 3: 4}

    You can also overwrite keys:
    >>> merge_dicts({1:2}, {1:4})
    {1: 4}

    :rtype: dict[str, Any]
    """
    # NOTE(review): the implementation is elided from this excerpt.
def get_default_addr():
    # Returns '::' when IPv6 binding works, else '0.0.0.0'; the answer is
    # memoised on the function object itself.
    def is_ipv6_enabled() -> bool:
        # Probe by binding an IPv6 socket to loopback.
        # NOTE(review): the try: header and the True/False returns are
        # elided from this excerpt.
        sock = socket.socket(socket.AF_INET6)
        with contextlib.closing(sock):
            sock.bind(("::1", 0))
        except (AttributeError, socket.error):

    # Fast path: previously computed result cached on the function.
    return get_default_addr.result  # type: ignore
    except AttributeError:
        result = '::' if is_ipv6_enabled() else '0.0.0.0'
        get_default_addr.result = result  # type: ignore
def build_url(host: str, scheme: Optional[str] = None, port: Optional[int] = None, path: str = '') -> str:
    """
    Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets

    >>> build_url('example.com', 'https', 443)
    'https://example.com:443'

    >>> build_url(host='example.com', port=443)

    >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
    'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'

    >>> build_url('example.com', 'https', 443, path='/metrics')
    'https://example.com:443/metrics'

    :param scheme: The scheme, e.g. http, https or ftp.
    :param host: Consisting of either a registered name (including but not limited to
        a hostname) or an IP address.
    """
    # Bracket IPv6 literals so the port separator is unambiguous.
    netloc = wrap_ipv6(host)
    # NOTE(review): the guard that only appends the port when given is
    # elided from this excerpt.
    netloc += ':{}'.format(port)
    pr = urllib.parse.ParseResult(
        scheme=scheme if scheme else '',
        # NOTE(review): the remaining ParseResult arguments and the final
        # return are elided from this excerpt.
class ServerConfigException(Exception):
    """Raised when server TLS/certificate configuration is invalid."""
def create_self_signed_cert(organisation: str = 'Ceph',
                            common_name: str = 'mgr',
                            dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
    """Returns self-signed PEM certificates valid for 10 years.

    The optional dname parameter provides complete control of the cert/key
    creation by supporting all valid RDNs via a dictionary. However, if dname
    is not provided the default O and CN settings will be applied.

    :param organisation: String representing the Organisation(O) RDN (default='Ceph')
    :param common_name: String representing the Common Name(CN) RDN (default='mgr')
    :param dname: Optional dictionary containing RDNs to use for crt/key generation

    :return: ssl crt and key in utf-8 format

    :raises ValueError: if the dname parameter received contains invalid RDNs
    """
    from OpenSSL import crypto
    from uuid import uuid4

    # RDN = Relative Distinguished Name
    valid_RDN_list = ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']

    # NOTE(review): creation of the key object (pkey) is elided from this
    # excerpt.
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    # Create a "subject" object
    req = crypto.X509Req()
    subj = req.get_subject()

    # dname received, so check it contains valid RDNs
    if not all(field in valid_RDN_list for field in dname):
        raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list)))
    # NOTE(review): the default-dname branch header is elided from this
    # excerpt.
    dname = {"O": organisation, "CN": common_name}

    # populate the subject with the dname settings
    for k, v in dname.items():
        # NOTE(review): the subject attribute assignment is elided from
        # this excerpt.

    # create a self-signed cert
    # NOTE(review): creation of the certificate object is elided from
    # this excerpt.
    cert.set_subject(req.get_subject())
    cert.set_serial_number(int(uuid4()))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # 10 years
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pkey)
    cert.sign(pkey, 'sha512')

    cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)

    return cert.decode('utf-8'), pkey.decode('utf-8')
def verify_cacrt_content(crt):
    # type: (str) -> None
    """Parse a PEM certificate string; raise ServerConfigException if invalid."""
    from OpenSSL import crypto
    # NOTE(review): the try: header is elided from this excerpt.
    x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    if x509.has_expired():
        # Expiry only triggers a warning, not an error.
        logger.warning('Certificate has expired: {}'.format(crt))
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid certificate: {}'.format(str(e)))
def verify_cacrt(cert_fname):
    # type: (str) -> None
    """Basic validation of a ca cert"""
    # NOTE(review): the guard (empty cert_fname) preceding this raise is
    # elided from this excerpt.
    raise ServerConfigException("CA cert not configured")
    if not os.path.isfile(cert_fname):
        raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

    # NOTE(review): the try: header is elided from this excerpt.
    with open(cert_fname) as f:
        verify_cacrt_content(f.read())
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate {}: {}'.format(cert_fname, str(e)))
def verify_tls(crt, key):
    # type: (str, str) -> None
    """Validate an in-memory PEM cert/key pair and that they match up."""
    verify_cacrt_content(crt)

    from OpenSSL import crypto, SSL
    # NOTE(review): the try: headers for the three stages below are
    # elided from this excerpt.
    _key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key: {}'.format(str(e)))
    _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
    except ValueError as e:
        raise ServerConfigException(
            'Invalid certificate key: {}'.format(str(e))

    # Pair the cert and key in an SSL context to verify they match.
    context = SSL.Context(SSL.TLSv1_METHOD)
    context.use_certificate(_crt)
    context.use_privatekey(_key)
    context.check_privatekey()
    except crypto.Error as e:
        # NOTE(review): the raise wrapping this message is elided from
        # this excerpt.
        'Private key and certificate do not match up: {}'.format(str(e)))
def verify_tls_files(cert_fname, pkey_fname):
    # type: (str, str) -> None
    """Basic checks for TLS certificate and key files

    Do some validations to the private key and certificate:
    - Check the type and format
    - Check the certificate expiration date
    - Check the consistency of the private key
    - Check that the private key and certificate match up

    :param cert_fname: Name of the certificate file
    :param pkey_fname: name of the certificate public key file

    :raises ServerConfigException: An error with a message
    """
    if not cert_fname or not pkey_fname:
        raise ServerConfigException('no certificate configured')

    verify_cacrt(cert_fname)

    if not os.path.isfile(pkey_fname):
        raise ServerConfigException('private key %s does not exist' % pkey_fname)

    from OpenSSL import crypto, SSL

    # NOTE(review): the try: headers and the key consistency check are
    # elided from this excerpt.
    with open(pkey_fname) as f:
        pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, f.read())
    except (ValueError, crypto.Error) as e:
        raise ServerConfigException(
            'Invalid private key {}: {}'.format(pkey_fname, str(e)))
    # Pair the cert and key files in an SSL context to verify they match.
    context = SSL.Context(SSL.TLSv1_METHOD)
    context.use_certificate_file(cert_fname, crypto.FILETYPE_PEM)
    context.use_privatekey_file(pkey_fname, crypto.FILETYPE_PEM)
    context.check_privatekey()
    except crypto.Error as e:
        # NOTE(review): the raise wrapping this message is elided from
        # this excerpt.
        'Private key {} and certificate {} do not match up: {}'.format(
            pkey_fname, cert_fname, str(e)))
def get_most_recent_rate(rates: Optional[List[Tuple[float, float]]]) -> float:
    """ Get most recent rate from rates

    :param rates: The derivative between all time series data points [time in seconds, value]
    :type rates: list[tuple[int, float]]

    :return: The last derivative or 0.0 if none exists

    >>> get_most_recent_rate(None)
    0.0
    >>> get_most_recent_rate([])
    0.0
    >>> get_most_recent_rate([(1, -2.0)])
    -2.0
    >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
    5.0
    """
    # No samples (None or empty) -> documented default of 0.0.
    if not rates:
        return 0.0
    # The most recent rate is the value of the last (time, value) pair.
    return rates[-1][1]
def get_time_series_rates(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Rates from time series data

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: The derivative between all time series data points [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> get_time_series_rates([])
    >>> get_time_series_rates([[0, 1], [1, 3]])
    >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
    """
    # Drop duplicate-timestamp samples first.
    data = _filter_time_series(data)
    # NOTE(review): the empty-input early return and the trailing
    # `_pairwise(data)]` closing this comprehension are elided from this
    # excerpt.
    return [(data2[0], _derivative(data1, data2) if data1 is not None else 0.0) for data1, data2 in
def _filter_time_series(data: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """ Filters time series data

    Filters out samples with the same timestamp in given time series data.
    It also enforces the list to contain at least two samples.

    All filtered values will be shown in the debug log. If values were filtered it's a bug in the
    time series data collector, please report it.

    :param data: Time series data [time in seconds, value]
    :type data: list[tuple[int, float]]

    :return: Filtered time series data [time in seconds, value]
    :rtype: list[tuple[int, float]]

    >>> logger.debug = lambda s,x,y: print(s % (x,y))
    >>> _filter_time_series([])
    >>> _filter_time_series([[1, 42]])
    >>> _filter_time_series([[10, 2], [10, 3]])
    Duplicate timestamp in time series data: [10, 2], [10, 3]
    >>> _filter_time_series([[0, 1], [1, 2]])
    >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
    Duplicate timestamp in time series data: [0, 2], [0, 3]
    Duplicate timestamp in time series data: [0, 3], [0, 1]
    Duplicate timestamp in time series data: [1, 2], [1, 3]
    >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
    [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
    """
    # NOTE(review): the `filtered` initialisation, the duplicate-skip
    # branch, the short-input guard and the final return are elided from
    # this excerpt.
    for i in range(len(data) - 1):
        if data[i][0] == data[i + 1][0]:  # Same timestamp
            logger.debug("Duplicate timestamp in time series data: %s, %s", data[i], data[i + 1])
        filtered.append(data[i])
    # Always keep the final sample.
    filtered.append(data[-1])
748 def _derivative(p1
: Tuple
[float, float], p2
: Tuple
[float, float]) -> float:
749 """ Derivative between two time series data points
751 :param p1: Time series data [time in seconds, value]
752 :type p1: tuple[int, float]
753 :param p2: Time series data [time in seconds, value]
754 :type p2: tuple[int, float]
756 :return: Derivative between both points
759 >>> _derivative([0, 0], [2, 1])
761 >>> _derivative([0, 1], [2, 0])
763 >>> _derivative([0, 0], [3, 1])
766 return (p2
[1] - p1
[1]) / float(p2
[0] - p1
[0])
def _pairwise(iterable: Iterable[T]) -> Generator[Tuple[Optional[T], T], None, None]:
    """Yield (previous, current) pairs over *iterable*.

    NOTE(review): the implementation is elided from this excerpt; per the
    signature and its use in get_time_series_rates, the first pair
    presumably carries None as the previous element -- confirm.
    """
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """Render a timedelta as a short human-readable magnitude + unit.

    Chooses the coarsest unit that keeps the number readable: seconds
    below 2 minutes, minutes below 2 hours, hours below 2 days, days
    below 2 weeks, weeks below ~3 months, months below 2 years, else
    years.

    :param n: the duration to format
    :return: e.g. '90s', '30m', '26h', '10d', '4w', '3M', '2y'
    """
    # Bug fix: the original used n.seconds, which excludes whole days,
    # so e.g. timedelta(hours=26) rendered as '2h' instead of '26h'.
    # total_seconds() accounts for the full duration.
    total_seconds = int(n.total_seconds())
    if n < datetime.timedelta(seconds=120):
        return str(total_seconds) + 's'
    if n < datetime.timedelta(minutes=120):
        return str(total_seconds // 60) + 'm'
    if n < datetime.timedelta(hours=48):
        return str(total_seconds // 3600) + 'h'
    if n < datetime.timedelta(days=14):
        return str(n.days) + 'd'
    if n < datetime.timedelta(days=7*12):
        return str(n.days // 7) + 'w'
    if n < datetime.timedelta(days=365*2):
        return str(n.days // 30) + 'M'
    return str(n.days // 365) + 'y'
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for methods of the Module class. Logs the name of the given
    function f with the time it takes to execute it.
    """
    def outer(f: Callable[..., T]) -> Callable[..., T]:
        # NOTE(review): the @wraps(f) decoration of `wrapper` is elided
        # from this excerpt.
        def wrapper(*args: Any, **kwargs: Any) -> T:
            # NOTE(review): capture of `self` (first positional arg) and
            # the start timestamp `t` are elided from this excerpt.
            self.log.debug('Starting method {}.'.format(f.__name__))
            result = f(*args, **kwargs)
            duration = time.time() - t
            if not skip_attribute:
                # Stash the duration on the wrapper for introspection/tests.
                wrapper._execution_duration = duration  # type: ignore
            self.log.debug('Method {} ran {:.3f} seconds.'.format(f.__name__, duration))
            # NOTE(review): the returns of result / wrapper / outer are
            # elided from this excerpt.