version = cls.VERSION
return "{}{}".format(cls.ACDB_CONFIG_KEY, version)
+ def check_and_update_db(self):
+ logger.debug("Checking for previous DB versions")
+
+ def check_migrate_v1_to_current():
+ # Check if version 1 exists in the DB and migrate it to current version
+ v1_db = mgr.get_store(self.accessdb_config_key(1))
+ if v1_db:
+ logger.debug("Found database v1 credentials")
+ v1_db = json.loads(v1_db)
+
+ for user, _ in v1_db['users'].items():
+ v1_db['users'][user]['enabled'] = True
+ v1_db['users'][user]['pwdExpirationDate'] = None
+ v1_db['users'][user]['pwdUpdateRequired'] = False
+
+ self.roles = {rn: Role.from_dict(r) for rn, r in v1_db.get('roles', {}).items()}
+ self.users = {un: User.from_dict(u, dict(self.roles, **SYSTEM_ROLES))
+ for un, u in v1_db.get('users', {}).items()}
+
+ self.save()
+
+ check_migrate_v1_to_current()
+
@classmethod
def load(cls):
logger.info("Loading user roles DB version=%s", cls.VERSION)
if json_db is None:
logger.debug("No DB v%s found, creating new...", cls.VERSION)
db = cls(cls.VERSION, {}, {})
+ # check if we can update from a previous version database
+ db.check_and_update_db()
return db
dict_db = json.loads(json_db)