]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | # -*- coding: utf-8 -*- |
2 | # pylint: disable=dangerous-default-value,too-many-public-methods | |
11fdf7f2 TL |
3 | |
4 | import errno | |
5 | import json | |
20effc67 | 6 | import tempfile |
11fdf7f2 TL |
7 | import time |
8 | import unittest | |
9f95a23c TL |
9 | from datetime import datetime, timedelta |
10 | ||
cd265ab1 TL |
11 | from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE |
12 | ||
11fdf7f2 | 13 | from .. import mgr |
f67539c2 TL |
14 | from ..security import Permission, Scope |
15 | from ..services.access_control import SYSTEM_ROLES, AccessControlDB, \ | |
16 | PasswordPolicy, load_access_control_db, password_hash | |
9f95a23c | 17 | from ..settings import Settings |
a4b75251 | 18 | from ..tests import CLICommandTestMixin, CmdException |
11fdf7f2 TL |
19 | |
20 | ||
21 | class AccessControlTest(unittest.TestCase, CLICommandTestMixin): | |
22 | ||
23 | @classmethod | |
24 | def setUpClass(cls): | |
25 | cls.mock_kv_store() | |
26 | mgr.ACCESS_CONTROL_DB = None | |
27 | ||
28 | def setUp(self): | |
29 | self.CONFIG_KEY_DICT.clear() | |
30 | load_access_control_db() | |
31 | ||
32 | def load_persistent_db(self): | |
33 | config_key = AccessControlDB.accessdb_config_key() | |
34 | self.assertIn(config_key, self.CONFIG_KEY_DICT) | |
35 | db_json = self.CONFIG_KEY_DICT[config_key] | |
36 | db = json.loads(db_json) | |
37 | return db | |
38 | ||
39 | # The DB is written to persistent storage the first time it is saved. | |
40 | # However, should an operation fail due to <reasons>, we may end up in | |
41 | # a state where we have a completely empty CONFIG_KEY_DICT (our mock | |
42 | # equivalent to the persistent state). While this works for most of the | |
43 | # tests in this class, that would prevent us from testing things like | |
44 | # "run a command that is expected to fail, and then ensure nothing | |
45 | # happened", because we'd be asserting in `load_persistent_db()` due to | |
46 | # the map being empty. | |
47 | # | |
48 | # This function will therefore force state to be written to our mock | |
49 | # persistent state. We could have added this extra step to | |
50 | # `load_persistent_db()` directly, but that would conflict with the | |
51 | # upgrade tests. This way, we can selectively enforce this requirement | |
52 | # where we believe it to be necessary; generically speaking, this should | |
53 | # not be needed unless we're testing very specific behaviors. | |
54 | # | |
55 | def setup_and_load_persistent_db(self): | |
56 | mgr.ACCESS_CTRL_DB.save() | |
57 | self.load_persistent_db() | |
58 | ||
59 | def validate_persistent_role(self, rolename, scopes_permissions, | |
60 | description=None): | |
61 | db = self.load_persistent_db() | |
62 | self.assertIn('roles', db) | |
63 | self.assertIn(rolename, db['roles']) | |
64 | self.assertEqual(db['roles'][rolename]['name'], rolename) | |
65 | self.assertEqual(db['roles'][rolename]['description'], description) | |
66 | self.assertDictEqual(db['roles'][rolename]['scopes_permissions'], | |
67 | scopes_permissions) | |
68 | ||
69 | def validate_persistent_no_role(self, rolename): | |
70 | db = self.load_persistent_db() | |
71 | self.assertIn('roles', db) | |
72 | self.assertNotIn(rolename, db['roles']) | |
73 | ||
74 | def validate_persistent_user(self, username, roles, password=None, | |
9f95a23c TL |
75 | name=None, email=None, last_update=None, |
76 | enabled=True, pwdExpirationDate=None): | |
11fdf7f2 TL |
77 | db = self.load_persistent_db() |
78 | self.assertIn('users', db) | |
79 | self.assertIn(username, db['users']) | |
80 | self.assertEqual(db['users'][username]['username'], username) | |
81 | self.assertListEqual(db['users'][username]['roles'], roles) | |
82 | if password: | |
83 | self.assertEqual(db['users'][username]['password'], password) | |
84 | if name: | |
85 | self.assertEqual(db['users'][username]['name'], name) | |
86 | if email: | |
87 | self.assertEqual(db['users'][username]['email'], email) | |
9f95a23c TL |
88 | if last_update: |
89 | self.assertEqual(db['users'][username]['lastUpdate'], last_update) | |
90 | if pwdExpirationDate: | |
91 | self.assertEqual(db['users'][username]['pwdExpirationDate'], pwdExpirationDate) | |
92 | self.assertEqual(db['users'][username]['enabled'], enabled) | |
11fdf7f2 TL |
93 | |
94 | def validate_persistent_no_user(self, username): | |
95 | db = self.load_persistent_db() | |
96 | self.assertIn('users', db) | |
97 | self.assertNotIn(username, db['users']) | |
98 | ||
99 | def test_create_role(self): | |
100 | role = self.exec_cmd('ac-role-create', rolename='test_role') | |
101 | self.assertDictEqual(role, {'name': 'test_role', 'description': None, | |
102 | 'scopes_permissions': {}}) | |
103 | self.validate_persistent_role('test_role', {}) | |
104 | ||
105 | def test_create_role_with_desc(self): | |
106 | role = self.exec_cmd('ac-role-create', rolename='test_role', | |
107 | description='Test Role') | |
108 | self.assertDictEqual(role, {'name': 'test_role', | |
109 | 'description': 'Test Role', | |
110 | 'scopes_permissions': {}}) | |
111 | self.validate_persistent_role('test_role', {}, 'Test Role') | |
112 | ||
113 | def test_create_duplicate_role(self): | |
114 | self.test_create_role() | |
115 | ||
116 | with self.assertRaises(CmdException) as ctx: | |
117 | self.exec_cmd('ac-role-create', rolename='test_role') | |
118 | ||
119 | self.assertEqual(ctx.exception.retcode, -errno.EEXIST) | |
120 | self.assertEqual(str(ctx.exception), "Role 'test_role' already exists") | |
121 | ||
122 | def test_delete_role(self): | |
123 | self.test_create_role() | |
124 | out = self.exec_cmd('ac-role-delete', rolename='test_role') | |
125 | self.assertEqual(out, "Role 'test_role' deleted") | |
126 | self.validate_persistent_no_role('test_role') | |
127 | ||
128 | def test_delete_nonexistent_role(self): | |
129 | with self.assertRaises(CmdException) as ctx: | |
130 | self.exec_cmd('ac-role-delete', rolename='test_role') | |
131 | ||
132 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
133 | self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") | |
134 | ||
135 | def test_show_single_role(self): | |
136 | self.test_create_role() | |
137 | role = self.exec_cmd('ac-role-show', rolename='test_role') | |
138 | self.assertDictEqual(role, {'name': 'test_role', 'description': None, | |
139 | 'scopes_permissions': {}}) | |
140 | ||
141 | def test_show_nonexistent_role(self): | |
142 | with self.assertRaises(CmdException) as ctx: | |
143 | self.exec_cmd('ac-role-show', rolename='test_role') | |
144 | ||
145 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
146 | self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") | |
147 | ||
148 | def test_show_system_roles(self): | |
149 | roles = self.exec_cmd('ac-role-show') | |
150 | self.assertEqual(len(roles), len(SYSTEM_ROLES)) | |
151 | for role in roles: | |
152 | self.assertIn(role, SYSTEM_ROLES) | |
153 | ||
154 | def test_show_system_role(self): | |
155 | role = self.exec_cmd('ac-role-show', rolename="read-only") | |
156 | self.assertEqual(role['name'], 'read-only') | |
f67539c2 TL |
157 | self.assertEqual( |
158 | role['description'], | |
159 | 'allows read permission for all security scope except dashboard settings and config-opt' | |
160 | ) | |
11fdf7f2 TL |
161 | |
162 | def test_delete_system_role(self): | |
163 | with self.assertRaises(CmdException) as ctx: | |
164 | self.exec_cmd('ac-role-delete', rolename='administrator') | |
165 | ||
166 | self.assertEqual(ctx.exception.retcode, -errno.EPERM) | |
167 | self.assertEqual(str(ctx.exception), | |
168 | "Cannot delete system role 'administrator'") | |
169 | ||
170 | def test_add_role_scope_perms(self): | |
171 | self.test_create_role() | |
172 | self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', | |
173 | scopename=Scope.POOL, | |
174 | permissions=[Permission.READ, Permission.DELETE]) | |
175 | role = self.exec_cmd('ac-role-show', rolename='test_role') | |
176 | self.assertDictEqual(role, {'name': 'test_role', | |
177 | 'description': None, | |
178 | 'scopes_permissions': { | |
179 | Scope.POOL: [Permission.DELETE, | |
180 | Permission.READ] | |
181 | }}) | |
182 | self.validate_persistent_role('test_role', { | |
183 | Scope.POOL: [Permission.DELETE, Permission.READ] | |
184 | }) | |
185 | ||
186 | def test_del_role_scope_perms(self): | |
187 | self.test_add_role_scope_perms() | |
188 | self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', | |
189 | scopename=Scope.MONITOR, | |
190 | permissions=[Permission.READ, Permission.CREATE]) | |
191 | self.validate_persistent_role('test_role', { | |
192 | Scope.POOL: [Permission.DELETE, Permission.READ], | |
193 | Scope.MONITOR: [Permission.CREATE, Permission.READ] | |
194 | }) | |
195 | self.exec_cmd('ac-role-del-scope-perms', rolename='test_role', | |
196 | scopename=Scope.POOL) | |
197 | role = self.exec_cmd('ac-role-show', rolename='test_role') | |
198 | self.assertDictEqual(role, {'name': 'test_role', | |
199 | 'description': None, | |
200 | 'scopes_permissions': { | |
201 | Scope.MONITOR: [Permission.CREATE, | |
202 | Permission.READ] | |
203 | }}) | |
204 | self.validate_persistent_role('test_role', { | |
205 | Scope.MONITOR: [Permission.CREATE, Permission.READ] | |
206 | }) | |
207 | ||
208 | def test_add_role_scope_perms_nonexistent_role(self): | |
209 | ||
210 | with self.assertRaises(CmdException) as ctx: | |
211 | self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', | |
212 | scopename='pool', | |
213 | permissions=['read', 'delete']) | |
214 | ||
215 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
216 | self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") | |
217 | ||
218 | def test_add_role_invalid_scope_perms(self): | |
219 | self.test_create_role() | |
220 | ||
221 | with self.assertRaises(CmdException) as ctx: | |
222 | self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', | |
223 | scopename='invalidscope', | |
224 | permissions=['read', 'delete']) | |
225 | ||
226 | self.assertEqual(ctx.exception.retcode, -errno.EINVAL) | |
227 | self.assertEqual(str(ctx.exception), | |
228 | "Scope 'invalidscope' is not valid\n Possible values: " | |
229 | "{}".format(Scope.all_scopes())) | |
230 | ||
231 | def test_add_role_scope_invalid_perms(self): | |
232 | self.test_create_role() | |
233 | ||
234 | with self.assertRaises(CmdException) as ctx: | |
235 | self.exec_cmd('ac-role-add-scope-perms', rolename='test_role', | |
236 | scopename='pool', permissions=['invalidperm']) | |
237 | ||
238 | self.assertEqual(ctx.exception.retcode, -errno.EINVAL) | |
239 | self.assertEqual(str(ctx.exception), | |
240 | "Permission 'invalidperm' is not valid\n Possible " | |
241 | "values: {}".format(Permission.all_permissions())) | |
242 | ||
243 | def test_del_role_scope_perms_nonexistent_role(self): | |
244 | ||
245 | with self.assertRaises(CmdException) as ctx: | |
246 | self.exec_cmd('ac-role-del-scope-perms', rolename='test_role', | |
247 | scopename='pool') | |
248 | ||
249 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
250 | self.assertEqual(str(ctx.exception), "Role 'test_role' does not exist") | |
251 | ||
252 | def test_del_role_nonexistent_scope_perms(self): | |
253 | self.test_add_role_scope_perms() | |
254 | ||
255 | with self.assertRaises(CmdException) as ctx: | |
256 | self.exec_cmd('ac-role-del-scope-perms', rolename='test_role', | |
257 | scopename='nonexistentscope') | |
258 | ||
259 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
260 | self.assertEqual(str(ctx.exception), | |
261 | "There are no permissions for scope 'nonexistentscope' " | |
262 | "in role 'test_role'") | |
263 | ||
264 | def test_not_permitted_add_role_scope_perms(self): | |
265 | with self.assertRaises(CmdException) as ctx: | |
266 | self.exec_cmd('ac-role-add-scope-perms', rolename='read-only', | |
267 | scopename='pool', permissions=['read', 'delete']) | |
268 | ||
269 | self.assertEqual(ctx.exception.retcode, -errno.EPERM) | |
270 | self.assertEqual(str(ctx.exception), | |
271 | "Cannot update system role 'read-only'") | |
272 | ||
273 | def test_not_permitted_del_role_scope_perms(self): | |
274 | with self.assertRaises(CmdException) as ctx: | |
275 | self.exec_cmd('ac-role-del-scope-perms', rolename='read-only', | |
276 | scopename='pool') | |
277 | ||
278 | self.assertEqual(ctx.exception.retcode, -errno.EPERM) | |
279 | self.assertEqual(str(ctx.exception), | |
280 | "Cannot update system role 'read-only'") | |
281 | ||
9f95a23c TL |
282 | def test_create_user(self, username='admin', rolename=None, enabled=True, |
283 | pwdExpirationDate=None): | |
11fdf7f2 | 284 | user = self.exec_cmd('ac-user-create', username=username, |
cd265ab1 | 285 | rolename=rolename, inbuf='admin', |
11fdf7f2 | 286 | name='{} User'.format(username), |
9f95a23c TL |
287 | email='{}@user.com'.format(username), |
288 | enabled=enabled, force_password=True, | |
289 | pwd_expiration_date=pwdExpirationDate) | |
11fdf7f2 TL |
290 | |
291 | pass_hash = password_hash('admin', user['password']) | |
292 | self.assertDictEqual(user, { | |
293 | 'username': username, | |
294 | 'password': pass_hash, | |
9f95a23c TL |
295 | 'pwdExpirationDate': pwdExpirationDate, |
296 | 'pwdUpdateRequired': False, | |
11fdf7f2 TL |
297 | 'lastUpdate': user['lastUpdate'], |
298 | 'name': '{} User'.format(username), | |
299 | 'email': '{}@user.com'.format(username), | |
9f95a23c TL |
300 | 'roles': [rolename] if rolename else [], |
301 | 'enabled': enabled | |
11fdf7f2 TL |
302 | }) |
303 | self.validate_persistent_user(username, [rolename] if rolename else [], | |
304 | pass_hash, '{} User'.format(username), | |
305 | '{}@user.com'.format(username), | |
9f95a23c | 306 | user['lastUpdate'], enabled) |
11fdf7f2 TL |
307 | return user |
308 | ||
9f95a23c TL |
309 | def test_create_disabled_user(self): |
310 | self.test_create_user(enabled=False) | |
311 | ||
312 | def test_create_user_pwd_expiration_date(self): | |
313 | expiration_date = datetime.utcnow() + timedelta(days=10) | |
314 | expiration_date = int(time.mktime(expiration_date.timetuple())) | |
315 | self.test_create_user(pwdExpirationDate=expiration_date) | |
316 | ||
11fdf7f2 TL |
317 | def test_create_user_with_role(self): |
318 | self.test_add_role_scope_perms() | |
319 | self.test_create_user(rolename='test_role') | |
320 | ||
321 | def test_create_user_with_system_role(self): | |
322 | self.test_create_user(rolename='administrator') | |
323 | ||
324 | def test_delete_user(self): | |
325 | self.test_create_user() | |
326 | out = self.exec_cmd('ac-user-delete', username='admin') | |
327 | self.assertEqual(out, "User 'admin' deleted") | |
328 | users = self.exec_cmd('ac-user-show') | |
329 | self.assertEqual(len(users), 0) | |
330 | self.validate_persistent_no_user('admin') | |
331 | ||
332 | def test_create_duplicate_user(self): | |
333 | self.test_create_user() | |
cd265ab1 | 334 | ret = self.exec_cmd('ac-user-create', username='admin', inbuf='admin', |
801d1391 TL |
335 | force_password=True) |
336 | self.assertEqual(ret, "User 'admin' already exists") | |
11fdf7f2 TL |
337 | |
338 | def test_create_users_with_dne_role(self): | |
339 | # one time call to setup our persistent db | |
340 | self.setup_and_load_persistent_db() | |
341 | ||
342 | # create a user with a role that does not exist; expect a failure | |
343 | try: | |
344 | self.exec_cmd('ac-user-create', username='foo', | |
cd265ab1 | 345 | rolename='dne_role', inbuf='foopass', |
9f95a23c TL |
346 | name='foo User', email='foo@user.com', |
347 | force_password=True) | |
11fdf7f2 TL |
348 | except CmdException as e: |
349 | self.assertEqual(e.retcode, -errno.ENOENT) | |
350 | ||
351 | db = self.load_persistent_db() | |
352 | if 'users' in db: | |
353 | self.assertNotIn('foo', db['users']) | |
354 | ||
355 | # We could just finish our test here, given we ensured that the user | |
356 | # with a non-existent role is not in persistent storage. However, | |
357 | # we're going to test the database's consistency, making sure that | |
358 | # side-effects are not written to persistent storage once we commit | |
359 | # an unrelated operation. To ensure this, we'll issue another | |
360 | # operation that is sharing the same code path, and will check whether | |
361 | # the next operation commits dirty state. | |
362 | ||
363 | # create a role (this will be 'test_role') | |
364 | self.test_create_role() | |
365 | self.exec_cmd('ac-user-create', username='bar', | |
cd265ab1 | 366 | rolename='test_role', inbuf='barpass', |
9f95a23c TL |
367 | name='bar User', email='bar@user.com', |
368 | force_password=True) | |
11fdf7f2 TL |
369 | |
370 | # validate db: | |
371 | # user 'foo' should not exist | |
372 | # user 'bar' should exist and have role 'test_role' | |
373 | self.validate_persistent_user('bar', ['test_role']) | |
374 | ||
375 | db = self.load_persistent_db() | |
376 | self.assertIn('users', db) | |
377 | self.assertNotIn('foo', db['users']) | |
378 | ||
379 | def test_delete_nonexistent_user(self): | |
380 | with self.assertRaises(CmdException) as ctx: | |
381 | self.exec_cmd('ac-user-delete', username='admin') | |
382 | ||
383 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
384 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
385 | ||
386 | def test_add_user_roles(self, username='admin', | |
387 | roles=['pool-manager', 'block-manager']): | |
388 | user_orig = self.test_create_user(username) | |
389 | uroles = [] | |
390 | for role in roles: | |
391 | uroles.append(role) | |
392 | uroles.sort() | |
393 | user = self.exec_cmd('ac-user-add-roles', username=username, | |
394 | roles=[role]) | |
f67539c2 | 395 | self.assertLessEqual(uroles, user['roles']) |
11fdf7f2 TL |
396 | self.validate_persistent_user(username, uroles) |
397 | self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) | |
398 | ||
399 | def test_add_user_roles2(self): | |
400 | user_orig = self.test_create_user() | |
401 | user = self.exec_cmd('ac-user-add-roles', username="admin", | |
402 | roles=['pool-manager', 'block-manager']) | |
f67539c2 TL |
403 | self.assertLessEqual(['block-manager', 'pool-manager'], |
404 | user['roles']) | |
11fdf7f2 TL |
405 | self.validate_persistent_user('admin', ['block-manager', |
406 | 'pool-manager']) | |
407 | self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) | |
408 | ||
409 | def test_add_user_roles_not_existent_user(self): | |
410 | with self.assertRaises(CmdException) as ctx: | |
411 | self.exec_cmd('ac-user-add-roles', username="admin", | |
412 | roles=['pool-manager', 'block-manager']) | |
413 | ||
414 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
415 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
416 | ||
417 | def test_add_user_roles_not_existent_role(self): | |
418 | self.test_create_user() | |
419 | with self.assertRaises(CmdException) as ctx: | |
420 | self.exec_cmd('ac-user-add-roles', username="admin", | |
421 | roles=['Invalid Role']) | |
422 | ||
423 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
424 | self.assertEqual(str(ctx.exception), | |
425 | "Role 'Invalid Role' does not exist") | |
426 | ||
427 | def test_set_user_roles(self): | |
428 | user_orig = self.test_create_user() | |
429 | user = self.exec_cmd('ac-user-add-roles', username="admin", | |
430 | roles=['pool-manager']) | |
f67539c2 | 431 | self.assertLessEqual(['pool-manager'], user['roles']) |
11fdf7f2 TL |
432 | self.validate_persistent_user('admin', ['pool-manager']) |
433 | self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) | |
434 | user2 = self.exec_cmd('ac-user-set-roles', username="admin", | |
435 | roles=['rgw-manager', 'block-manager']) | |
f67539c2 TL |
436 | self.assertLessEqual(['block-manager', 'rgw-manager'], |
437 | user2['roles']) | |
11fdf7f2 TL |
438 | self.validate_persistent_user('admin', ['block-manager', |
439 | 'rgw-manager']) | |
440 | self.assertGreaterEqual(user2['lastUpdate'], user['lastUpdate']) | |
441 | ||
442 | def test_set_user_roles_not_existent_user(self): | |
443 | with self.assertRaises(CmdException) as ctx: | |
444 | self.exec_cmd('ac-user-set-roles', username="admin", | |
445 | roles=['pool-manager', 'block-manager']) | |
446 | ||
447 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
448 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
449 | ||
450 | def test_set_user_roles_not_existent_role(self): | |
451 | self.test_create_user() | |
452 | with self.assertRaises(CmdException) as ctx: | |
453 | self.exec_cmd('ac-user-set-roles', username="admin", | |
454 | roles=['Invalid Role']) | |
455 | ||
456 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
457 | self.assertEqual(str(ctx.exception), | |
458 | "Role 'Invalid Role' does not exist") | |
459 | ||
460 | def test_del_user_roles(self): | |
461 | self.test_add_user_roles() | |
462 | user = self.exec_cmd('ac-user-del-roles', username="admin", | |
463 | roles=['pool-manager']) | |
f67539c2 | 464 | self.assertLessEqual(['block-manager'], user['roles']) |
11fdf7f2 TL |
465 | self.validate_persistent_user('admin', ['block-manager']) |
466 | ||
467 | def test_del_user_roles_not_existent_user(self): | |
468 | with self.assertRaises(CmdException) as ctx: | |
469 | self.exec_cmd('ac-user-del-roles', username="admin", | |
470 | roles=['pool-manager', 'block-manager']) | |
471 | ||
472 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
473 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
474 | ||
475 | def test_del_user_roles_not_existent_role(self): | |
476 | self.test_create_user() | |
477 | with self.assertRaises(CmdException) as ctx: | |
478 | self.exec_cmd('ac-user-del-roles', username="admin", | |
479 | roles=['Invalid Role']) | |
480 | ||
481 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
482 | self.assertEqual(str(ctx.exception), | |
483 | "Role 'Invalid Role' does not exist") | |
484 | ||
485 | def test_del_user_roles_not_associated_role(self): | |
486 | self.test_create_user() | |
487 | with self.assertRaises(CmdException) as ctx: | |
488 | self.exec_cmd('ac-user-del-roles', username="admin", | |
489 | roles=['rgw-manager']) | |
490 | ||
491 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
492 | self.assertEqual(str(ctx.exception), | |
493 | "Role 'rgw-manager' is not associated with user " | |
494 | "'admin'") | |
495 | ||
496 | def test_show_user(self): | |
497 | self.test_add_user_roles() | |
498 | user = self.exec_cmd('ac-user-show', username='admin') | |
499 | pass_hash = password_hash('admin', user['password']) | |
500 | self.assertDictEqual(user, { | |
501 | 'username': 'admin', | |
502 | 'lastUpdate': user['lastUpdate'], | |
503 | 'password': pass_hash, | |
9f95a23c TL |
504 | 'pwdExpirationDate': None, |
505 | 'pwdUpdateRequired': False, | |
11fdf7f2 TL |
506 | 'name': 'admin User', |
507 | 'email': 'admin@user.com', | |
9f95a23c TL |
508 | 'roles': ['block-manager', 'pool-manager'], |
509 | 'enabled': True | |
11fdf7f2 TL |
510 | }) |
511 | ||
512 | def test_show_nonexistent_user(self): | |
513 | with self.assertRaises(CmdException) as ctx: | |
514 | self.exec_cmd('ac-user-show', username='admin') | |
515 | ||
516 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
517 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
518 | ||
519 | def test_show_all_users(self): | |
520 | self.test_add_user_roles('admin', ['administrator']) | |
521 | self.test_add_user_roles('guest', ['read-only']) | |
522 | users = self.exec_cmd('ac-user-show') | |
523 | self.assertEqual(len(users), 2) | |
524 | for user in users: | |
525 | self.assertIn(user, ['admin', 'guest']) | |
526 | ||
527 | def test_del_role_associated_with_user(self): | |
528 | self.test_create_role() | |
529 | self.test_add_user_roles('guest', ['test_role']) | |
530 | ||
531 | with self.assertRaises(CmdException) as ctx: | |
532 | self.exec_cmd('ac-role-delete', rolename='test_role') | |
533 | ||
534 | self.assertEqual(ctx.exception.retcode, -errno.EPERM) | |
535 | self.assertEqual(str(ctx.exception), | |
536 | "Role 'test_role' is still associated with user " | |
537 | "'guest'") | |
538 | ||
539 | def test_set_user_info(self): | |
540 | user_orig = self.test_create_user() | |
541 | user = self.exec_cmd('ac-user-set-info', username='admin', | |
542 | name='Admin Name', email='admin@admin.com') | |
543 | pass_hash = password_hash('admin', user['password']) | |
544 | self.assertDictEqual(user, { | |
545 | 'username': 'admin', | |
546 | 'password': pass_hash, | |
9f95a23c TL |
547 | 'pwdExpirationDate': None, |
548 | 'pwdUpdateRequired': False, | |
11fdf7f2 TL |
549 | 'name': 'Admin Name', |
550 | 'email': 'admin@admin.com', | |
551 | 'lastUpdate': user['lastUpdate'], | |
9f95a23c TL |
552 | 'roles': [], |
553 | 'enabled': True | |
11fdf7f2 TL |
554 | }) |
555 | self.validate_persistent_user('admin', [], pass_hash, 'Admin Name', | |
556 | 'admin@admin.com') | |
557 | self.assertEqual(user['lastUpdate'], user_orig['lastUpdate']) | |
558 | ||
559 | def test_set_user_info_nonexistent_user(self): | |
560 | with self.assertRaises(CmdException) as ctx: | |
561 | self.exec_cmd('ac-user-set-info', username='admin', | |
562 | name='Admin Name', email='admin@admin.com') | |
563 | ||
564 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
565 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
566 | ||
567 | def test_set_user_password(self): | |
568 | user_orig = self.test_create_user() | |
569 | user = self.exec_cmd('ac-user-set-password', username='admin', | |
cd265ab1 | 570 | inbuf='newpass', force_password=True) |
11fdf7f2 TL |
571 | pass_hash = password_hash('newpass', user['password']) |
572 | self.assertDictEqual(user, { | |
573 | 'username': 'admin', | |
574 | 'password': pass_hash, | |
9f95a23c TL |
575 | 'pwdExpirationDate': None, |
576 | 'pwdUpdateRequired': False, | |
11fdf7f2 TL |
577 | 'name': 'admin User', |
578 | 'email': 'admin@user.com', | |
579 | 'lastUpdate': user['lastUpdate'], | |
9f95a23c TL |
580 | 'roles': [], |
581 | 'enabled': True | |
11fdf7f2 TL |
582 | }) |
583 | self.validate_persistent_user('admin', [], pass_hash, 'admin User', | |
584 | 'admin@user.com') | |
585 | self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) | |
586 | ||
cd265ab1 TL |
587 | def test_sanitize_password(self): |
588 | self.test_create_user() | |
589 | password = 'myPass\\n\\r\\n' | |
20effc67 | 590 | with tempfile.TemporaryFile(mode='w+') as pwd_file: |
cd265ab1 TL |
591 | # Add new line separators (like some text editors when a file is saved). |
592 | pwd_file.write('{}{}'.format(password, '\n\r\n\n')) | |
593 | pwd_file.seek(0) | |
594 | user = self.exec_cmd('ac-user-set-password', username='admin', | |
595 | inbuf=pwd_file.read(), force_password=True) | |
596 | pass_hash = password_hash(password, user['password']) | |
597 | self.assertEqual(user['password'], pass_hash) | |
598 | ||
11fdf7f2 TL |
599 | def test_set_user_password_nonexistent_user(self): |
600 | with self.assertRaises(CmdException) as ctx: | |
601 | self.exec_cmd('ac-user-set-password', username='admin', | |
cd265ab1 | 602 | inbuf='newpass', force_password=True) |
9f95a23c TL |
603 | |
604 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
605 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
606 | ||
cd265ab1 TL |
607 | def test_set_user_password_empty(self): |
608 | with self.assertRaises(CmdException) as ctx: | |
609 | self.exec_cmd('ac-user-set-password', username='admin', inbuf='\n', | |
610 | force_password=True) | |
611 | ||
612 | self.assertEqual(ctx.exception.retcode, -errno.EINVAL) | |
522d829b | 613 | self.assertIn(ERROR_MSG_EMPTY_INPUT_FILE, str(ctx.exception)) |
cd265ab1 | 614 | |
9f95a23c TL |
615 | def test_set_user_password_hash(self): |
616 | user_orig = self.test_create_user() | |
617 | user = self.exec_cmd('ac-user-set-password-hash', username='admin', | |
cd265ab1 | 618 | inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2') |
9f95a23c TL |
619 | pass_hash = password_hash('newpass', user['password']) |
620 | self.assertDictEqual(user, { | |
621 | 'username': 'admin', | |
622 | 'password': pass_hash, | |
623 | 'pwdExpirationDate': None, | |
624 | 'pwdUpdateRequired': False, | |
625 | 'name': 'admin User', | |
626 | 'email': 'admin@user.com', | |
627 | 'lastUpdate': user['lastUpdate'], | |
628 | 'roles': [], | |
629 | 'enabled': True | |
630 | }) | |
631 | self.validate_persistent_user('admin', [], pass_hash, 'admin User', | |
632 | 'admin@user.com') | |
633 | self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate']) | |
634 | ||
635 | def test_set_user_password_hash_nonexistent_user(self): | |
636 | with self.assertRaises(CmdException) as ctx: | |
637 | self.exec_cmd('ac-user-set-password-hash', username='admin', | |
cd265ab1 | 638 | inbuf='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2') |
11fdf7f2 TL |
639 | |
640 | self.assertEqual(ctx.exception.retcode, -errno.ENOENT) | |
641 | self.assertEqual(str(ctx.exception), "User 'admin' does not exist") | |
642 | ||
9f95a23c TL |
643 | def test_set_user_password_hash_broken_hash(self): |
644 | self.test_create_user() | |
645 | with self.assertRaises(CmdException) as ctx: | |
646 | self.exec_cmd('ac-user-set-password-hash', username='admin', | |
cd265ab1 | 647 | inbuf='1') |
9f95a23c TL |
648 | |
649 | self.assertEqual(ctx.exception.retcode, -errno.EINVAL) | |
650 | self.assertEqual(str(ctx.exception), 'Invalid password hash') | |
651 | ||
11fdf7f2 TL |
652 | def test_set_login_credentials(self): |
653 | self.exec_cmd('set-login-credentials', username='admin', | |
cd265ab1 | 654 | inbuf='admin') |
11fdf7f2 TL |
655 | user = self.exec_cmd('ac-user-show', username='admin') |
656 | pass_hash = password_hash('admin', user['password']) | |
657 | self.assertDictEqual(user, { | |
658 | 'username': 'admin', | |
659 | 'password': pass_hash, | |
9f95a23c TL |
660 | 'pwdExpirationDate': None, |
661 | 'pwdUpdateRequired': False, | |
11fdf7f2 TL |
662 | 'name': None, |
663 | 'email': None, | |
664 | 'lastUpdate': user['lastUpdate'], | |
9f95a23c TL |
665 | 'roles': ['administrator'], |
666 | 'enabled': True, | |
11fdf7f2 TL |
667 | }) |
668 | self.validate_persistent_user('admin', ['administrator'], pass_hash, | |
669 | None, None) | |
670 | ||
671 | def test_set_login_credentials_for_existing_user(self): | |
672 | self.test_add_user_roles('admin', ['read-only']) | |
673 | self.exec_cmd('set-login-credentials', username='admin', | |
cd265ab1 | 674 | inbuf='admin2') |
11fdf7f2 TL |
675 | user = self.exec_cmd('ac-user-show', username='admin') |
676 | pass_hash = password_hash('admin2', user['password']) | |
677 | self.assertDictEqual(user, { | |
678 | 'username': 'admin', | |
679 | 'password': pass_hash, | |
9f95a23c TL |
680 | 'pwdExpirationDate': None, |
681 | 'pwdUpdateRequired': False, | |
11fdf7f2 TL |
682 | 'name': 'admin User', |
683 | 'email': 'admin@user.com', | |
684 | 'lastUpdate': user['lastUpdate'], | |
9f95a23c TL |
685 | 'roles': ['read-only'], |
686 | 'enabled': True | |
11fdf7f2 TL |
687 | }) |
688 | self.validate_persistent_user('admin', ['read-only'], pass_hash, | |
689 | 'admin User', 'admin@user.com') | |
690 | ||
b3b6e05e TL |
691 | def test_load_v1(self): |
692 | self.CONFIG_KEY_DICT['accessdb_v1'] = ''' | |
693 | {{ | |
694 | "users": {{ | |
695 | "admin": {{ | |
696 | "username": "admin", | |
697 | "password": | |
698 | "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", | |
699 | "roles": ["block-manager", "test_role"], | |
700 | "name": "admin User", | |
701 | "email": "admin@user.com", | |
702 | "lastUpdate": {} | |
703 | }} | |
704 | }}, | |
705 | "roles": {{ | |
706 | "test_role": {{ | |
707 | "name": "test_role", | |
708 | "description": "Test Role", | |
709 | "scopes_permissions": {{ | |
710 | "{}": ["{}", "{}"], | |
711 | "{}": ["{}"] | |
712 | }} | |
713 | }} | |
714 | }}, | |
715 | "version": 1 | |
716 | }} | |
717 | '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ, | |
718 | Permission.UPDATE, Scope.POOL, Permission.CREATE) | |
719 | ||
720 | load_access_control_db() | |
721 | role = self.exec_cmd('ac-role-show', rolename="test_role") | |
722 | self.assertDictEqual(role, { | |
723 | 'name': 'test_role', | |
724 | 'description': "Test Role", | |
725 | 'scopes_permissions': { | |
726 | Scope.ISCSI: [Permission.READ, Permission.UPDATE], | |
727 | Scope.POOL: [Permission.CREATE] | |
728 | } | |
729 | }) | |
730 | user = self.exec_cmd('ac-user-show', username="admin") | |
731 | self.assertDictEqual(user, { | |
732 | 'username': 'admin', | |
733 | 'lastUpdate': user['lastUpdate'], | |
734 | 'password': | |
735 | "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", | |
736 | 'pwdExpirationDate': None, | |
737 | 'pwdUpdateRequired': False, | |
738 | 'name': 'admin User', | |
739 | 'email': 'admin@user.com', | |
740 | 'roles': ['block-manager', 'test_role'], | |
741 | 'enabled': True | |
742 | }) | |
743 | ||
744 | def test_load_v2(self): | |
745 | self.CONFIG_KEY_DICT['accessdb_v2'] = ''' | |
746 | {{ | |
747 | "users": {{ | |
748 | "admin": {{ | |
749 | "username": "admin", | |
750 | "password": | |
751 | "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", | |
752 | "pwdExpirationDate": null, | |
753 | "pwdUpdateRequired": false, | |
754 | "roles": ["block-manager", "test_role"], | |
755 | "name": "admin User", | |
756 | "email": "admin@user.com", | |
757 | "lastUpdate": {}, | |
758 | "enabled": true | |
759 | }} | |
760 | }}, | |
761 | "roles": {{ | |
762 | "test_role": {{ | |
763 | "name": "test_role", | |
764 | "description": "Test Role", | |
765 | "scopes_permissions": {{ | |
766 | "{}": ["{}", "{}"], | |
767 | "{}": ["{}"] | |
768 | }} | |
769 | }} | |
770 | }}, | |
771 | "version": 2 | |
772 | }} | |
773 | '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ, | |
774 | Permission.UPDATE, Scope.POOL, Permission.CREATE) | |
775 | ||
776 | load_access_control_db() | |
777 | role = self.exec_cmd('ac-role-show', rolename="test_role") | |
778 | self.assertDictEqual(role, { | |
779 | 'name': 'test_role', | |
780 | 'description': "Test Role", | |
781 | 'scopes_permissions': { | |
782 | Scope.ISCSI: [Permission.READ, Permission.UPDATE], | |
783 | Scope.POOL: [Permission.CREATE] | |
784 | } | |
785 | }) | |
786 | user = self.exec_cmd('ac-user-show', username="admin") | |
787 | self.assertDictEqual(user, { | |
788 | 'username': 'admin', | |
789 | 'lastUpdate': user['lastUpdate'], | |
790 | 'password': | |
791 | "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK", | |
792 | 'pwdExpirationDate': None, | |
793 | 'pwdUpdateRequired': False, | |
794 | 'name': 'admin User', | |
795 | 'email': 'admin@user.com', | |
796 | 'roles': ['block-manager', 'test_role'], | |
797 | 'enabled': True | |
798 | }) | |
799 | ||
9f95a23c TL |
800 | def test_password_policy_pw_length(self): |
801 | Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True | |
802 | Settings.PWD_POLICY_MIN_LENGTH = 3 | |
803 | pw_policy = PasswordPolicy('foo') | |
804 | self.assertTrue(pw_policy.check_password_length()) | |
805 | ||
806 | def test_password_policy_pw_length_fail(self): | |
807 | Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True | |
808 | pw_policy = PasswordPolicy('bar') | |
809 | self.assertFalse(pw_policy.check_password_length()) | |
810 | ||
811 | def test_password_policy_credits_too_weak(self): | |
812 | Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True | |
813 | pw_policy = PasswordPolicy('foo') | |
814 | pw_credits = pw_policy.check_password_complexity() | |
815 | self.assertEqual(pw_credits, 3) | |
816 | ||
817 | def test_password_policy_credits_weak(self): | |
818 | Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True | |
819 | pw_policy = PasswordPolicy('mypassword1') | |
820 | pw_credits = pw_policy.check_password_complexity() | |
821 | self.assertEqual(pw_credits, 11) | |
822 | ||
823 | def test_password_policy_credits_ok(self): | |
824 | Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True | |
825 | pw_policy = PasswordPolicy('mypassword1!@') | |
826 | pw_credits = pw_policy.check_password_complexity() | |
827 | self.assertEqual(pw_credits, 17) | |
828 | ||
829 | def test_password_policy_credits_strong(self): | |
830 | Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True | |
831 | pw_policy = PasswordPolicy('testpassword0047!@') | |
832 | pw_credits = pw_policy.check_password_complexity() | |
833 | self.assertEqual(pw_credits, 22) | |
834 | ||
835 | def test_password_policy_credits_very_strong(self): | |
836 | Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True | |
837 | pw_policy = PasswordPolicy('testpassword#!$!@$') | |
838 | pw_credits = pw_policy.check_password_complexity() | |
839 | self.assertEqual(pw_credits, 30) | |
840 | ||
841 | def test_password_policy_forbidden_words(self): | |
842 | Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True | |
843 | pw_policy = PasswordPolicy('!@$testdashboard#!$') | |
844 | self.assertTrue(pw_policy.check_if_contains_forbidden_words()) | |
845 | ||
846 | def test_password_policy_forbidden_words_custom(self): | |
847 | Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True | |
848 | Settings.PWD_POLICY_EXCLUSION_LIST = 'foo,bar' | |
849 | pw_policy = PasswordPolicy('foo123bar') | |
850 | self.assertTrue(pw_policy.check_if_contains_forbidden_words()) | |
851 | ||
852 | def test_password_policy_sequential_chars(self): | |
853 | Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = True | |
854 | pw_policy = PasswordPolicy('!@$test123#!$') | |
855 | self.assertTrue(pw_policy.check_if_sequential_characters()) | |
856 | ||
857 | def test_password_policy_repetitive_chars(self): | |
858 | Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = True | |
859 | pw_policy = PasswordPolicy('!@$testfooo#!$') | |
860 | self.assertTrue(pw_policy.check_if_repetitive_characters()) | |
861 | ||
862 | def test_password_policy_contain_username(self): | |
863 | Settings.PWD_POLICY_CHECK_USERNAME_ENABLED = True | |
864 | pw_policy = PasswordPolicy('%admin135)', 'admin') | |
865 | self.assertTrue(pw_policy.check_if_contains_username()) | |
866 | ||
867 | def test_password_policy_is_old_pwd(self): | |
868 | Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED = True | |
869 | pw_policy = PasswordPolicy('foo', old_password='foo') | |
870 | self.assertTrue(pw_policy.check_is_old_password()) |