AUTH_ROLES = ['administrator']
@classmethod
- def create_user(cls, username, password, roles):
+ def create_user(cls, username, password, roles=None,
+ force_password=True, cmd_args=None):
+ """
+ :param username: The name of the user.
+ :type username: str
+ :param password: The password.
+ :type password: str
+ :param roles: A list of roles.
+ :type roles: list
+ :param force_password: Force the use of the specified password. This
+ will bypass the password complexity check. Defaults to 'True'.
+ :type force_password: bool
+ :param cmd_args: Additional command line arguments for the
+ 'ac-user-create' command.
+ :type cmd_args: None | list[str]
+ """
try:
cls._ceph_cmd(['dashboard', 'ac-user-show', username])
cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
if ex.exitstatus != 2:
raise ex
- cls._ceph_cmd(['dashboard', 'ac-user-create', username, password])
-
- set_roles_args = ['dashboard', 'ac-user-set-roles', username]
- for idx, role in enumerate(roles):
- if isinstance(role, str):
- set_roles_args.append(role)
- else:
- assert isinstance(role, dict)
- rolename = 'test_role_{}'.format(idx)
- try:
- cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
- cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
- except CommandFailedError as ex:
- if ex.exitstatus != 2:
- raise ex
- cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
- for mod, perms in role.items():
- args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
- args.extend(perms)
- cls._ceph_cmd(args)
- set_roles_args.append(rolename)
- cls._ceph_cmd(set_roles_args)
+ user_create_args = [
+ 'dashboard', 'ac-user-create', username, password
+ ]
+ if force_password:
+ user_create_args.append('--force-password')
+ if cmd_args:
+ user_create_args.extend(cmd_args)
+ cls._ceph_cmd(user_create_args)
+
+ if roles:
+ set_roles_args = ['dashboard', 'ac-user-set-roles', username]
+ for idx, role in enumerate(roles):
+ if isinstance(role, str):
+ set_roles_args.append(role)
+ else:
+ assert isinstance(role, dict)
+ rolename = 'test_role_{}'.format(idx)
+ try:
+ cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
+ cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
+ except CommandFailedError as ex:
+ if ex.exitstatus != 2:
+ raise ex
+ cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
+ for mod, perms in role.items():
+ args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
+ args.extend(perms)
+ cls._ceph_cmd(args)
+ set_roles_args.append(rolename)
+ cls._ceph_cmd(set_roles_args)
@classmethod
def login(cls, username, password):
if cls._loggedin:
cls.logout()
cls._post('/api/auth', {'username': username, 'password': password})
+ cls._assertEq(cls._resp.status_code, 201)
cls._token = cls.jsonBody()['token']
cls._loggedin = True
def logout(cls):
if cls._loggedin:
cls._post('/api/auth/logout')
+ cls._assertEq(cls._resp.status_code, 200)
cls._token = None
cls._loggedin = False
cls._ceph_cmd(['dashboard', 'ac-role-delete', 'test_role_{}'.format(idx)])
@classmethod
- def RunAs(cls, username, password, roles):
+ def RunAs(cls, username, password, roles=None, force_password=True,
+ cmd_args=None, login=True):
def wrapper(func):
def execute(self, *args, **kwargs):
- self.create_user(username, password, roles)
- self.login(username, password)
+ self.create_user(username, password, roles,
+ force_password, cmd_args)
+ if login:
+ self.login(username, password)
res = func(self, *args, **kwargs)
- self.logout()
+ if login:
+ self.logout()
self.delete_user(username, roles)
return res
+
return execute
+
return wrapper
@classmethod
cls.login('admin', 'admin')
def setUp(self):
+ super(DashboardTestCase, self).setUp()
if not self._loggedin and self.AUTO_AUTHENTICATE:
self.login('admin', 'admin')
self.wait_for_health_clear(20)
@classmethod
def _task_request(cls, method, url, data, timeout):
res = cls._request(url, method, data)
- cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403])
+ cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403, 404])
if cls._resp.status_code == 403:
return None
log.info("command result: %s", res)
return res
+ @classmethod
+ def _ceph_cmd_result(cls, cmd):
+ exitstatus = cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd)
+ log.info("command exit status: %d", exitstatus)
+ return exitstatus
+
def set_config_key(self, key, value):
self._ceph_cmd(['config-key', 'set', key, value])
def get_config_key(self, key):
return self._ceph_cmd(['config-key', 'get', key])
+ @classmethod
+ def _cmd(cls, args):
+ return cls.mgr_cluster.admin_remote.run(args=args)
+
@classmethod
def _rbd_cmd(cls, cmd):
- args = [
- 'rbd'
- ]
+ args = ['rbd']
args.extend(cmd)
- cls.mgr_cluster.admin_remote.run(args=args)
+ cls._cmd(args)
@classmethod
def _radosgw_admin_cmd(cls, cmd):
- args = [
- 'radosgw-admin'
- ]
+ args = ['radosgw-admin']
args.extend(cmd)
- cls.mgr_cluster.admin_remote.run(args=args)
+ cls._cmd(args)
@classmethod
def _rados_cmd(cls, cmd):
args = ['rados']
args.extend(cmd)
- cls.mgr_cluster.admin_remote.run(args=args)
+ cls._cmd(args)
@classmethod
def mons(cls):
- out = cls.ceph_cluster.mon_manager.raw_cluster_cmd('mon_status')
+ out = cls.ceph_cluster.mon_manager.raw_cluster_cmd('quorum_status')
j = json.loads(out)
return [mon['name'] for mon in j['monmap']['mons']]