]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | # -*- coding: utf-8 -*- |
2 | # pylint: disable=dangerous-default-value,too-many-public-methods | |
3 | from __future__ import absolute_import | |
4 | ||
5 | import errno | |
6 | import json | |
7 | import time | |
8 | import unittest | |
9 | ||
9f95a23c TL |
10 | from datetime import datetime, timedelta |
11 | ||
11fdf7f2 TL |
12 | from . import CmdException, CLICommandTestMixin |
13 | from .. import mgr | |
14 | from ..security import Scope, Permission | |
15 | from ..services.access_control import load_access_control_db, \ | |
16 | password_hash, AccessControlDB, \ | |
9f95a23c TL |
17 | SYSTEM_ROLES, PasswordPolicy |
18 | from ..settings import Settings | |
11fdf7f2 TL |
19 | |
20 | ||
21 | class AccessControlTest(unittest.TestCase, CLICommandTestMixin): | |
22 | ||
@classmethod
def setUpClass(cls):
    # One-time fixture for the whole class: install the mocked key/value
    # store and clear the access-control DB held on the shared `mgr`.
    cls.mock_kv_store()
    # The DB is accessed elsewhere in this file as `mgr.ACCESS_CTRL_DB`
    # (see setup_and_load_persistent_db). The previous spelling,
    # `ACCESS_CONTROL_DB`, only created an unrelated attribute, so the
    # intended reset never took effect.
    mgr.ACCESS_CTRL_DB = None
27 | ||
def setUp(self):
    # Each test starts from an empty mock config store; the access-control
    # DB is then (re)loaded on top of that empty store.
    self.CONFIG_KEY_DICT.clear()
    load_access_control_db()
31 | ||
def load_persistent_db(self):
    # Return the access-control DB currently held in the mock config
    # store, decoded from JSON. Fails the test if no DB entry exists.
    key = AccessControlDB.accessdb_config_key()
    self.assertIn(key, self.CONFIG_KEY_DICT)
    return json.loads(self.CONFIG_KEY_DICT[key])
38 | ||
39 | # The DB is written to persistent storage the first time it is saved. | |
40 | # However, should an operation fail due to <reasons>, we may end up in | |
41 | # a state where we have a completely empty CONFIG_KEY_DICT (our mock | |
42 | # equivalent to the persistent state). While this works for most of the | |
43 | # tests in this class, that would prevent us from testing things like | |
44 | # "run a command that is expected to fail, and then ensure nothing | |
45 | # happened", because we'd be asserting in `load_persistent_db()` due to | |
46 | # the map being empty. | |
47 | # | |
48 | # This function will therefore force state to be written to our mock | |
49 | # persistent state. We could have added this extra step to | |
50 | # `load_persistent_db()` directly, but that would conflict with the | |
51 | # upgrade tests. This way, we can selectively enforce this requirement | |
52 | # where we believe it to be necessary; generically speaking, this should | |
53 | # not be needed unless we're testing very specific behaviors. | |
54 | # | |
def setup_and_load_persistent_db(self):
    # Save the in-memory DB first so the mock store is guaranteed to hold
    # an entry, then run the usual presence check / decode. The decoded DB
    # is not returned.
    mgr.ACCESS_CTRL_DB.save()
    self.load_persistent_db()
58 | ||
def validate_persistent_role(self, rolename, scopes_permissions,
                             description=None):
    # Assert that `rolename` is persisted with the given description and
    # the given scope -> permissions mapping.
    persisted = self.load_persistent_db()
    self.assertIn('roles', persisted)
    stored_roles = persisted['roles']
    self.assertIn(rolename, stored_roles)
    entry = stored_roles[rolename]
    self.assertEqual(entry['name'], rolename)
    self.assertEqual(entry['description'], description)
    self.assertDictEqual(entry['scopes_permissions'], scopes_permissions)
68 | ||
def validate_persistent_no_role(self, rolename):
    # Assert that the persisted DB has a 'roles' section which does not
    # contain `rolename`.
    persisted = self.load_persistent_db()
    self.assertIn('roles', persisted)
    self.assertNotIn(rolename, persisted['roles'])
73 | ||
def validate_persistent_user(self, username, roles, password=None,
                             name=None, email=None, last_update=None,
                             enabled=True, pwdExpirationDate=None):
    # Assert that `username` is persisted with exactly `roles` and the
    # given `enabled` flag. Each optional field is compared only when a
    # truthy expected value was supplied.
    persisted = self.load_persistent_db()
    self.assertIn('users', persisted)
    self.assertIn(username, persisted['users'])
    entry = persisted['users'][username]
    self.assertEqual(entry['username'], username)
    self.assertListEqual(entry['roles'], roles)
    optional_fields = (
        ('password', password),
        ('name', name),
        ('email', email),
        ('lastUpdate', last_update),
        ('pwdExpirationDate', pwdExpirationDate),
    )
    for key, expected in optional_fields:
        if expected:
            self.assertEqual(entry[key], expected)
    self.assertEqual(entry['enabled'], enabled)
11fdf7f2 TL |
93 | |
def validate_persistent_no_user(self, username):
    # Assert that the persisted DB has a 'users' section which does not
    # contain `username`.
    persisted = self.load_persistent_db()
    self.assertIn('users', persisted)
    self.assertNotIn(username, persisted['users'])
98 | ||
def test_create_role(self):
    # Creating a role with only a name returns a role without description
    # or permissions, and that role is persisted.
    created = self.exec_cmd('ac-role-create', rolename='test_role')
    expected = {'name': 'test_role', 'description': None,
                'scopes_permissions': {}}
    self.assertDictEqual(created, expected)
    self.validate_persistent_role('test_role', {})
104 | ||
def test_create_role_with_desc(self):
    # Same as test_create_role, but the description is supplied and must
    # appear both in the command output and in the persisted DB.
    created = self.exec_cmd('ac-role-create', rolename='test_role',
                            description='Test Role')
    expected = {
        'name': 'test_role',
        'description': 'Test Role',
        'scopes_permissions': {},
    }
    self.assertDictEqual(created, expected)
    self.validate_persistent_role('test_role', {}, 'Test Role')
112 | ||
def test_create_duplicate_role(self):
    # Creating the same role twice must fail with EEXIST.
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-create', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EEXIST)
    self.assertEqual(str(error), "Role 'test_role' already exists")
121 | ||
def test_delete_role(self):
    # Deleting an existing role reports success and removes it from the
    # persisted DB.
    self.test_create_role()
    message = self.exec_cmd('ac-role-delete', rolename='test_role')
    self.assertEqual(message, "Role 'test_role' deleted")
    self.validate_persistent_no_role('test_role')
127 | ||
def test_delete_nonexistent_role(self):
    # Deleting a role that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
134 | ||
def test_show_single_role(self):
    # 'ac-role-show' with a role name returns that role's full record.
    self.test_create_role()
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    expected = {'name': 'test_role', 'description': None,
                'scopes_permissions': {}}
    self.assertDictEqual(shown, expected)
140 | ||
def test_show_nonexistent_role(self):
    # Showing a role that does not exist must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-show', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
147 | ||
def test_show_system_roles(self):
    # With no custom roles defined, 'ac-role-show' without arguments
    # lists as many entries as SYSTEM_ROLES, each one present in it.
    listed = self.exec_cmd('ac-role-show')
    self.assertEqual(len(listed), len(SYSTEM_ROLES))
    for entry in listed:
        self.assertIn(entry, SYSTEM_ROLES)
153 | ||
def test_show_system_role(self):
    # The 'read-only' role can be shown by name and is described as
    # 'Read-Only'.
    shown = self.exec_cmd('ac-role-show', rolename="read-only")
    self.assertEqual(shown['name'], 'read-only')
    self.assertEqual(shown['description'], 'Read-Only')
158 | ||
def test_delete_system_role(self):
    # System roles cannot be deleted; the attempt must fail with EPERM.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='administrator')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error),
                     "Cannot delete system role 'administrator'")
166 | ||
def test_add_role_scope_perms(self):
    # Adding READ and DELETE on the pool scope to a custom role; the
    # expected output lists the permissions as [DELETE, READ].
    self.test_create_role()
    self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                  scopename=Scope.POOL,
                  permissions=[Permission.READ, Permission.DELETE])
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    expected_role = {
        'name': 'test_role',
        'description': None,
        'scopes_permissions': {
            Scope.POOL: [Permission.DELETE, Permission.READ]
        },
    }
    self.assertDictEqual(shown, expected_role)
    self.validate_persistent_role('test_role', {
        Scope.POOL: [Permission.DELETE, Permission.READ]
    })
182 | ||
def test_del_role_scope_perms(self):
    # Start from a role with pool permissions, add monitor permissions,
    # then drop the pool scope and check only the monitor scope remains.
    self.test_add_role_scope_perms()
    self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                  scopename=Scope.MONITOR,
                  permissions=[Permission.READ, Permission.CREATE])
    both_scopes = {
        Scope.POOL: [Permission.DELETE, Permission.READ],
        Scope.MONITOR: [Permission.CREATE, Permission.READ],
    }
    self.validate_persistent_role('test_role', both_scopes)

    self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                  scopename=Scope.POOL)
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    expected_role = {
        'name': 'test_role',
        'description': None,
        'scopes_permissions': {
            Scope.MONITOR: [Permission.CREATE, Permission.READ]
        },
    }
    self.assertDictEqual(shown, expected_role)
    self.validate_persistent_role('test_role', {
        Scope.MONITOR: [Permission.CREATE, Permission.READ]
    })
204 | ||
def test_add_role_scope_perms_nonexistent_role(self):
    # Adding scope permissions to an unknown role must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='pool',
                      permissions=['read', 'delete'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
214 | ||
def test_add_role_invalid_scope_perms(self):
    # An unknown scope name is rejected with EINVAL and a message that
    # lists the valid scopes.
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='invalidscope',
                      permissions=['read', 'delete'])

    error = raised.exception
    expected_message = ("Scope 'invalidscope' is not valid\n Possible values: "
                        "{}".format(Scope.all_scopes()))
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), expected_message)
227 | ||
def test_add_role_scope_invalid_perms(self):
    # An unknown permission name is rejected with EINVAL and a message
    # that lists the valid permissions.
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='pool', permissions=['invalidperm'])

    error = raised.exception
    expected_message = ("Permission 'invalidperm' is not valid\n Possible "
                        "values: {}".format(Permission.all_permissions()))
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), expected_message)
239 | ||
def test_del_role_scope_perms_nonexistent_role(self):
    # Removing scope permissions from an unknown role must fail with
    # ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                      scopename='pool')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
248 | ||
def test_del_role_nonexistent_scope_perms(self):
    # Removing a scope the role has no permissions for must fail with
    # ENOENT.
    self.test_add_role_scope_perms()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                      scopename='nonexistentscope')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error),
                     "There are no permissions for scope 'nonexistentscope' "
                     "in role 'test_role'")
260 | ||
def test_not_permitted_add_role_scope_perms(self):
    # System roles are immutable: adding scope permissions to
    # 'read-only' must fail with EPERM.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='read-only',
                      scopename='pool', permissions=['read', 'delete'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error),
                     "Cannot update system role 'read-only'")
269 | ||
def test_not_permitted_del_role_scope_perms(self):
    # System roles are immutable: removing scope permissions from
    # 'read-only' must fail with EPERM.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='read-only',
                      scopename='pool')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error),
                     "Cannot update system role 'read-only'")
278 | ||
9f95a23c TL |
def test_create_user(self, username='admin', rolename=None, enabled=True,
                     pwdExpirationDate=None):
    # Create a user (password 'admin') and verify both the command output
    # and the persisted record. Also used as a fixture by other tests,
    # which is why it takes parameters and returns the created user dict.
    #
    #   username          -- login name; also used to derive name/email
    #   rolename          -- optional single role to assign
    #   enabled           -- value for the user's 'enabled' flag
    #   pwdExpirationDate -- optional password expiration date
    display_name = '{} User'.format(username)
    email = '{}@user.com'.format(username)
    expected_roles = [rolename] if rolename else []

    user = self.exec_cmd('ac-user-create', username=username,
                         rolename=rolename, password='admin',
                         name=display_name,
                         email=email,
                         enabled=enabled, force_password=True,
                         pwd_expiration_date=pwdExpirationDate)

    # The stored hash is passed back into password_hash() together with
    # the plain text; the result is expected to equal the stored hash.
    pass_hash = password_hash('admin', user['password'])
    self.assertDictEqual(user, {
        'username': username,
        'password': pass_hash,
        'pwdExpirationDate': pwdExpirationDate,
        'pwdUpdateRequired': False,
        'lastUpdate': user['lastUpdate'],
        'name': display_name,
        'email': email,
        'roles': expected_roles,
        'enabled': enabled
    })
    # Forward the expiration date as well: previously it was omitted, so
    # the persisted 'pwdExpirationDate' was never checked.
    self.validate_persistent_user(username, expected_roles,
                                  pass_hash, display_name,
                                  email,
                                  user['lastUpdate'], enabled,
                                  pwdExpirationDate)
    return user
305 | ||
9f95a23c TL |
def test_create_disabled_user(self):
    # A user can be created with the 'enabled' flag switched off.
    self.test_create_user(enabled=False)
308 | ||
def test_create_user_pwd_expiration_date(self):
    # Create a user whose password expires ten days from now.
    #
    # The timestamp is derived directly from the epoch clock. The earlier
    # version fed a naive UTC datetime through time.mktime(), which
    # interprets its argument as *local* time and therefore skewed the
    # result by the host's UTC offset.
    ten_days = timedelta(days=10).total_seconds()
    expiration_date = int(time.time() + ten_days)
    self.test_create_user(pwdExpirationDate=expiration_date)
313 | ||
11fdf7f2 TL |
def test_create_user_with_role(self):
    # A user can be created with a custom role that already exists.
    self.test_add_role_scope_perms()
    self.test_create_user(rolename='test_role')
317 | ||
def test_create_user_with_system_role(self):
    # A user can be created with the 'administrator' role.
    self.test_create_user(rolename='administrator')
320 | ||
def test_delete_user(self):
    # Deleting the only user reports success, leaves the user listing
    # empty and removes the user from the persisted DB.
    self.test_create_user()
    message = self.exec_cmd('ac-user-delete', username='admin')
    self.assertEqual(message, "User 'admin' deleted")
    remaining = self.exec_cmd('ac-user-show')
    self.assertEqual(len(remaining), 0)
    self.validate_persistent_no_user('admin')
328 | ||
def test_create_duplicate_user(self):
    # Creating an already existing user does not raise; the command
    # returns an informational message instead.
    self.test_create_user()
    message = self.exec_cmd('ac-user-create', username='admin',
                            password='admin', force_password=True)
    self.assertEqual(message, "User 'admin' already exists")
11fdf7f2 TL |
334 | |
def test_create_users_with_dne_role(self):
    # Creating a user with a non-existent role must fail and must not
    # leak the half-created user into persistent storage, neither
    # immediately nor when a later, unrelated operation commits.

    # one time call to setup our persistent db
    self.setup_and_load_persistent_db()

    # create a user with a role that does not exist; expect a failure.
    # assertRaises makes the test fail if the command unexpectedly
    # succeeds (a plain try/except would let that pass silently).
    with self.assertRaises(CmdException) as ctx:
        self.exec_cmd('ac-user-create', username='foo',
                      rolename='dne_role', password='foopass',
                      name='foo User', email='foo@user.com',
                      force_password=True)
    self.assertEqual(ctx.exception.retcode, -errno.ENOENT)

    db = self.load_persistent_db()
    if 'users' in db:
        self.assertNotIn('foo', db['users'])

    # We could just finish our test here, given we ensured that the user
    # with a non-existent role is not in persistent storage. However,
    # we're going to test the database's consistency, making sure that
    # side-effects are not written to persistent storage once we commit
    # an unrelated operation. To ensure this, we'll issue another
    # operation that is sharing the same code path, and will check whether
    # the next operation commits dirty state.

    # create a role (this will be 'test_role')
    self.test_create_role()
    self.exec_cmd('ac-user-create', username='bar',
                  rolename='test_role', password='barpass',
                  name='bar User', email='bar@user.com',
                  force_password=True)

    # validate db:
    #   user 'foo' should not exist
    #   user 'bar' should exist and have role 'test_role'
    self.validate_persistent_user('bar', ['test_role'])

    db = self.load_persistent_db()
    self.assertIn('users', db)
    self.assertNotIn('foo', db['users'])
375 | ||
def test_delete_nonexistent_user(self):
    # Deleting an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-delete', username='admin')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
382 | ||
def test_add_user_roles(self, username='admin',
                        roles=['pool-manager', 'block-manager']):
    # Create `username`, then add `roles` one at a time, checking after
    # each step that the user's role list is the sorted set added so far.
    # Also used as a fixture by other tests.
    #
    # NOTE: the list default is only iterated, never mutated, so sharing
    # it across calls is harmless here.
    user_orig = self.test_create_user(username)
    uroles = []
    for role in roles:
        uroles.append(role)
        uroles.sort()
        user = self.exec_cmd('ac-user-add-roles', username=username,
                             roles=[role])
        # Explicit checks instead of the long-deprecated
        # assertDictContainsSubset.
        self.assertIn('roles', user)
        self.assertEqual(user['roles'], uroles)
    self.validate_persistent_user(username, uroles)
    self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
395 | ||
def test_add_user_roles2(self):
    # Add two roles in a single command; the expected role list is in
    # sorted order.
    user_orig = self.test_create_user()
    user = self.exec_cmd('ac-user-add-roles', username="admin",
                         roles=['pool-manager', 'block-manager'])
    # Explicit checks instead of the long-deprecated
    # assertDictContainsSubset.
    self.assertIn('roles', user)
    self.assertEqual(user['roles'], ['block-manager', 'pool-manager'])
    self.validate_persistent_user('admin', ['block-manager',
                                            'pool-manager'])
    self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
405 | ||
def test_add_user_roles_not_existent_user(self):
    # Adding roles to an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-add-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
413 | ||
def test_add_user_roles_not_existent_role(self):
    # Adding an unknown role to an existing user must fail with ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-add-roles', username="admin",
                      roles=['Invalid Role'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error),
                     "Role 'Invalid Role' does not exist")
423 | ||
def test_set_user_roles(self):
    # Give the user one role via 'ac-user-add-roles', then replace the
    # whole role list via 'ac-user-set-roles'.
    user_orig = self.test_create_user()
    user = self.exec_cmd('ac-user-add-roles', username="admin",
                         roles=['pool-manager'])
    # Explicit checks instead of the long-deprecated
    # assertDictContainsSubset.
    self.assertIn('roles', user)
    self.assertEqual(user['roles'], ['pool-manager'])
    self.validate_persistent_user('admin', ['pool-manager'])
    self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])

    user2 = self.exec_cmd('ac-user-set-roles', username="admin",
                          roles=['rgw-manager', 'block-manager'])
    self.assertIn('roles', user2)
    self.assertEqual(user2['roles'], ['block-manager', 'rgw-manager'])
    self.validate_persistent_user('admin', ['block-manager',
                                            'rgw-manager'])
    self.assertGreaterEqual(user2['lastUpdate'], user['lastUpdate'])
439 | ||
def test_set_user_roles_not_existent_user(self):
    # Setting roles on an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
447 | ||
def test_set_user_roles_not_existent_role(self):
    # Setting an unknown role on an existing user must fail with ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-roles', username="admin",
                      roles=['Invalid Role'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error),
                     "Role 'Invalid Role' does not exist")
457 | ||
def test_del_user_roles(self):
    # Starting from a user holding 'pool-manager' and 'block-manager',
    # remove 'pool-manager' and check that only 'block-manager' remains.
    self.test_add_user_roles()
    user = self.exec_cmd('ac-user-del-roles', username="admin",
                         roles=['pool-manager'])
    # Explicit checks instead of the long-deprecated
    # assertDictContainsSubset.
    self.assertIn('roles', user)
    self.assertEqual(user['roles'], ['block-manager'])
    self.validate_persistent_user('admin', ['block-manager'])
465 | ||
def test_del_user_roles_not_existent_user(self):
    # Removing roles from an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
473 | ||
def test_del_user_roles_not_existent_role(self):
    # Removing an unknown role from an existing user must fail with
    # ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['Invalid Role'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error),
                     "Role 'Invalid Role' does not exist")
483 | ||
def test_del_user_roles_not_associated_role(self):
    # Removing a role the user does not hold must fail with ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['rgw-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error),
                     "Role 'rgw-manager' is not associated with user "
                     "'admin'")
494 | ||
def test_show_user(self):
    # 'ac-user-show' with a username returns the full user record.
    self.test_add_user_roles()
    shown = self.exec_cmd('ac-user-show', username='admin')
    pass_hash = password_hash('admin', shown['password'])
    expected = {
        'username': 'admin',
        'lastUpdate': shown['lastUpdate'],
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'roles': ['block-manager', 'pool-manager'],
        'enabled': True
    }
    self.assertDictEqual(shown, expected)
510 | ||
def test_show_nonexistent_user(self):
    # Showing an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-show', username='admin')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
517 | ||
def test_show_all_users(self):
    # 'ac-user-show' without a username lists every user; with two users
    # created, exactly those two must be returned.
    self.test_add_user_roles('admin', ['administrator'])
    self.test_add_user_roles('guest', ['read-only'])
    listed = self.exec_cmd('ac-user-show')
    self.assertEqual(len(listed), 2)
    for entry in listed:
        self.assertIn(entry, ['admin', 'guest'])
525 | ||
def test_del_role_associated_with_user(self):
    # A role that is still assigned to a user cannot be deleted; the
    # attempt must fail with EPERM.
    self.test_create_role()
    self.test_add_user_roles('guest', ['test_role'])

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error),
                     "Role 'test_role' is still associated with user "
                     "'guest'")
537 | ||
def test_set_user_info(self):
    # Changing name and e-mail updates the record and its persisted copy;
    # 'lastUpdate' is asserted to be unchanged by this command.
    original = self.test_create_user()
    updated = self.exec_cmd('ac-user-set-info', username='admin',
                            name='Admin Name', email='admin@admin.com')
    pass_hash = password_hash('admin', updated['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'Admin Name',
        'email': 'admin@admin.com',
        'lastUpdate': updated['lastUpdate'],
        'roles': [],
        'enabled': True
    }
    self.assertDictEqual(updated, expected)
    self.validate_persistent_user('admin', [], pass_hash, 'Admin Name',
                                  'admin@admin.com')
    self.assertEqual(updated['lastUpdate'], original['lastUpdate'])
557 | ||
def test_set_user_info_nonexistent_user(self):
    # Updating the info of an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-info', username='admin',
                      name='Admin Name', email='admin@admin.com')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
565 | ||
def test_set_user_password(self):
    # Setting a new password changes the stored hash and leaves the other
    # fields alone; 'lastUpdate' must not go backwards.
    original = self.test_create_user()
    updated = self.exec_cmd('ac-user-set-password', username='admin',
                            password='newpass', force_password=True)
    pass_hash = password_hash('newpass', updated['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'lastUpdate': updated['lastUpdate'],
        'roles': [],
        'enabled': True
    }
    self.assertDictEqual(updated, expected)
    self.validate_persistent_user('admin', [], pass_hash, 'admin User',
                                  'admin@user.com')
    self.assertGreaterEqual(updated['lastUpdate'], original['lastUpdate'])
585 | ||
def test_set_user_password_nonexistent_user(self):
    # Setting the password of an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password', username='admin',
                      password='newpass', force_password=True)

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
593 | ||
def test_set_user_password_hash(self):
    # Install a pre-computed password hash directly; the test then checks
    # the stored hash against the plain text 'newpass'.
    original = self.test_create_user()
    updated = self.exec_cmd('ac-user-set-password-hash', username='admin',
                            hashed_password='$2b$12$Pt3Vq/rDt2y9glTPSV.'
                                            'VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
    pass_hash = password_hash('newpass', updated['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'lastUpdate': updated['lastUpdate'],
        'roles': [],
        'enabled': True
    }
    self.assertDictEqual(updated, expected)
    self.validate_persistent_user('admin', [], pass_hash, 'admin User',
                                  'admin@user.com')
    self.assertGreaterEqual(updated['lastUpdate'], original['lastUpdate'])
614 | ||
def test_set_user_password_hash_nonexistent_user(self):
    # Setting a password hash for an unknown user must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password-hash', username='admin',
                      hashed_password='$2b$12$Pt3Vq/rDt2y9glTPSV.'
                                      'VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
623 | ||
9f95a23c TL |
def test_set_user_password_hash_broken_hash(self):
    # An empty string is not accepted as a password hash; the command
    # must fail with EINVAL.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password-hash', username='admin',
                      hashed_password='')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), 'Invalid password hash')
632 | ||
11fdf7f2 TL |
def test_set_login_credentials(self):
    # 'set-login-credentials' for a user that does not exist yet: the
    # resulting user has the 'administrator' role and no name or e-mail.
    self.exec_cmd('set-login-credentials', username='admin',
                  password='admin')
    shown = self.exec_cmd('ac-user-show', username='admin')
    pass_hash = password_hash('admin', shown['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': None,
        'email': None,
        'lastUpdate': shown['lastUpdate'],
        'roles': ['administrator'],
        'enabled': True,
    }
    self.assertDictEqual(shown, expected)
    self.validate_persistent_user('admin', ['administrator'], pass_hash,
                                  None, None)
651 | ||
def test_set_login_credentials_for_existing_user(self):
    # 'set-login-credentials' for an existing user changes the password
    # only; roles, name and e-mail are expected to stay as they were.
    self.test_add_user_roles('admin', ['read-only'])
    self.exec_cmd('set-login-credentials', username='admin',
                  password='admin2')
    shown = self.exec_cmd('ac-user-show', username='admin')
    pass_hash = password_hash('admin2', shown['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'lastUpdate': shown['lastUpdate'],
        'roles': ['read-only'],
        'enabled': True
    }
    self.assertDictEqual(shown, expected)
    self.validate_persistent_user('admin', ['read-only'], pass_hash,
                                  'admin User', 'admin@user.com')
671 | ||
def test_load_v1(self):
    # Seed the mock store with a version-1 DB document, reload, and check
    # that the role and user come back through the CLI. The v1 user
    # record lacks 'enabled', 'pwdExpirationDate' and 'pwdUpdateRequired';
    # the loaded user is expected to report True, None and False for them.
    last_update = int(round(time.time()))
    self.CONFIG_KEY_DICT['accessdb_v1'] = '''
        {{
            "users": {{
                "admin": {{
                    "username": "admin",
                    "password":
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
                    "roles": ["block-manager", "test_role"],
                    "name": "admin User",
                    "email": "admin@user.com",
                    "lastUpdate": {}
                }}
            }},
            "roles": {{
                "test_role": {{
                    "name": "test_role",
                    "description": "Test Role",
                    "scopes_permissions": {{
                        "{}": ["{}", "{}"],
                        "{}": ["{}"]
                    }}
                }}
            }},
            "version": 1
        }}
    '''.format(last_update, Scope.ISCSI, Permission.READ,
               Permission.UPDATE, Scope.POOL, Permission.CREATE)

    load_access_control_db()

    shown_role = self.exec_cmd('ac-role-show', rolename="test_role")
    expected_role = {
        'name': 'test_role',
        'description': "Test Role",
        'scopes_permissions': {
            Scope.ISCSI: [Permission.READ, Permission.UPDATE],
            Scope.POOL: [Permission.CREATE]
        }
    }
    self.assertDictEqual(shown_role, expected_role)

    shown_user = self.exec_cmd('ac-user-show', username="admin")
    expected_user = {
        'username': 'admin',
        'lastUpdate': shown_user['lastUpdate'],
        'password':
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'roles': ['block-manager', 'test_role'],
        'enabled': True
    }
    self.assertDictEqual(shown_user, expected_user)
724 | ||
def test_load_v2(self):
    # Seed the mock store with a version-2 DB document (which carries the
    # password-expiration and 'enabled' fields), reload, and check that
    # the role and user come back through the CLI.
    last_update = int(round(time.time()))
    self.CONFIG_KEY_DICT['accessdb_v2'] = '''
        {{
            "users": {{
                "admin": {{
                    "username": "admin",
                    "password":
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
                    "pwdExpirationDate": null,
                    "pwdUpdateRequired": false,
                    "roles": ["block-manager", "test_role"],
                    "name": "admin User",
                    "email": "admin@user.com",
                    "lastUpdate": {},
                    "enabled": true
                }}
            }},
            "roles": {{
                "test_role": {{
                    "name": "test_role",
                    "description": "Test Role",
                    "scopes_permissions": {{
                        "{}": ["{}", "{}"],
                        "{}": ["{}"]
                    }}
                }}
            }},
            "version": 2
        }}
    '''.format(last_update, Scope.ISCSI, Permission.READ,
               Permission.UPDATE, Scope.POOL, Permission.CREATE)

    load_access_control_db()

    shown_role = self.exec_cmd('ac-role-show', rolename="test_role")
    expected_role = {
        'name': 'test_role',
        'description': "Test Role",
        'scopes_permissions': {
            Scope.ISCSI: [Permission.READ, Permission.UPDATE],
            Scope.POOL: [Permission.CREATE]
        }
    }
    self.assertDictEqual(shown_role, expected_role)

    shown_user = self.exec_cmd('ac-user-show', username="admin")
    expected_user = {
        'username': 'admin',
        'lastUpdate': shown_user['lastUpdate'],
        'password':
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'roles': ['block-manager', 'test_role'],
        'enabled': True
    }
    self.assertDictEqual(shown_user, expected_user)
780 | ||
def test_update_from_previous_version_v1(self):
    """Load from bare 'username'/'password' config keys.

    Only the two plain keys are placed in the mock store before loading.
    ``ac-user-show`` is then expected to return an ``admin`` user with
    the stored password hash, the ``administrator`` role, no name or
    email, and the account enabled.
    """
    hashed_pw = \
        '$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK'
    self.CONFIG_KEY_DICT['username'] = 'admin'
    self.CONFIG_KEY_DICT['password'] = hashed_pw

    load_access_control_db()

    shown_user = self.exec_cmd('ac-user-show', username="admin")
    expected_user = {
        'username': 'admin',
        # Echo back the reported value: 'lastUpdate' is not pinned here.
        'lastUpdate': shown_user['lastUpdate'],
        'password': hashed_pw,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': None,
        'email': None,
        'roles': ['administrator'],
        'enabled': True
    }
    self.assertDictEqual(shown_user, expected_user)
9f95a23c TL |
799 | |
def test_password_policy_pw_length(self):
    """With the length check on and a minimum of 3, 'foo' must pass."""
    Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
    Settings.PWD_POLICY_MIN_LENGTH = 3
    length_ok = PasswordPolicy('foo').check_password_length()
    self.assertTrue(length_ok)
805 | ||
def test_password_policy_pw_length_fail(self):
    """A password shorter than the configured minimum must fail the check.

    The minimum length is set explicitly, mirroring
    ``test_password_policy_pw_length``, so the outcome does not depend
    on whatever default ``PWD_POLICY_MIN_LENGTH`` happens to have.
    """
    password = 'bar'
    Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
    # One more than the password's length, so it is too short by one.
    Settings.PWD_POLICY_MIN_LENGTH = len(password) + 1
    pw_policy = PasswordPolicy(password)
    self.assertFalse(pw_policy.check_password_length())
810 | ||
def test_password_policy_credits_too_weak(self):
    """The complexity check is expected to award 'foo' 3 credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    policy = PasswordPolicy('foo')
    self.assertEqual(policy.check_password_complexity(), 3)
816 | ||
def test_password_policy_credits_weak(self):
    """The complexity check is expected to award 'mypassword1' 11 credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    policy = PasswordPolicy('mypassword1')
    self.assertEqual(policy.check_password_complexity(), 11)
822 | ||
def test_password_policy_credits_ok(self):
    """The complexity check is expected to award 'mypassword1!@' 17 credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    policy = PasswordPolicy('mypassword1!@')
    self.assertEqual(policy.check_password_complexity(), 17)
828 | ||
def test_password_policy_credits_strong(self):
    """'testpassword0047!@' is expected to score 22 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    policy = PasswordPolicy('testpassword0047!@')
    self.assertEqual(policy.check_password_complexity(), 22)
834 | ||
def test_password_policy_credits_very_strong(self):
    """'testpassword#!$!@$' is expected to score 30 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    policy = PasswordPolicy('testpassword#!$!@$')
    self.assertEqual(policy.check_password_complexity(), 30)
840 | ||
def test_password_policy_forbidden_words(self):
    """'!@$testdashboard#!$' must be flagged by the exclusion-list check.

    No custom exclusion list is set here.
    NOTE(review): presumably a built-in entry such as 'dashboard'
    triggers the match -- confirm against PasswordPolicy.
    """
    Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
    has_forbidden = PasswordPolicy(
        '!@$testdashboard#!$').check_if_contains_forbidden_words()
    self.assertTrue(has_forbidden)
845 | ||
def test_password_policy_forbidden_words_custom(self):
    """With the exclusion list set to 'foo,bar', 'foo123bar' is flagged."""
    Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
    Settings.PWD_POLICY_EXCLUSION_LIST = 'foo,bar'
    has_forbidden = PasswordPolicy(
        'foo123bar').check_if_contains_forbidden_words()
    self.assertTrue(has_forbidden)
851 | ||
def test_password_policy_sequential_chars(self):
    """'!@$test123#!$' must be flagged by the sequential-characters check."""
    Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = True
    is_sequential = PasswordPolicy(
        '!@$test123#!$').check_if_sequential_characters()
    self.assertTrue(is_sequential)
856 | ||
def test_password_policy_repetitive_chars(self):
    """'!@$testfooo#!$' must be flagged by the repetitive-characters check."""
    Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = True
    is_repetitive = PasswordPolicy(
        '!@$testfooo#!$').check_if_repetitive_characters()
    self.assertTrue(is_repetitive)
861 | ||
def test_password_policy_contain_username(self):
    """'%admin135)' must be flagged by the username check.

    'admin' is passed as the second positional argument of
    PasswordPolicy (presumably the username -- confirm against its
    signature).
    """
    Settings.PWD_POLICY_CHECK_USERNAME_ENABLED = True
    has_username = PasswordPolicy(
        '%admin135)', 'admin').check_if_contains_username()
    self.assertTrue(has_username)
866 | ||
def test_password_policy_is_old_pwd(self):
    """A new password equal to ``old_password`` must be flagged as old."""
    Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED = True
    same_as_old = PasswordPolicy(
        'foo', old_password='foo').check_is_old_password()
    self.assertTrue(same_as_old)