]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/mgr/dashboard/helper.py
import 15.2.0 Octopus source
[ceph.git] / ceph / qa / tasks / mgr / dashboard / helper.py
index 454d86249fa568073fca8d39faf7df86b8508122..6419d318a8f84a69c9d4720326592946ea99e1d8 100644 (file)
@@ -35,7 +35,22 @@ class DashboardTestCase(MgrTestCase):
     AUTH_ROLES = ['administrator']
 
     @classmethod
-    def create_user(cls, username, password, roles):
+    def create_user(cls, username, password, roles=None,
+                    force_password=True, cmd_args=None):
+        """
+        :param username: The name of the user.
+        :type username: str
+        :param password: The password.
+        :type password: str
+        :param roles: A list of roles.
+        :type roles: list
+        :param force_password: Force the use of the specified password. This
+          will bypass the password complexity check. Defaults to 'True'.
+        :type force_password: bool
+        :param cmd_args: Additional command line arguments for the
+          'ac-user-create' command.
+        :type cmd_args: None | list[str]
+        """
         try:
             cls._ceph_cmd(['dashboard', 'ac-user-show', username])
             cls._ceph_cmd(['dashboard', 'ac-user-delete', username])
@@ -43,34 +58,43 @@ class DashboardTestCase(MgrTestCase):
             if ex.exitstatus != 2:
                 raise ex
 
-        cls._ceph_cmd(['dashboard', 'ac-user-create', username, password])
-
-        set_roles_args = ['dashboard', 'ac-user-set-roles', username]
-        for idx, role in enumerate(roles):
-            if isinstance(role, str):
-                set_roles_args.append(role)
-            else:
-                assert isinstance(role, dict)
-                rolename = 'test_role_{}'.format(idx)
-                try:
-                    cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
-                    cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
-                except CommandFailedError as ex:
-                    if ex.exitstatus != 2:
-                        raise ex
-                cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
-                for mod, perms in role.items():
-                    args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
-                    args.extend(perms)
-                    cls._ceph_cmd(args)
-                set_roles_args.append(rolename)
-        cls._ceph_cmd(set_roles_args)
+        user_create_args = [
+            'dashboard', 'ac-user-create', username, password
+        ]
+        if force_password:
+            user_create_args.append('--force-password')
+        if cmd_args:
+            user_create_args.extend(cmd_args)
+        cls._ceph_cmd(user_create_args)
+
+        if roles:
+            set_roles_args = ['dashboard', 'ac-user-set-roles', username]
+            for idx, role in enumerate(roles):
+                if isinstance(role, str):
+                    set_roles_args.append(role)
+                else:
+                    assert isinstance(role, dict)
+                    rolename = 'test_role_{}'.format(idx)
+                    try:
+                        cls._ceph_cmd(['dashboard', 'ac-role-show', rolename])
+                        cls._ceph_cmd(['dashboard', 'ac-role-delete', rolename])
+                    except CommandFailedError as ex:
+                        if ex.exitstatus != 2:
+                            raise ex
+                    cls._ceph_cmd(['dashboard', 'ac-role-create', rolename])
+                    for mod, perms in role.items():
+                        args = ['dashboard', 'ac-role-add-scope-perms', rolename, mod]
+                        args.extend(perms)
+                        cls._ceph_cmd(args)
+                    set_roles_args.append(rolename)
+            cls._ceph_cmd(set_roles_args)
 
     @classmethod
     def login(cls, username, password):
         if cls._loggedin:
             cls.logout()
         cls._post('/api/auth', {'username': username, 'password': password})
+        cls._assertEq(cls._resp.status_code, 201)
         cls._token = cls.jsonBody()['token']
         cls._loggedin = True
 
@@ -78,6 +102,7 @@ class DashboardTestCase(MgrTestCase):
     def logout(cls):
         if cls._loggedin:
             cls._post('/api/auth/logout')
+            cls._assertEq(cls._resp.status_code, 200)
             cls._token = None
             cls._loggedin = False
 
@@ -91,16 +116,22 @@ class DashboardTestCase(MgrTestCase):
                 cls._ceph_cmd(['dashboard', 'ac-role-delete', 'test_role_{}'.format(idx)])
 
     @classmethod
-    def RunAs(cls, username, password, roles):
+    def RunAs(cls, username, password, roles=None, force_password=True,
+              cmd_args=None, login=True):
         def wrapper(func):
             def execute(self, *args, **kwargs):
-                self.create_user(username, password, roles)
-                self.login(username, password)
+                self.create_user(username, password, roles,
+                                 force_password, cmd_args)
+                if login:
+                    self.login(username, password)
                 res = func(self, *args, **kwargs)
-                self.logout()
+                if login:
+                    self.logout()
                 self.delete_user(username, roles)
                 return res
+
             return execute
+
         return wrapper
 
     @classmethod
@@ -149,6 +180,7 @@ class DashboardTestCase(MgrTestCase):
             cls.login('admin', 'admin')
 
     def setUp(self):
+        super(DashboardTestCase, self).setUp()
         if not self._loggedin and self.AUTO_AUTHENTICATE:
             self.login('admin', 'admin')
         self.wait_for_health_clear(20)
@@ -245,7 +277,7 @@ class DashboardTestCase(MgrTestCase):
     @classmethod
     def _task_request(cls, method, url, data, timeout):
         res = cls._request(url, method, data)
-        cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403])
+        cls._assertIn(cls._resp.status_code, [200, 201, 202, 204, 400, 403, 404])
 
         if cls._resp.status_code == 403:
             return None
@@ -368,37 +400,43 @@ class DashboardTestCase(MgrTestCase):
         log.info("command result: %s", res)
         return res
 
+    @classmethod
+    def _ceph_cmd_result(cls, cmd):
+        exitstatus = cls.mgr_cluster.mon_manager.raw_cluster_cmd_result(*cmd)
+        log.info("command exit status: %d", exitstatus)
+        return exitstatus
+
     def set_config_key(self, key, value):
         self._ceph_cmd(['config-key', 'set', key, value])
 
     def get_config_key(self, key):
         return self._ceph_cmd(['config-key', 'get', key])
 
+    @classmethod
+    def _cmd(cls, args):
+        return cls.mgr_cluster.admin_remote.run(args=args)
+
     @classmethod
     def _rbd_cmd(cls, cmd):
-        args = [
-            'rbd'
-        ]
+        args = ['rbd']
         args.extend(cmd)
-        cls.mgr_cluster.admin_remote.run(args=args)
+        cls._cmd(args)
 
     @classmethod
     def _radosgw_admin_cmd(cls, cmd):
-        args = [
-            'radosgw-admin'
-        ]
+        args = ['radosgw-admin']
         args.extend(cmd)
-        cls.mgr_cluster.admin_remote.run(args=args)
+        cls._cmd(args)
 
     @classmethod
     def _rados_cmd(cls, cmd):
         args = ['rados']
         args.extend(cmd)
-        cls.mgr_cluster.admin_remote.run(args=args)
+        cls._cmd(args)
 
     @classmethod
     def mons(cls):
-        out = cls.ceph_cluster.mon_manager.raw_cluster_cmd('mon_status')
+        out = cls.ceph_cluster.mon_manager.raw_cluster_cmd('quorum_status')
         j = json.loads(out)
         return [mon['name'] for mon in j['monmap']['mons']]