# -*- coding: utf-8 -*-
# pylint: disable=too-many-arguments,too-many-return-statements
# pylint: disable=too-many-branches, too-many-locals, too-many-statements
-from __future__ import absolute_import
-
-from string import punctuation, ascii_lowercase, digits, ascii_uppercase
import errno
import json
import logging
+import re
import threading
import time
-import re
-
from datetime import datetime, timedelta
+from string import ascii_lowercase, ascii_uppercase, digits, punctuation
+from typing import List, Optional, Sequence
import bcrypt
-
-from mgr_module import CLIReadCommand, CLIWriteCommand
+from mgr_module import CLICheckNonemptyFileInput, CLIReadCommand, CLIWriteCommand
from .. import mgr
-from ..security import Scope, Permission
+from ..exceptions import PasswordPolicyException, PermissionNotValid, \
+ PwdExpirationDateNotValid, RoleAlreadyExists, RoleDoesNotExist, \
+ RoleIsAssociatedWithUser, RoleNotInUser, ScopeNotInRole, ScopeNotValid, \
+ UserAlreadyExists, UserDoesNotExist
+from ..security import Permission, Scope
from ..settings import Settings
-from ..exceptions import RoleAlreadyExists, RoleDoesNotExist, ScopeNotValid, \
- PermissionNotValid, RoleIsAssociatedWithUser, \
- UserAlreadyExists, UserDoesNotExist, ScopeNotInRole, \
- RoleNotInUser, PasswordPolicyException, PwdExpirationDateNotValid
-
logger = logging.getLogger('access_control')
+DEFAULT_FILE_DESC = 'password/secret'
# password hashing algorithm
# this roles cannot be deleted nor updated
# admin role provides all permissions for all scopes
-ADMIN_ROLE = Role('administrator', 'Administrator', {
- scope_name: Permission.all_permissions()
- for scope_name in Scope.all_scopes()
-})
+ADMIN_ROLE = Role(
+ 'administrator', 'allows full permissions for all security scopes', {
+ scope_name: Permission.all_permissions()
+ for scope_name in Scope.all_scopes()
+ })
# read-only role provides read-only permission for all scopes
-READ_ONLY_ROLE = Role('read-only', 'Read-Only', {
- scope_name: [_P.READ] for scope_name in Scope.all_scopes()
- if scope_name != Scope.DASHBOARD_SETTINGS
-})
+READ_ONLY_ROLE = Role(
+ 'read-only',
+ 'allows read permission for all security scope except dashboard settings and config-opt', {
+ scope_name: [_P.READ] for scope_name in Scope.all_scopes()
+ if scope_name not in (Scope.DASHBOARD_SETTINGS, Scope.CONFIG_OPT)
+ })
# block manager role provides all permission for block related scopes
-BLOCK_MGR_ROLE = Role('block-manager', 'Block Manager', {
- Scope.RBD_IMAGE: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.POOL: [_P.READ],
- Scope.ISCSI: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.RBD_MIRRORING: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.GRAFANA: [_P.READ],
-})
+BLOCK_MGR_ROLE = Role(
+ 'block-manager', 'allows full permissions for rbd-image, rbd-mirroring, and iscsi scopes', {
+ Scope.RBD_IMAGE: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.POOL: [_P.READ],
+ Scope.ISCSI: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.RBD_MIRRORING: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.GRAFANA: [_P.READ],
+ })
# RadosGW manager role provides all permissions for block related scopes
-RGW_MGR_ROLE = Role('rgw-manager', 'RGW Manager', {
- Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.GRAFANA: [_P.READ],
-})
+RGW_MGR_ROLE = Role(
+ 'rgw-manager', 'allows full permissions for the rgw scope', {
+ Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.GRAFANA: [_P.READ],
+ })
# Cluster manager role provides all permission for OSDs, Monitors, and
# Config options
-CLUSTER_MGR_ROLE = Role('cluster-manager', 'Cluster Manager', {
- Scope.HOSTS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.OSD: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.MONITOR: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.MANAGER: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.CONFIG_OPT: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.LOG: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.GRAFANA: [_P.READ],
-})
+CLUSTER_MGR_ROLE = Role(
+ 'cluster-manager', """allows full permissions for the hosts, osd, mon, mgr,
+ and config-opt scopes""", {
+ Scope.HOSTS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.OSD: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.MONITOR: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.MANAGER: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.CONFIG_OPT: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.LOG: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.GRAFANA: [_P.READ],
+ })
# Pool manager role provides all permissions for pool related scopes
-POOL_MGR_ROLE = Role('pool-manager', 'Pool Manager', {
- Scope.POOL: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.GRAFANA: [_P.READ],
-})
+POOL_MGR_ROLE = Role(
+ 'pool-manager', 'allows full permissions for the pool scope', {
+ Scope.POOL: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.GRAFANA: [_P.READ],
+ })
# CephFS manager role provides all permissions for CephFS related scopes
-CEPHFS_MGR_ROLE = Role('cephfs-manager', 'CephFS Manager', {
- Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.GRAFANA: [_P.READ],
-})
+CEPHFS_MGR_ROLE = Role(
+ 'cephfs-manager', 'allows full permissions for the cephfs scope', {
+ Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.GRAFANA: [_P.READ],
+ })
-GANESHA_MGR_ROLE = Role('ganesha-manager', 'NFS Ganesha Manager', {
- Scope.NFS_GANESHA: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
- Scope.GRAFANA: [_P.READ],
-})
+GANESHA_MGR_ROLE = Role(
+ 'ganesha-manager', 'allows full permissions for the nfs-ganesha scope', {
+ Scope.NFS_GANESHA: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.CEPHFS: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.RGW: [_P.READ, _P.CREATE, _P.UPDATE, _P.DELETE],
+ Scope.GRAFANA: [_P.READ],
+ })
SYSTEM_ROLES = {
self.password = password
self.name = name
self.email = email
+ self.invalid_auth_attempt = 0
if roles is None:
self.roles = set()
else:
self.set_password_hash(password_hash(password))
def set_password_hash(self, hashed_password):
+ self.invalid_auth_attempt = 0
self.password = hashed_password
self.refresh_last_update()
self.refresh_pwd_expiration_date()
raise RoleDoesNotExist(name)
return self.roles[name]
+ def increment_attempt(self, username):
+ with self.lock:
+ if username in self.users:
+ self.users[username].invalid_auth_attempt += 1
+
+ def reset_attempt(self, username):
+ with self.lock:
+ if username in self.users:
+ self.users[username].invalid_auth_attempt = 0
+
+ def get_attempt(self, username):
+ with self.lock:
+ try:
+ return self.users[username].invalid_auth_attempt
+ except KeyError:
+ return 0
+
def delete_role(self, name):
with self.lock:
if name not in self.roles:
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
def check_and_update_db(self):
- logger.debug("Checking for previews DB versions")
-
- def check_migrate_v0_to_current():
- # check if there is username/password from previous version
- username = mgr.get_module_option('username', None)
- password = mgr.get_module_option('password', None)
- if username and password:
- logger.debug("Found single user credentials: user=%s", username)
- # found user credentials
- user = self.create_user(username, "", None, None)
- # password is already hashed, so setting manually
- user.password = password
- user.add_roles([ADMIN_ROLE])
- self.save()
+ logger.debug("Checking for previous DB versions")
def check_migrate_v1_to_current():
# Check if version 1 exists in the DB and migrate it to current version
for un, u in v1_db.get('users', {}).items()}
self.save()
- else:
- # If version 1 does not exist, check if migration of VERSION "0" needs to be done
- check_migrate_v0_to_current()
check_migrate_v1_to_current()
# CLI dashboard access control scope commands
-@CLIWriteCommand('dashboard set-login-credentials',
- 'name=username,type=CephString '
- 'name=password,type=CephString',
- 'Set the login credentials')
-def set_login_credentials_cmd(_, username, password):
+@CLIWriteCommand('dashboard set-login-credentials')
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
+def set_login_credentials_cmd(_, username: str, inbuf: str):
+ '''
+ Set the login credentials. Password read from -i <file>
+ '''
+ password = inbuf
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.set_password(password)
Username and password updated''', ''
-@CLIReadCommand('dashboard ac-role-show',
- 'name=rolename,type=CephString,req=false',
- 'Show role info')
-def ac_role_show_cmd(_, rolename=None):
+@CLIReadCommand('dashboard ac-role-show')
+def ac_role_show_cmd(_, rolename: Optional[str] = None):
+ '''
+ Show role info
+ '''
if not rolename:
roles = dict(mgr.ACCESS_CTRL_DB.roles)
roles.update(SYSTEM_ROLES)
return 0, json.dumps(role.to_dict()), ''
-@CLIWriteCommand('dashboard ac-role-create',
- 'name=rolename,type=CephString '
- 'name=description,type=CephString,req=false',
- 'Create a new access control role')
-def ac_role_create_cmd(_, rolename, description=None):
+@CLIWriteCommand('dashboard ac-role-create')
+def ac_role_create_cmd(_, rolename: str, description: Optional[str] = None):
+ '''
+ Create a new access control role
+ '''
try:
role = mgr.ACCESS_CTRL_DB.create_role(rolename, description)
mgr.ACCESS_CTRL_DB.save()
return -errno.EEXIST, '', str(ex)
-@CLIWriteCommand('dashboard ac-role-delete',
- 'name=rolename,type=CephString',
- 'Delete an access control role')
-def ac_role_delete_cmd(_, rolename):
+@CLIWriteCommand('dashboard ac-role-delete')
+def ac_role_delete_cmd(_, rolename: str):
+ '''
+ Delete an access control role
+ '''
try:
mgr.ACCESS_CTRL_DB.delete_role(rolename)
mgr.ACCESS_CTRL_DB.save()
except RoleDoesNotExist as ex:
if rolename in SYSTEM_ROLES:
return -errno.EPERM, '', "Cannot delete system role '{}'" \
- .format(rolename)
+ .format(rolename)
return -errno.ENOENT, '', str(ex)
except RoleIsAssociatedWithUser as ex:
return -errno.EPERM, '', str(ex)
-@CLIWriteCommand('dashboard ac-role-add-scope-perms',
- 'name=rolename,type=CephString '
- 'name=scopename,type=CephString '
- 'name=permissions,type=CephString,n=N',
- 'Add the scope permissions for a role')
-def ac_role_add_scope_perms_cmd(_, rolename, scopename, permissions):
+@CLIWriteCommand('dashboard ac-role-add-scope-perms')
+def ac_role_add_scope_perms_cmd(_,
+ rolename: str,
+ scopename: str,
+ permissions: Sequence[str]):
+ '''
+ Add the scope permissions for a role
+ '''
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename)
perms_array = [perm.strip() for perm in permissions]
except RoleDoesNotExist as ex:
if rolename in SYSTEM_ROLES:
return -errno.EPERM, '', "Cannot update system role '{}'" \
- .format(rolename)
+ .format(rolename)
return -errno.ENOENT, '', str(ex)
except ScopeNotValid as ex:
return -errno.EINVAL, '', str(ex) + "\n Possible values: {}" \
.format(Scope.all_scopes())
except PermissionNotValid as ex:
return -errno.EINVAL, '', str(ex) + \
- "\n Possible values: {}" \
- .format(Permission.all_permissions())
+ "\n Possible values: {}" \
+ .format(Permission.all_permissions())
-@CLIWriteCommand('dashboard ac-role-del-scope-perms',
- 'name=rolename,type=CephString '
- 'name=scopename,type=CephString',
- 'Delete the scope permissions for a role')
-def ac_role_del_scope_perms_cmd(_, rolename, scopename):
+@CLIWriteCommand('dashboard ac-role-del-scope-perms')
+def ac_role_del_scope_perms_cmd(_, rolename: str, scopename: str):
+ '''
+ Delete the scope permissions for a role
+ '''
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename)
role.del_scope_permissions(scopename)
except RoleDoesNotExist as ex:
if rolename in SYSTEM_ROLES:
return -errno.EPERM, '', "Cannot update system role '{}'" \
- .format(rolename)
+ .format(rolename)
return -errno.ENOENT, '', str(ex)
except ScopeNotInRole as ex:
return -errno.ENOENT, '', str(ex)
-@CLIReadCommand('dashboard ac-user-show',
- 'name=username,type=CephString,req=false',
- 'Show user info')
-def ac_user_show_cmd(_, username=None):
+@CLIReadCommand('dashboard ac-user-show')
+def ac_user_show_cmd(_, username: Optional[str] = None):
+ '''
+ Show user info
+ '''
if not username:
users = mgr.ACCESS_CTRL_DB.users
users_list = [name for name, _ in users.items()]
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-create',
- 'name=username,type=CephString '
- 'name=password,type=CephString,req=false '
- 'name=rolename,type=CephString,req=false '
- 'name=name,type=CephString,req=false '
- 'name=email,type=CephString,req=false '
- 'name=enabled,type=CephBool,req=false '
- 'name=force_password,type=CephBool,req=false '
- 'name=pwd_expiration_date,type=CephInt,req=false '
- 'name=pwd_update_required,type=CephBool,req=false',
- 'Create a user')
-def ac_user_create_cmd(_, username, password=None, rolename=None, name=None,
- email=None, enabled=True, force_password=False,
- pwd_expiration_date=None, pwd_update_required=False):
+@CLIWriteCommand('dashboard ac-user-create')
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
+def ac_user_create_cmd(_, username: str, inbuf: str,
+ rolename: Optional[str] = None,
+ name: Optional[str] = None,
+ email: Optional[str] = None,
+ enabled: bool = True,
+ force_password: bool = False,
+ pwd_expiration_date: Optional[int] = None,
+ pwd_update_required: bool = False):
+ '''
+ Create a user. Password read from -i <file>
+ '''
+ password = inbuf
try:
role = mgr.ACCESS_CTRL_DB.get_role(rolename) if rolename else None
except RoleDoesNotExist as ex:
return 0, json.dumps(user.to_dict()), ''
-@CLIWriteCommand('dashboard ac-user-enable',
- 'name=username,type=CephString',
- 'Enable a user')
-def ac_user_enable(_, username):
+@CLIWriteCommand('dashboard ac-user-enable')
+def ac_user_enable(_, username: str):
+ '''
+ Enable a user
+ '''
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.enabled = True
+ mgr.ACCESS_CTRL_DB.reset_attempt(username)
mgr.ACCESS_CTRL_DB.save()
return 0, json.dumps(user.to_dict()), ''
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-disable',
- 'name=username,type=CephString',
- 'Disable a user')
-def ac_user_disable(_, username):
+@CLIWriteCommand('dashboard ac-user-disable')
+def ac_user_disable(_, username: str):
+ '''
+ Disable a user
+ '''
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
user.enabled = False
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-delete',
- 'name=username,type=CephString',
- 'Delete user')
-def ac_user_delete_cmd(_, username):
+@CLIWriteCommand('dashboard ac-user-delete')
+def ac_user_delete_cmd(_, username: str):
+ '''
+ Delete user
+ '''
try:
mgr.ACCESS_CTRL_DB.delete_user(username)
mgr.ACCESS_CTRL_DB.save()
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-roles',
- 'name=username,type=CephString '
- 'name=roles,type=CephString,n=N',
- 'Set user roles')
-def ac_user_set_roles_cmd(_, username, roles):
+@CLIWriteCommand('dashboard ac-user-set-roles')
+def ac_user_set_roles_cmd(_, username: str, roles: Sequence[str]):
+ '''
+ Set user roles
+ '''
rolesname = roles
- roles = []
+ roles: List[Role] = []
for rolename in rolesname:
try:
roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename))
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-add-roles',
- 'name=username,type=CephString '
- 'name=roles,type=CephString,n=N',
- 'Add roles to user')
-def ac_user_add_roles_cmd(_, username, roles):
+@CLIWriteCommand('dashboard ac-user-add-roles')
+def ac_user_add_roles_cmd(_, username: str, roles: Sequence[str]):
+ '''
+ Add roles to user
+ '''
rolesname = roles
- roles = []
+ roles: List[Role] = []
for rolename in rolesname:
try:
roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename))
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-del-roles',
- 'name=username,type=CephString '
- 'name=roles,type=CephString,n=N',
- 'Delete roles from user')
-def ac_user_del_roles_cmd(_, username, roles):
+@CLIWriteCommand('dashboard ac-user-del-roles')
+def ac_user_del_roles_cmd(_, username: str, roles: Sequence[str]):
+ '''
+ Delete roles from user
+ '''
rolesname = roles
- roles = []
+ roles: List[Role] = []
for rolename in rolesname:
try:
roles.append(mgr.ACCESS_CTRL_DB.get_role(rolename))
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-password',
- 'name=username,type=CephString '
- 'name=password,type=CephString '
- 'name=force_password,type=CephBool,req=false',
- 'Set user password')
-def ac_user_set_password(_, username, password, force_password=False):
+@CLIWriteCommand('dashboard ac-user-set-password')
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
+def ac_user_set_password(_, username: str, inbuf: str,
+ force_password: bool = False):
+ '''
+ Set user password from -i <file>
+ '''
+ password = inbuf
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
if not force_password:
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-password-hash',
- 'name=username,type=CephString '
- 'name=hashed_password,type=CephString',
- 'Set user password bcrypt hash')
-def ac_user_set_password_hash(_, username, hashed_password):
+@CLIWriteCommand('dashboard ac-user-set-password-hash')
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
+def ac_user_set_password_hash(_, username: str, inbuf: str):
+ '''
+ Set user password bcrypt hash from -i <file>
+ '''
+ hashed_password = inbuf
try:
# make sure the hashed_password is actually a bcrypt hash
bcrypt.checkpw(b'', hashed_password.encode('utf-8'))
return -errno.ENOENT, '', str(ex)
-@CLIWriteCommand('dashboard ac-user-set-info',
- 'name=username,type=CephString '
- 'name=name,type=CephString '
- 'name=email,type=CephString',
- 'Set user info')
-def ac_user_set_info(_, username, name, email):
+@CLIWriteCommand('dashboard ac-user-set-info')
+def ac_user_set_info(_, username: str, name: str, email: str):
+ '''
+ Set user info
+ '''
try:
user = mgr.ACCESS_CTRL_DB.get_user(username)
if name: