1 | # -*- coding: utf-8 -*- |
2 | ||
3 | from __future__ import absolute_import | |
4 | ||
5 | import time | |
6 | ||
7 | import jwt | |
8 | ||
9 | from .helper import DashboardTestCase | |
10 | ||
11 | ||
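class AuthTest(DashboardTestCase):
    """Exercise the dashboard's JWT login, logout, expiry and revocation endpoints.

    Every test that changes shared state (creates a user, shrinks the token
    TTL, installs a session token) undoes that change in a ``finally`` block,
    so a failing assertion cannot leak a short TTL or a stale user into the
    tests that run after it.
    """

    # No implicit login: each test authenticates explicitly (or deliberately
    # does not) so the 401 assertions below are meaningful.
    AUTO_AUTHENTICATE = False

    # Token TTLs (seconds, as the CLI expects strings). The long value is the
    # one the original tests restored after shrinking the TTL.
    SHORT_JWT_TOKEN_TTL = '5'
    DEFAULT_JWT_TOKEN_TTL = '28800'

    def setUp(self):
        self.reset_session()

    # -- helpers -------------------------------------------------------------

    def _login(self, username, password):
        """POST credentials to /api/auth, assert 201 and return the JSON body."""
        self._post("/api/auth", {'username': username, 'password': password})
        self.assertStatus(201)
        return self.jsonBody()

    def _login_and_install_token(self, username, password):
        """Log in and make the returned token the session token; return the body."""
        data = self._login(username, password)
        self.set_jwt_token(data['token'])
        return data

    def _set_jwt_token_ttl(self, ttl):
        """Change the dashboard-wide JWT TTL (affects tokens issued afterwards)."""
        self._ceph_cmd(['dashboard', 'set-jwt-token-ttl', ttl])

    def _assert_invalid_credentials(self):
        """Assert the canonical 400 'invalid_credentials' error body."""
        self.assertStatus(400)
        self.assertJsonBody({
            "component": "auth",
            "code": "invalid_credentials",
            "detail": "Invalid credentials"
        })

    def _validate_jwt_token(self, token, username, permissions):
        """Check the claims of *token* and the shape of *permissions*.

        The signature is NOT verified (``verify=False``): this helper only
        inspects the decoded claims. Whether the server rejects expired or
        revoked tokens is covered by the 401 assertions in the TTL, logout
        and revocation tests.
        """
        payload = jwt.decode(token, verify=False)
        self.assertIn('username', payload)
        self.assertEqual(payload['username'], username)

        # Every scope in the permission map must carry the full CRUD set.
        for scope, perms in permissions.items():
            self.assertIsNotNone(scope)
            for perm in ('read', 'update', 'create', 'delete'):
                self.assertIn(perm, perms)

    # -- tests ---------------------------------------------------------------

    def test_a_set_login_credentials(self):
        """A freshly created administrator can log in and gets a valid token.

        The ``_a_`` in the name presumably orders this test first (unittest
        runs methods in alphabetical order) — confirm before renaming.
        """
        self.create_user('admin2', 'admin2', ['administrator'])
        try:
            data = self._login('admin2', 'admin2')
            self._validate_jwt_token(data['token'], "admin2", data['permissions'])
        finally:
            # Always remove the user so a failed assertion does not leave it behind.
            self.delete_user('admin2')

    def test_login_valid(self):
        """The default admin account can log in and receives a well-formed token."""
        data = self._login('admin', 'admin')
        self._validate_jwt_token(data['token'], "admin", data['permissions'])

    def test_login_invalid(self):
        """A wrong password is rejected with the generic invalid_credentials error."""
        self._post("/api/auth", {'username': 'admin', 'password': 'inval'})
        self._assert_invalid_credentials()

    def test_login_without_password(self):
        """An account with an empty password cannot log in with an empty password.

        This is the guard against "no password set" being treated as "any
        password matches".
        """
        self.create_user('admin2', '', ['administrator'])
        try:
            self._post("/api/auth", {'username': 'admin2', 'password': ''})
            self._assert_invalid_credentials()
        finally:
            self.delete_user('admin2')

    def test_logout(self):
        """Logging out invalidates the token for subsequent requests."""
        data = self._login('admin', 'admin')
        self._validate_jwt_token(data['token'], "admin", data['permissions'])
        self.set_jwt_token(data['token'])
        try:
            self._post("/api/auth/logout")
            self.assertStatus(200)
            self.assertJsonBody({
                "redirect_url": "#/login"
            })
            # The token that was just logged out must no longer grant access.
            self._get("/api/host")
            self.assertStatus(401)
        finally:
            self.set_jwt_token(None)

    def test_token_ttl(self):
        """A token stops being accepted once its TTL has elapsed."""
        self._set_jwt_token_ttl(self.SHORT_JWT_TOKEN_TTL)
        try:
            self._login_and_install_token('admin', 'admin')
            self._get("/api/host")
            self.assertStatus(200)
            # Wait past the 5 s TTL, then the same token must be rejected.
            time.sleep(6)
            self._get("/api/host")
            self.assertStatus(401)
        finally:
            # Restore the default TTL even on failure; other tests depend on it.
            self._set_jwt_token_ttl(self.DEFAULT_JWT_TOKEN_TTL)
            self.set_jwt_token(None)

    def test_remove_from_blacklist(self):
        """Logging out with a blacklisted-but-expired token still succeeds.

        The first logout blacklists a short-lived token; after it expires the
        second logout (with a fresh token) exercises the server-side pruning of
        expired blacklist entries.
        """
        self._set_jwt_token_ttl(self.SHORT_JWT_TOKEN_TTL)
        try:
            self._login_and_install_token('admin', 'admin')
            # The following call adds the token to the blacklist.
            self._post("/api/auth/logout")
            self.assertStatus(200)
            self._get("/api/host")
            self.assertStatus(401)
            # Let the blacklisted token expire.
            time.sleep(6)
        finally:
            self._set_jwt_token_ttl(self.DEFAULT_JWT_TOKEN_TTL)
            self.set_jwt_token(None)
        self._login_and_install_token('admin', 'admin')
        try:
            # The following call removes expired tokens from the blacklist.
            self._post("/api/auth/logout")
            self.assertStatus(200)
        finally:
            self.set_jwt_token(None)

    def test_unauthorized(self):
        """Without a token, a protected endpoint answers 401."""
        self._get("/api/host")
        self.assertStatus(401)

    def test_invalidate_token_by_admin(self):
        """Changing a user's password revokes that user's outstanding tokens."""
        self._get("/api/host")
        self.assertStatus(401)
        self.create_user('user', 'user', ['read-only'])
        try:
            # The one-second sleeps keep the token's issue time strictly
            # separated from the user's creation / password-change time;
            # presumably the revocation check compares those timestamps.
            time.sleep(1)
            self._login_and_install_token('user', 'user')
            self._get("/api/host")
            self.assertStatus(200)
            time.sleep(1)
            self._ceph_cmd(['dashboard', 'ac-user-set-password', 'user', 'user2'])
            time.sleep(1)
            # The old token must be rejected after the password change ...
            self._get("/api/host")
            self.assertStatus(401)
            self.set_jwt_token(None)
            # ... and a token obtained with the new password must work.
            self._login_and_install_token('user', 'user2')
            self._get("/api/host")
            self.assertStatus(200)
        finally:
            self.set_jwt_token(None)
            self.delete_user("user")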