]> git.proxmox.com Git - ceph.git/blame - ceph/qa/tasks/mgr/dashboard/test_auth.py
update download target update for octopus release
[ceph.git] / ceph / qa / tasks / mgr / dashboard / test_auth.py
CommitLineData
11fdf7f2
TL
1# -*- coding: utf-8 -*-
2
3from __future__ import absolute_import
4
5import time
6
7import jwt
8
9from .helper import DashboardTestCase
10
11
class AuthTest(DashboardTestCase):
    """Tests for the dashboard's ``/api/auth`` login/logout endpoints.

    The tests log in explicitly by POSTing credentials to ``/api/auth`` and
    install the returned JWT with ``set_jwt_token``.  Any state a test
    changes (extra users, the JWT TTL, the active token) is restored in a
    ``finally`` clause so that a failing assertion cannot leak that state
    into the tests that run afterwards.
    """

    # NOTE(review): presumably tells the base class not to log in on its own,
    # since every test here performs the login itself -- confirm in helper.py.
    AUTO_AUTHENTICATE = False

    def setUp(self):
        # Reset the session before every test.
        self.reset_session()

    def _validate_jwt_token(self, token, username, permissions):
        """Check the token's ``username`` claim and the granted permissions.

        :param token: encoded JWT as returned by ``POST /api/auth``
        :param username: user name the token payload is expected to carry
        :param permissions: mapping of scope to the permissions granted for
            it; every scope must grant read, update, create and delete
        """
        # Only the claims are inspected, so signature verification is turned
        # off.  The ``options`` form is used because the ``verify=False``
        # keyword is no longer honoured by PyJWT >= 2.0, while
        # ``verify_signature`` is understood by both PyJWT 1.x and 2.x.
        payload = jwt.decode(token, options={'verify_signature': False})
        self.assertIn('username', payload)
        self.assertEqual(payload['username'], username)

        for scope, perms in permissions.items():
            self.assertIsNotNone(scope)
            for perm in ('read', 'update', 'create', 'delete'):
                self.assertIn(perm, perms)

    def test_a_set_login_credentials(self):
        # A freshly created administrator can log in and gets a valid token.
        self.create_user('admin2', 'admin2', ['administrator'])
        try:
            self._post("/api/auth",
                       {'username': 'admin2', 'password': 'admin2'})
            self.assertStatus(201)
            data = self.jsonBody()
            self._validate_jwt_token(data['token'], "admin2",
                                     data['permissions'])
        finally:
            # Remove the user even if an assertion failed; another test in
            # this class creates a user with the same name.
            self.delete_user('admin2')

    def test_login_valid(self):
        # Valid credentials yield 201 and a token for that user.
        self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
        self.assertStatus(201)
        data = self.jsonBody()
        self._validate_jwt_token(data['token'], "admin", data['permissions'])

    def test_login_invalid(self):
        # A wrong password is rejected with 400 / invalid_credentials.
        self._post("/api/auth", {'username': 'admin', 'password': 'inval'})
        self.assertStatus(400)
        self.assertJsonBody({
            "component": "auth",
            "code": "invalid_credentials",
            "detail": "Invalid credentials"
        })

    def test_login_without_password(self):
        # A user created with an empty password must not be able to log in
        # with an empty password.
        self.create_user('admin2', '', ['administrator'])
        try:
            self._post("/api/auth", {'username': 'admin2', 'password': ''})
            self.assertStatus(400)
            self.assertJsonBody({
                "component": "auth",
                "code": "invalid_credentials",
                "detail": "Invalid credentials"
            })
        finally:
            self.delete_user('admin2')

    def test_logout(self):
        # After logout the previously issued token is no longer accepted.
        self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
        self.assertStatus(201)
        data = self.jsonBody()
        self._validate_jwt_token(data['token'], "admin", data['permissions'])
        self.set_jwt_token(data['token'])
        try:
            self._post("/api/auth/logout")
            self.assertStatus(200)
            self.assertJsonBody({
                "redirect_url": "#/login"
            })
            self._get("/api/host")
            self.assertStatus(401)
        finally:
            # Never leave the logged-out token installed.
            self.set_jwt_token(None)

    def test_token_ttl(self):
        # With a 5 second TTL a token works at first and is rejected once
        # the TTL has elapsed.
        self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '5'])
        try:
            self._post("/api/auth",
                       {'username': 'admin', 'password': 'admin'})
            self.assertStatus(201)
            self.set_jwt_token(self.jsonBody()['token'])
            self._get("/api/host")
            self.assertStatus(200)
            # Sleep slightly longer than the TTL so the token has expired.
            time.sleep(6)
            self._get("/api/host")
            self.assertStatus(401)
        finally:
            # Put the long TTL back and drop the token even on failure;
            # otherwise later tests would get tokens that expire after 5 s.
            self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '28800'])
            self.set_jwt_token(None)

    def test_remove_from_blacklist(self):
        # A logged-out token is blacklisted; once it has expired, a later
        # logout call purges it from the blacklist.
        self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '5'])
        try:
            self._post("/api/auth",
                       {'username': 'admin', 'password': 'admin'})
            self.assertStatus(201)
            self.set_jwt_token(self.jsonBody()['token'])
            # the following call adds the token to the blacklist
            self._post("/api/auth/logout")
            self.assertStatus(200)
            self._get("/api/host")
            self.assertStatus(401)
            # Wait until the blacklisted token has outlived its 5 s TTL.
            time.sleep(6)
        finally:
            # Same restore steps as on the success path, but guaranteed.
            self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', '28800'])
            self.set_jwt_token(None)
        self._post("/api/auth", {'username': 'admin', 'password': 'admin'})
        self.assertStatus(201)
        self.set_jwt_token(self.jsonBody()['token'])
        # the following call removes expired tokens from the blacklist
        self._post("/api/auth/logout")
        self.assertStatus(200)

    def test_unauthorized(self):
        # Without a token the API answers 401.
        self._get("/api/host")
        self.assertStatus(401)

    def test_invalidate_token_by_admin(self):
        # Changing a user's password through the CLI invalidates the tokens
        # issued to that user before the change.
        self._get("/api/host")
        self.assertStatus(401)
        self.create_user('user', 'user', ['read-only'])
        try:
            # NOTE(review): the one-second pauses look like they keep the
            # user creation, token issue and password change on distinct
            # timestamps -- confirm against the dashboard's token check.
            time.sleep(1)
            self._post("/api/auth", {'username': 'user', 'password': 'user'})
            self.assertStatus(201)
            self.set_jwt_token(self.jsonBody()['token'])
            self._get("/api/host")
            self.assertStatus(200)
            time.sleep(1)
            self._ceph_cmd(['dashboard', 'ac-user-set-password',
                            'user', 'user2'])
            time.sleep(1)
            # The token issued before the password change is now rejected.
            self._get("/api/host")
            self.assertStatus(401)
            self.set_jwt_token(None)
            # Logging in with the new password works again.
            self._post("/api/auth",
                       {'username': 'user', 'password': 'user2'})
            self.assertStatus(201)
            self.set_jwt_token(self.jsonBody()['token'])
            self._get("/api/host")
            self.assertStatus(200)
        finally:
            self.delete_user("user")