3 if 'UNITTEST' in os
.environ
:
15 from threading
import Lock
, Condition
, Event
16 from typing
import no_type_check
, NewType
18 from functools
import wraps
19 if sys
.version_info
>= (3, 3):
20 from threading
import Timer
22 from threading
import _Timer
as Timer
24 from typing
import Tuple
, Any
, Callable
, Optional
, Dict
, TYPE_CHECKING
, TypeVar
, List
, Iterable
, Generator
, Generic
, Iterator
26 from ceph
.deployment
.utils
import wrap_ipv6
31 from mgr_module
import MgrModule
# Distinct string type for Ceph config entity names (e.g. "mon",
# "client.rgw.foo"); see name_to_config_section() below.
ConfEntity = NewType('ConfEntity', str)

# Type variable bound to MgrModule, for helpers generic over mgr modules.
Module_T = TypeVar('Module_T', bound="MgrModule")

# ANSI terminal escape sequences used by the colorize()/bold() helpers.
COLOR_SEQ = "\033[1;%dm"
COLOR_DARK_SEQ = "\033[0;%dm"
UNDERLINE_SEQ = "\033[4m"

logger = logging.getLogger(__name__)
class PortAlreadyInUse(Exception):
    """Raised when a TCP port cannot be bound because it is already in use."""
class CephfsConnectionException(Exception):
    """Error talking to CephFS, carrying an errno-style code and a message."""

    def __init__(self, error_code: int, error_message: str):
        # Stash both pieces so callers can inspect them individually.
        self.errno, self.error_str = error_code, error_message

    def to_tuple(self) -> Tuple[int, str, str]:
        """Return the (errno, "", message) triple used for command results."""
        return (self.errno, "", self.error_str)

    def __str__(self) -> str:
        return "{0} ({1})".format(self.errno, self.error_str)
74 recurring timer variant of Timer
79 while not self
.finished
.is_set():
80 self
.finished
.wait(self
.interval
)
81 self
.function(*self
.args
, **self
.kwargs
)
83 except Exception as e
:
84 logger
.error("task exception: %s", e
)
87 @contextlib.contextmanager
88 def lock_timeout_log(lock
: Lock
, timeout
: int = 5) -> Iterator
[None]:
93 logger
.debug("locking {} with {} timeout".format(lock
, timeout
))
94 if lock
.acquire(timeout
=timeout
):
95 logger
.debug("locked {}".format(lock
))
100 if not warned
and now
- start
> WARN_AFTER
:
101 logger
.info("possible deadlock acquiring {}".format(lock
))
105 class CephfsConnectionPool(object):
106 class Connection(object):
107 def __init__(self
, mgr
: Module_T
, fs_name
: str):
108 self
.fs
: Optional
["cephfs.LibCephFS"] = None
110 self
.fs_name
= fs_name
111 self
.ops_in_progress
= 0
112 self
.last_used
= time
.time()
113 self
.fs_id
= self
.get_fs_id()
115 def get_fs_id(self
) -> int:
116 fs_map
= self
.mgr
.get('fs_map')
117 for fs
in fs_map
['filesystems']:
118 if fs
['mdsmap']['fs_name'] == self
.fs_name
:
120 raise CephfsConnectionException(
121 -errno
.ENOENT
, "FS '{0}' not found".format(self
.fs_name
))
123 def get_fs_handle(self
) -> "cephfs.LibCephFS":
124 self
.last_used
= time
.time()
125 self
.ops_in_progress
+= 1
128 def put_fs_handle(self
, notify
: Callable
) -> None:
129 assert self
.ops_in_progress
> 0
130 self
.ops_in_progress
-= 1
131 if self
.ops_in_progress
== 0:
134 def del_fs_handle(self
, waiter
: Optional
[Callable
]) -> None:
136 while self
.ops_in_progress
!= 0:
138 if self
.is_connection_valid():
143 def is_connection_valid(self
) -> bool:
146 fs_id
= self
.get_fs_id()
148 # the filesystem does not exist now -- connection is not valid.
150 logger
.debug("self.fs_id={0}, fs_id={1}".format(self
.fs_id
, fs_id
))
151 return self
.fs_id
== fs_id
def is_connection_idle(self, timeout: float) -> bool:
    """Return True when no ops are in flight and the connection has been
    unused for at least `timeout` seconds."""
    if self.ops_in_progress != 0:
        return False
    idle_for = time.time() - self.last_used
    return idle_for >= timeout
156 def connect(self
) -> None:
157 assert self
.ops_in_progress
== 0
158 logger
.debug("Connecting to cephfs '{0}'".format(self
.fs_name
))
159 self
.fs
= cephfs
.LibCephFS(rados_inst
=self
.mgr
.rados
)
160 logger
.debug("Setting user ID and group ID of CephFS mount as root...")
161 self
.fs
.conf_set("client_mount_uid", "0")
162 self
.fs
.conf_set("client_mount_gid", "0")
163 self
.fs
.conf_set("client_check_pool_perm", "false")
164 self
.fs
.conf_set("client_quota", "false")
165 logger
.debug("CephFS initializing...")
167 logger
.debug("CephFS mounting...")
168 self
.fs
.mount(filesystem_name
=self
.fs_name
.encode('utf-8'))
169 logger
.debug("Connection to cephfs '{0}' complete".format(self
.fs_name
))
170 self
.mgr
._ceph
_register
_client
(None, self
.fs
.get_addrs(), False)
172 def disconnect(self
) -> None:
175 assert self
.ops_in_progress
== 0
176 logger
.info("disconnecting from cephfs '{0}'".format(self
.fs_name
))
177 addrs
= self
.fs
.get_addrs()
179 self
.mgr
._ceph
_unregister
_client
(None, addrs
)
181 except Exception as e
:
182 logger
.debug("disconnect: ({0})".format(e
))
185 def abort(self
) -> None:
187 assert self
.ops_in_progress
== 0
188 logger
.info("aborting connection from cephfs '{0}'".format(self
.fs_name
))
190 logger
.info("abort done from cephfs '{0}'".format(self
.fs_name
))
# TODO: make this configurable
TIMER_TASK_RUN_INTERVAL = 30.0   # seconds -- how often the cleanup timer task runs
CONNECTION_IDLE_INTERVAL = 60.0  # seconds -- idle time before a connection is reaped
MAX_CONCURRENT_CONNECTIONS = 5   # max number of concurrent connections per volume
198 def __init__(self
, mgr
: Module_T
):
200 self
.connections
: Dict
[str, List
[CephfsConnectionPool
.Connection
]] = {}
202 self
.cond
= Condition(self
.lock
)
203 self
.timer_task
= RTimer(CephfsConnectionPool
.TIMER_TASK_RUN_INTERVAL
,
204 self
.cleanup_connections
)
205 self
.timer_task
.start()
207 def cleanup_connections(self
) -> None:
209 logger
.info("scanning for idle connections..")
211 for fs_name
, connections
in self
.connections
.items():
212 logger
.debug(f
'fs_name ({fs_name}) connections ({connections})')
213 for connection
in connections
:
214 if connection
.is_connection_idle(CephfsConnectionPool
.CONNECTION_IDLE_INTERVAL
):
215 idle_conns
.append((fs_name
, connection
))
216 logger
.info(f
'cleaning up connections: {idle_conns}')
217 for idle_conn
in idle_conns
:
218 self
._del
_connection
(idle_conn
[0], idle_conn
[1])
220 def get_fs_handle(self
, fs_name
: str) -> "cephfs.LibCephFS":
224 shared_connection
= None
225 connections
= self
.connections
.setdefault(fs_name
, [])
226 logger
.debug(f
'[get] volume: ({fs_name}) connection: ({connections})')
228 min_shared
= connections
[0].ops_in_progress
229 shared_connection
= connections
[0]
230 for connection
in list(connections
):
231 logger
.debug(f
'[get] connection: {connection} usage: {connection.ops_in_progress}')
232 if connection
.ops_in_progress
== 0:
233 if connection
.is_connection_valid():
234 logger
.debug(f
'[get] connection ({connection}) can be reused')
235 return connection
.get_fs_handle()
237 # filesystem id changed beneath us (or the filesystem does not exist).
238 # this is possible if the filesystem got removed (and recreated with
239 # same name) via "ceph fs rm/new" mon command.
240 logger
.warning(f
'[get] filesystem id changed for volume ({fs_name}), disconnecting ({connection})')
241 # note -- this will mutate @connections too
242 self
._del
_connection
(fs_name
, connection
)
244 if connection
.ops_in_progress
< min_shared
:
245 min_shared
= connection
.ops_in_progress
246 shared_connection
= connection
247 # when we end up here, there are no "free" connections. so either spin up a new
249 if len(connections
) < CephfsConnectionPool
.MAX_CONCURRENT_CONNECTIONS
:
250 logger
.debug('[get] spawning new connection since no connection is unused and we still have room for more')
251 connection
= CephfsConnectionPool
.Connection(self
.mgr
, fs_name
)
253 self
.connections
[fs_name
].append(connection
)
254 return connection
.get_fs_handle()
256 assert shared_connection
is not None
257 logger
.debug(f
'[get] using shared connection ({shared_connection})')
258 return shared_connection
.get_fs_handle()
259 except cephfs
.Error
as e
:
260 # try to provide a better error string if possible
261 if e
.args
[0] == errno
.ENOENT
:
262 raise CephfsConnectionException(
263 -errno
.ENOENT
, "FS '{0}' not found".format(fs_name
))
264 raise CephfsConnectionException(-e
.args
[0], e
.args
[1])
266 def put_fs_handle(self
, fs_name
: str, fs_handle
: cephfs
.LibCephFS
) -> None:
268 connections
= self
.connections
.get(fs_name
, [])
269 for connection
in connections
:
270 if connection
.fs
== fs_handle
:
271 logger
.debug(f
'[put] connection: {connection} usage: {connection.ops_in_progress}')
272 connection
.put_fs_handle(notify
=lambda: self
.cond
.notifyAll())
def _del_connection(self, fs_name: str, connection: Connection, wait: bool = False) -> None:
    """Remove `connection` from the per-volume list and tear it down.

    When `wait` is True, the teardown blocks on the pool's condition
    variable until in-flight ops on the connection have drained.
    """
    waiter = (lambda: self.cond.wait()) if wait else None
    self.connections[fs_name].remove(connection)
    connection.del_fs_handle(waiter=waiter)
def _del_connections(self, fs_name: str, wait: bool = False) -> None:
    """Tear down every pooled connection belonging to volume `fs_name`."""
    # Snapshot the list first: _del_connection mutates it while we iterate.
    current = list(self.connections.get(fs_name, []))
    for conn in current:
        self._del_connection(fs_name, conn, wait)
282 def del_connections(self
, fs_name
: str, wait
: bool = False) -> None:
284 self
._del
_connections
(fs_name
, wait
)
286 def del_all_connections(self
) -> None:
288 for fs_name
in list(self
.connections
.keys()):
289 logger
.info("waiting for pending ops for '{}'".format(fs_name
))
290 self
._del
_connections
(fs_name
, wait
=True)
291 logger
.info("pending ops completed for '{}'".format(fs_name
))
292 # no new connections should have been initialized since its
293 # guarded on shutdown.
294 assert len(self
.connections
) == 0
297 class CephfsClient(Generic
[Module_T
]):
298 def __init__(self
, mgr
: Module_T
):
300 self
.connection_pool
= CephfsConnectionPool(self
.mgr
)
def shutdown(self) -> None:
    """Shut down this client, draining all pooled libcephfs handles."""
    logger.info("shutting down")
    # second, delete all libcephfs handles from connection pool
    pool = self.connection_pool
    pool.del_all_connections()
307 def get_fs(self
, fs_name
: str) -> Optional
["cephfs.LibCephFS"]:
308 fs_map
= self
.mgr
.get('fs_map')
309 for fs
in fs_map
['filesystems']:
310 if fs
['mdsmap']['fs_name'] == fs_name
:
314 def get_mds_names(self
, fs_name
: str) -> List
[str]:
315 fs
= self
.get_fs(fs_name
)
318 return [mds
['name'] for mds
in fs
['mdsmap']['info'].values()]
320 def get_metadata_pool(self
, fs_name
: str) -> Optional
[str]:
321 fs
= self
.get_fs(fs_name
)
323 return fs
['mdsmap']['metadata_pool']
326 def get_all_filesystems(self
) -> List
[str]:
327 fs_list
: List
[str] = []
328 fs_map
= self
.mgr
.get('fs_map')
329 if fs_map
['filesystems']:
330 for fs
in fs_map
['filesystems']:
331 fs_list
.append(fs
['mdsmap']['fs_name'])
336 @contextlib.contextmanager
337 def open_filesystem(fsc
: CephfsClient
, fs_name
: str) -> Generator
["cephfs.LibCephFS", None, None]:
339 Open a volume with shared access.
340 This API is to be used as a context manager.
342 :param fsc: cephfs client instance
343 :param fs_name: fs name
344 :return: yields a fs handle (ceph filesystem handle)
346 fs_handle
= fsc
.connection_pool
.get_fs_handle(fs_name
)
350 fsc
.connection_pool
.put_fs_handle(fs_name
, fs_handle
)
353 def colorize(msg
: str, color
: int, dark
: bool = False) -> str:
355 Decorate `msg` with escape sequences to give the requested color
357 return (COLOR_DARK_SEQ
if dark
else COLOR_SEQ
) % (30 + color
) \
361 def bold(msg
: str) -> str:
363 Decorate `msg` with escape sequences to make it appear bold
365 return BOLD_SEQ
+ msg
+ RESET_SEQ
368 def format_units(n
: int, width
: int, colored
: bool, decimal
: bool) -> str:
370 Format a number without units, so as to fit into `width` characters, substituting
371 an appropriate unit suffix.
373 Use decimal for dimensionless things, use base 2 (decimal=False) for byte sizes/rates.
376 factor
= 1000 if decimal
else 1024
377 units
= [' ', 'k', 'M', 'G', 'T', 'P', 'E']
379 while len("%s" % (int(n
) // (factor
**unit
))) > width
- 1:
383 truncated_float
= ("%f" % (n
/ (float(factor
) ** unit
)))[0:width
- 1]
384 if truncated_float
[-1] == '.':
385 truncated_float
= " " + truncated_float
[0:-1]
387 truncated_float
= "%{wid}d".format(wid
=width
- 1) % n
388 formatted
= "%s%s" % (truncated_float
, units
[unit
])
394 color
= YELLOW
, False
395 return bold(colorize(formatted
[0:-1], color
[0], color
[1])) \
396 + bold(colorize(formatted
[-1], YELLOW
, False))
def format_dimless(n: int, width: int, colored: bool = False) -> str:
    """Format a dimensionless number into `width` chars using base-1000
    (decimal) unit suffixes."""
    return format_units(n, width=width, colored=colored, decimal=True)
def format_bytes(n: int, width: int, colored: bool = False) -> str:
    """Format a byte count into `width` chars using base-1024 unit suffixes."""
    return format_units(n, width=width, colored=colored, decimal=False)
409 def test_port_allocation(addr
: str, port
: int) -> None:
410 """Checks if the port is available
411 :raises PortAlreadyInUse: in case port is already in use
412 :raises Exception: any generic error other than port already in use
413 If no exception is raised, the port can be assumed available
416 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
417 sock
.bind((addr
, port
))
419 except socket
.error
as e
:
420 if e
.errno
== errno
.EADDRINUSE
:
421 raise PortAlreadyInUse
426 def merge_dicts(*args
: Dict
[T
, Any
]) -> Dict
[T
, Any
]:
428 >>> merge_dicts({1:2}, {3:4})
431 You can also overwrite keys:
432 >>> merge_dicts({1:2}, {1:4})
435 :rtype: dict[str, Any]
443 def get_default_addr():
445 def is_ipv6_enabled() -> bool:
447 sock
= socket
.socket(socket
.AF_INET6
)
448 with contextlib
.closing(sock
):
449 sock
.bind(("::1", 0))
451 except (AttributeError, socket
.error
):
455 return get_default_addr
.result
# type: ignore
456 except AttributeError:
457 result
= '::' if is_ipv6_enabled() else '0.0.0.0'
458 get_default_addr
.result
= result
# type: ignore
462 def build_url(host
: str, scheme
: Optional
[str] = None, port
: Optional
[int] = None, path
: str = '') -> str:
464 Build a valid URL. IPv6 addresses specified in host will be enclosed in brackets
467 >>> build_url('example.com', 'https', 443)
468 'https://example.com:443'
470 >>> build_url(host='example.com', port=443)
473 >>> build_url('fce:9af7:a667:7286:4917:b8d3:34df:8373', port=80, scheme='http')
474 'http://[fce:9af7:a667:7286:4917:b8d3:34df:8373]:80'
476 >>> build_url('example.com', 'https', 443, path='/metrics')
477 'https://example.com:443/metrics'
480 :param scheme: The scheme, e.g. http, https or ftp.
482 :param host: Consisting of either a registered name (including but not limited to
483 a hostname) or an IP address.
488 netloc
= wrap_ipv6(host
)
490 netloc
+= ':{}'.format(port
)
491 pr
= urllib
.parse
.ParseResult(
492 scheme
=scheme
if scheme
else '',
501 class ServerConfigException(Exception):
505 def create_self_signed_cert(organisation
: str = 'Ceph',
506 common_name
: str = 'mgr',
507 dname
: Optional
[Dict
[str, str]] = None) -> Tuple
[str, str]:
508 """Returns self-signed PEM certificates valid for 10 years.
510 The optional dname parameter provides complete control of the cert/key
511 creation by supporting all valid RDNs via a dictionary. However, if dname
512 is not provided the default O and CN settings will be applied.
514 :param organisation: String representing the Organisation(O) RDN (default='Ceph')
515 :param common_name: String representing the Common Name(CN) RDN (default='mgr')
516 :param dname: Optional dictionary containing RDNs to use for crt/key generation
518 :return: ssl crt and key in utf-8 format
520 :raises ValueError: if the dname parameter received contains invalid RDNs
524 from OpenSSL
import crypto
525 from uuid
import uuid4
527 # RDN = Relative Distinguished Name
528 valid_RDN_list
= ['C', 'ST', 'L', 'O', 'OU', 'CN', 'emailAddress']
532 pkey
.generate_key(crypto
.TYPE_RSA
, 2048)
534 # Create a "subject" object
535 req
= crypto
.X509Req()
536 subj
= req
.get_subject()
539 # dname received, so check it contains valid RDNs
540 if not all(field
in valid_RDN_list
for field
in dname
):
541 raise ValueError("Invalid DNAME received. Valid DNAME fields are {}".format(', '.join(valid_RDN_list
)))
543 dname
= {"O": organisation
, "CN": common_name
}
545 # populate the subject with the dname settings
546 for k
, v
in dname
.items():
549 # create a self-signed cert
551 cert
.set_subject(req
.get_subject())
552 cert
.set_serial_number(int(uuid4()))
553 cert
.gmtime_adj_notBefore(0)
554 cert
.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60) # 10 years
555 cert
.set_issuer(cert
.get_subject())
556 cert
.set_pubkey(pkey
)
557 cert
.sign(pkey
, 'sha512')
559 cert
= crypto
.dump_certificate(crypto
.FILETYPE_PEM
, cert
)
560 pkey
= crypto
.dump_privatekey(crypto
.FILETYPE_PEM
, pkey
)
562 return cert
.decode('utf-8'), pkey
.decode('utf-8')
565 def verify_cacrt_content(crt
):
566 # type: (str) -> None
567 from OpenSSL
import crypto
569 crt_buffer
= crt
.encode("ascii") if isinstance(crt
, str) else crt
570 x509
= crypto
.load_certificate(crypto
.FILETYPE_PEM
, crt_buffer
)
571 if x509
.has_expired():
572 org
, cn
= get_cert_issuer_info(crt
)
573 no_after
= x509
.get_notAfter()
575 if no_after
is not None:
576 end_date
= datetime
.datetime
.strptime(no_after
.decode('ascii'), '%Y%m%d%H%M%SZ')
577 msg
= f
'Certificate issued by "{org}/{cn}" expired on {end_date}'
579 raise ServerConfigException(msg
)
580 except (ValueError, crypto
.Error
) as e
:
581 raise ServerConfigException(f
'Invalid certificate: {e}')
584 def verify_cacrt(cert_fname
):
585 # type: (str) -> None
586 """Basic validation of a ca cert"""
589 raise ServerConfigException("CA cert not configured")
590 if not os
.path
.isfile(cert_fname
):
591 raise ServerConfigException("Certificate {} does not exist".format(cert_fname
))
594 with
open(cert_fname
) as f
:
595 verify_cacrt_content(f
.read())
596 except ValueError as e
:
597 raise ServerConfigException(
598 'Invalid certificate {}: {}'.format(cert_fname
, str(e
)))
600 def get_cert_issuer_info(crt
: str) -> Tuple
[Optional
[str],Optional
[str]]:
601 """Basic validation of a ca cert"""
603 from OpenSSL
import crypto
, SSL
605 crt_buffer
= crt
.encode("ascii") if isinstance(crt
, str) else crt
606 (org_name
, cn
) = (None, None)
607 cert
= crypto
.load_certificate(crypto
.FILETYPE_PEM
, crt_buffer
)
608 components
= cert
.get_issuer().get_components()
610 if c
[0].decode() == 'O': # org comp
611 org_name
= c
[1].decode()
612 elif c
[0].decode() == 'CN': # common name comp
614 return (org_name
, cn
)
615 except (ValueError, crypto
.Error
) as e
:
616 raise ServerConfigException(f
'Invalid certificate key: {e}')
618 def verify_tls(crt
, key
):
619 # type: (str, str) -> None
620 verify_cacrt_content(crt
)
622 from OpenSSL
import crypto
, SSL
624 _key
= crypto
.load_privatekey(crypto
.FILETYPE_PEM
, key
)
626 except (ValueError, crypto
.Error
) as e
:
627 raise ServerConfigException(
628 'Invalid private key: {}'.format(str(e
)))
630 crt_buffer
= crt
.encode("ascii") if isinstance(crt
, str) else crt
631 _crt
= crypto
.load_certificate(crypto
.FILETYPE_PEM
, crt_buffer
)
632 except ValueError as e
:
633 raise ServerConfigException(
634 'Invalid certificate key: {}'.format(str(e
))
638 context
= SSL
.Context(SSL
.TLSv1_METHOD
)
639 context
.use_certificate(_crt
)
640 context
.use_privatekey(_key
)
641 context
.check_privatekey()
642 except crypto
.Error
as e
:
643 logger
.warning('Private key and certificate do not match up: {}'.format(str(e
)))
644 except SSL
.Error
as e
:
645 raise ServerConfigException(f
'Invalid cert/key pair: {e}')
649 def verify_tls_files(cert_fname
, pkey_fname
):
650 # type: (str, str) -> None
651 """Basic checks for TLS certificate and key files
653 Do some validations to the private key and certificate:
654 - Check the type and format
655 - Check the certificate expiration date
656 - Check the consistency of the private key
657 - Check that the private key and certificate match up
659 :param cert_fname: Name of the certificate file
660 :param pkey_fname: name of the certificate public key file
662 :raises ServerConfigException: An error with a message
666 if not cert_fname
or not pkey_fname
:
667 raise ServerConfigException('no certificate configured')
669 verify_cacrt(cert_fname
)
671 if not os
.path
.isfile(pkey_fname
):
672 raise ServerConfigException('private key %s does not exist' % pkey_fname
)
674 from OpenSSL
import crypto
, SSL
677 with
open(pkey_fname
) as f
:
678 pkey
= crypto
.load_privatekey(crypto
.FILETYPE_PEM
, f
.read())
680 except (ValueError, crypto
.Error
) as e
:
681 raise ServerConfigException(
682 'Invalid private key {}: {}'.format(pkey_fname
, str(e
)))
684 context
= SSL
.Context(SSL
.TLSv1_METHOD
)
685 context
.use_certificate_file(cert_fname
, crypto
.FILETYPE_PEM
)
686 context
.use_privatekey_file(pkey_fname
, crypto
.FILETYPE_PEM
)
687 context
.check_privatekey()
688 except crypto
.Error
as e
:
690 'Private key {} and certificate {} do not match up: {}'.format(
691 pkey_fname
, cert_fname
, str(e
)))
694 def get_most_recent_rate(rates
: Optional
[List
[Tuple
[float, float]]]) -> float:
695 """ Get most recent rate from rates
697 :param rates: The derivative between all time series data points [time in seconds, value]
698 :type rates: list[tuple[int, float]]
700 :return: The last derivative or 0.0 if none exists
703 >>> get_most_recent_rate(None)
705 >>> get_most_recent_rate([])
707 >>> get_most_recent_rate([(1, -2.0)])
709 >>> get_most_recent_rate([(1, 2.0), (2, 1.5), (3, 5.0)])
716 def get_time_series_rates(data
: List
[Tuple
[float, float]]) -> List
[Tuple
[float, float]]:
717 """ Rates from time series data
719 :param data: Time series data [time in seconds, value]
720 :type data: list[tuple[int, float]]
722 :return: The derivative between all time series data points [time in seconds, value]
723 :rtype: list[tuple[int, float]]
725 >>> logger.debug = lambda s,x,y: print(s % (x,y))
726 >>> get_time_series_rates([])
728 >>> get_time_series_rates([[0, 1], [1, 3]])
730 >>> get_time_series_rates([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
731 Duplicate timestamp in time series data: [0, 2], [0, 3]
732 Duplicate timestamp in time series data: [0, 3], [0, 1]
733 Duplicate timestamp in time series data: [1, 2], [1, 3]
735 >>> get_time_series_rates([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
736 [(2, 2.0), (4, 4.0), (5, 5.0), (6, 6.0)]
738 data
= _filter_time_series(data
)
741 return [(data2
[0], _derivative(data1
, data2
) if data1
is not None else 0.0) for data1
, data2
in
def name_to_config_section(name: str) -> ConfEntity:
    """
    Map from daemon names to ceph entity names (as seen in config)
    """
    # The daemon type is everything before the first '.', e.g. "rgw" for "rgw.foo".
    daemon_type = name.split('.', 1)[0]
    client_types = ['rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi']
    native_types = ['mon', 'osd', 'mds', 'mgr', 'client']
    if daemon_type in client_types:
        return ConfEntity('client.' + name)
    if daemon_type in native_types:
        return ConfEntity(name)
    # Unknown daemon types fall back to the global 'mon' section.
    return ConfEntity('mon')
757 def _filter_time_series(data
: List
[Tuple
[float, float]]) -> List
[Tuple
[float, float]]:
758 """ Filters time series data
760 Filters out samples with the same timestamp in given time series data.
761 It also enforces the list to contain at least two samples.
763 All filtered values will be shown in the debug log. If values were filtered it's a bug in the
764 time series data collector, please report it.
766 :param data: Time series data [time in seconds, value]
767 :type data: list[tuple[int, float]]
769 :return: Filtered time series data [time in seconds, value]
770 :rtype: list[tuple[int, float]]
772 >>> logger.debug = lambda s,x,y: print(s % (x,y))
773 >>> _filter_time_series([])
775 >>> _filter_time_series([[1, 42]])
777 >>> _filter_time_series([[10, 2], [10, 3]])
778 Duplicate timestamp in time series data: [10, 2], [10, 3]
780 >>> _filter_time_series([[0, 1], [1, 2]])
782 >>> _filter_time_series([[0, 2], [0, 3], [0, 1], [1, 2], [1, 3]])
783 Duplicate timestamp in time series data: [0, 2], [0, 3]
784 Duplicate timestamp in time series data: [0, 3], [0, 1]
785 Duplicate timestamp in time series data: [1, 2], [1, 3]
787 >>> _filter_time_series([[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]])
788 [[1, 1], [2, 3], [4, 11], [5, 16], [6, 22]]
791 for i
in range(len(data
) - 1):
792 if data
[i
][0] == data
[i
+ 1][0]: # Same timestamp
793 logger
.debug("Duplicate timestamp in time series data: %s, %s", data
[i
], data
[i
+ 1])
795 filtered
.append(data
[i
])
798 filtered
.append(data
[-1])
802 def _derivative(p1
: Tuple
[float, float], p2
: Tuple
[float, float]) -> float:
803 """ Derivative between two time series data points
805 :param p1: Time series data [time in seconds, value]
806 :type p1: tuple[int, float]
807 :param p2: Time series data [time in seconds, value]
808 :type p2: tuple[int, float]
810 :return: Derivative between both points
813 >>> _derivative([0, 0], [2, 1])
815 >>> _derivative([0, 1], [2, 0])
817 >>> _derivative([0, 0], [3, 1])
820 return (p2
[1] - p1
[1]) / float(p2
[0] - p1
[0])
823 def _pairwise(iterable
: Iterable
[T
]) -> Generator
[Tuple
[Optional
[T
], T
], None, None]:
def to_pretty_timedelta(n: datetime.timedelta) -> str:
    """Render a timedelta as a short human-readable string, e.g. '90s', '3h', '2y'.

    The unit grows with the duration: seconds below 120s, minutes below
    120m, hours below 48h, days below 14d, weeks below 12w, 30-day months
    below 2 years, otherwise 365-day years.
    """
    total = int(n.total_seconds())
    # (upper bound, seconds per unit, suffix) -- first matching bound wins.
    scales = [
        (datetime.timedelta(seconds=120), 1, 's'),
        (datetime.timedelta(minutes=120), 60, 'm'),
        (datetime.timedelta(hours=48), 3600, 'h'),
        (datetime.timedelta(days=14), 3600 * 24, 'd'),
        (datetime.timedelta(days=7 * 12), 3600 * 24 * 7, 'w'),
        (datetime.timedelta(days=365 * 2), 3600 * 24 * 30, 'M'),
    ]
    for bound, per_unit, suffix in scales:
        if n < bound:
            return str(total // per_unit) + suffix
    return str(total // (3600 * 24 * 365)) + 'y'
848 def profile_method(skip_attribute
: bool = False) -> Callable
[[Callable
[..., T
]], Callable
[..., T
]]:
850 Decorator for methods of the Module class. Logs the name of the given
851 function f with the time it takes to execute it.
853 def outer(f
: Callable
[..., T
]) -> Callable
[..., T
]:
855 def wrapper(*args
: Any
, **kwargs
: Any
) -> T
:
858 self
.log
.debug('Starting method {}.'.format(f
.__name
__))
859 result
= f(*args
, **kwargs
)
860 duration
= time
.time() - t
861 if not skip_attribute
:
862 wrapper
._execution
_duration
= duration
# type: ignore
863 self
.log
.debug('Method {} ran {:.3f} seconds.'.format(f
.__name
__, duration
))
869 def password_hash(password
: Optional
[str], salt_password
: Optional
[str] = None) -> Optional
[str]:
872 if not salt_password
:
873 salt
= bcrypt
.gensalt()
875 salt
= salt_password
.encode('utf8')
876 return bcrypt
.hashpw(password
.encode('utf8'), salt
).decode('utf8')