# -*- coding: utf-8 -*-
# pylint: disable=too-many-arguments,too-many-return-statements
# pylint: disable=too-many-branches, too-many-locals, too-many-statements
-from __future__ import absolute_import
import errno
import json
from ..settings import Settings
logger = logging.getLogger('access_control')
+DEFAULT_FILE_DESC = 'password/secret'
# password hashing algorithm
version = cls.VERSION
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
+ def check_and_update_db(self):
+ logger.debug("Checking for previous DB versions")
+
+ def check_migrate_v1_to_current():
+ # Check if version 1 exists in the DB and migrate it to current version
+ v1_db = mgr.get_store(self.accessdb_config_key(1))
+ if v1_db:
+ logger.debug("Found database v1 credentials")
+ v1_db = json.loads(v1_db)
+
+ for user, _ in v1_db['users'].items():
+ v1_db['users'][user]['enabled'] = True
+ v1_db['users'][user]['pwdExpirationDate'] = None
+ v1_db['users'][user]['pwdUpdateRequired'] = False
+
+ self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
+ self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
+ for un, u in v1_db.get('users', {}).items()}
+
+ self.save()
+
+ check_migrate_v1_to_current()
+
@classmethod
def load(cls):
logger.info("Loading user roles DB version=%s", cls.VERSION)
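Review bot: Every account migrated in `check_migrate_v1_to_current` is written back with `enabled = True`, `pwdExpirationDate = None` and `pwdUpdateRequired = False`, so credentials carried over from the v1 store never expire and are never forced to rotate, whatever password policy the current version is configured with. If that is intended it deserves a comment; otherwise the expiry should be derived from the configured policy at migration time. The loop also indexes `v1_db['users']` directly while the two comprehensions below use `v1_db.get('users', {})`, so a v1 record without a `users` key raises `KeyError` before the tolerant code is reached; `for user in v1_db.get('users', {}).values(): user['enabled'] = True` (likewise for the other two fields) keeps both paths consistent. Nothing visible in the hunk removes the v1 key after `self.save()`, so the old password hashes appear to stay in the store next to the migrated copy; a comment should say whether that is kept for downgrade or should be cleaned up.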
if json_db is None:
logger.debug("No DB v%s found, creating new...", cls.VERSION)
db = cls(cls.VERSION, {}, {})
+ # check if we can update from a previous version database
+ db.check_and_update_db()
return db
dict_db = json.loads(json_db)
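Review bot: The new `db.check_and_update_db()` call in `load` is unguarded, so a malformed v1 record (a `json.loads` failure or a missing key inside the migration) propagates out of `load()` instead of returning the freshly created empty DB. This runs only on the path where no current-version DB exists, so a corrupt legacy blob would block access-control start-up on exactly the upgrade case the change targets. Catching the parse and shape errors around the call, logging them with `logger.exception(...)` and then returning `db` would make the failure explicit; if aborting is the preferred behaviour, a comment stating so would help. `load`'s body continues outside the hunk.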
# CLI dashboard access control scope commands
@CLIWriteCommand('dashboard set-login-credentials')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def set_login_credentials_cmd(_, username: str, inbuf: str):
'''
Set the login credentials. Password read from -i <file>
@CLIWriteCommand('dashboard ac-user-create')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_create_cmd(_, username: str, inbuf: str,
rolename: Optional[str] = None,
name: Optional[str] = None,
@CLIWriteCommand('dashboard ac-user-set-password')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_set_password(_, username: str, inbuf: str,
force_password: bool = False):
'''
@CLIWriteCommand('dashboard ac-user-set-password-hash')
-@CLICheckNonemptyFileInput
+@CLICheckNonemptyFileInput(desc=DEFAULT_FILE_DESC)
def ac_user_set_password_hash(_, username: str, inbuf: str):
'''
Set user password bcrypt hash from -i <file>
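Review bot: `ac-user-set-password-hash` reads a bcrypt hash from the `-i` file, yet it is decorated with the same `desc=DEFAULT_FILE_DESC` ('password/secret') as the three password commands above. If `desc` is the text shown when the file is missing or empty (the decorator itself is not visible here), the message will name the wrong kind of input and may lead an operator to supply a plaintext password where a hash is expected. Passing a specific description, e.g. `@CLICheckNonemptyFileInput(desc='password hash')`, would be clearer. The function body continues outside the hunk.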