if 'UNITTEST' in os.environ:
import tests
+import bcrypt
import cephfs
import contextlib
import datetime
logger = logging.getLogger(__name__)
+class PortAlreadyInUse(Exception):
+ pass
+
+
class CephfsConnectionException(Exception):
def __init__(self, error_code: int, error_message: str):
self.errno = error_code
class CephfsClient(Generic[Module_T]):
def __init__(self, mgr: Module_T):
self.mgr = mgr
- self.stopping = Event()
self.connection_pool = CephfsConnectionPool(self.mgr)
- def is_stopping(self) -> bool:
- return self.stopping.is_set()
-
def shutdown(self) -> None:
logger.info("shutting down")
- # first, note that we're shutting down
- self.stopping.set()
# second, delete all libcephfs handles from connection pool
self.connection_pool.del_all_connections()
:param fs_name: fs name
:return: yields a fs handle (ceph filesystem handle)
"""
- if fsc.is_stopping():
- raise CephfsConnectionException(-errno.ESHUTDOWN,
- "shutdown in progress")
-
fs_handle = fsc.connection_pool.get_fs_handle(fs_name)
try:
yield fs_handle
return format_units(n, width, colored, decimal=False)
+def test_port_allocation(addr: str, port: int) -> None:
+ """Checks if the port is available
+ :raises PortAlreadyInUse: in case port is already in use
+ :raises Exception: any generic error other than port already in use
+ If no exception is raised, the port can be assumed available
+ """
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.bind((addr, port))
+ sock.close()
+ except socket.error as e:
+ if e.errno == errno.EADDRINUSE:
+ raise PortAlreadyInUse
+ else:
+ raise e
+
+
def merge_dicts(*args: Dict[T, Any]) -> Dict[T, Any]:
"""
>>> merge_dicts({1:2}, {3:4})
common_name: str = 'mgr',
dname: Optional[Dict[str, str]] = None) -> Tuple[str, str]:
"""Returns self-signed PEM certificates valid for 10 years.
-
+
The optional dname parameter provides complete control of the cert/key
creation by supporting all valid RDNs via a dictionary. However, if dname
is not provided the default O and CN settings will be applied.
# type: (str) -> None
from OpenSSL import crypto
try:
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+ crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+ x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
if x509.has_expired():
- logger.warning('Certificate has expired: {}'.format(crt))
+ org, cn = get_cert_issuer_info(crt)
+ no_after = x509.get_notAfter()
+ end_date = None
+ if no_after is not None:
+ end_date = datetime.datetime.strptime(no_after.decode('ascii'), '%Y%m%d%H%M%SZ')
+ msg = f'Certificate issued by "{org}/{cn}" expired on {end_date}'
+ logger.warning(msg)
+ raise ServerConfigException(msg)
except (ValueError, crypto.Error) as e:
- raise ServerConfigException(
- 'Invalid certificate: {}'.format(str(e)))
+ raise ServerConfigException(f'Invalid certificate: {e}')
def verify_cacrt(cert_fname):
raise ServerConfigException(
'Invalid certificate {}: {}'.format(cert_fname, str(e)))
+def get_cert_issuer_info(crt: str) -> Tuple[Optional[str],Optional[str]]:
+ """Basic validation of a ca cert"""
+
+ from OpenSSL import crypto, SSL
+ try:
+ crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+ (org_name, cn) = (None, None)
+ cert = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
+ components = cert.get_issuer().get_components()
+ for c in components:
+ if c[0].decode() == 'O': # org comp
+ org_name = c[1].decode()
+ elif c[0].decode() == 'CN': # common name comp
+ cn = c[1].decode()
+ return (org_name, cn)
+ except (ValueError, crypto.Error) as e:
+ raise ServerConfigException(f'Invalid certificate key: {e}')
def verify_tls(crt, key):
# type: (str, str) -> None
raise ServerConfigException(
'Invalid private key: {}'.format(str(e)))
try:
- _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
+ crt_buffer = crt.encode("ascii") if isinstance(crt, str) else crt
+ _crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt_buffer)
except ValueError as e:
raise ServerConfigException(
'Invalid certificate key: {}'.format(str(e))
context.use_privatekey(_key)
context.check_privatekey()
except crypto.Error as e:
- logger.warning(
- 'Private key and certificate do not match up: {}'.format(str(e)))
+ logger.warning('Private key and certificate do not match up: {}'.format(str(e)))
+ except SSL.Error as e:
+ raise ServerConfigException(f'Invalid cert/key pair: {e}')
+
def verify_tls_files(cert_fname, pkey_fname):
def to_pretty_timedelta(n: datetime.timedelta) -> str:
if n < datetime.timedelta(seconds=120):
- return str(n.seconds) + 's'
+ return str(int(n.total_seconds())) + 's'
if n < datetime.timedelta(minutes=120):
- return str(n.seconds // 60) + 'm'
+ return str(int(n.total_seconds()) // 60) + 'm'
if n < datetime.timedelta(hours=48):
- return str(n.seconds // 3600) + 'h'
+ return str(int(n.total_seconds()) // 3600) + 'h'
if n < datetime.timedelta(days=14):
- return str(n.days) + 'd'
+ return str(int(n.total_seconds()) // (3600*24)) + 'd'
if n < datetime.timedelta(days=7*12):
- return str(n.days // 7) + 'w'
+ return str(int(n.total_seconds()) // (3600*24*7)) + 'w'
if n < datetime.timedelta(days=365*2):
- return str(n.days // 30) + 'M'
- return str(n.days // 365) + 'y'
+ return str(int(n.total_seconds()) // (3600*24*30)) + 'M'
+ return str(int(n.total_seconds()) // (3600*24*365)) + 'y'
def profile_method(skip_attribute: bool = False) -> Callable[[Callable[..., T]], Callable[..., T]]:
return result
return wrapper
return outer
+
+
+def password_hash(password: Optional[str], salt_password: Optional[str] = None) -> Optional[str]:
+ if not password:
+ return None
+ if not salt_password:
+ salt = bcrypt.gensalt()
+ else:
+ salt = salt_password.encode('utf8')
+ return bcrypt.hashpw(password.encode('utf8'), salt).decode('utf8')