1 # -*- coding: utf-8 -*-
2 # pylint: disable=dangerous-default-value,too-many-public-methods
3 from __future__
import absolute_import
10 from datetime
import datetime
, timedelta
12 from . import CmdException
, CLICommandTestMixin
14 from ..security
import Scope
, Permission
15 from ..services
.access_control
import load_access_control_db
, \
16 password_hash
, AccessControlDB
, \
17 SYSTEM_ROLES
, PasswordPolicy
18 from ..settings
import Settings
21 class AccessControlTest(unittest
.TestCase
, CLICommandTestMixin
):
26 mgr
.ACCESS_CONTROL_DB
= None
29 self
.CONFIG_KEY_DICT
.clear()
30 load_access_control_db()
32 def load_persistent_db(self
):
33 config_key
= AccessControlDB
.accessdb_config_key()
34 self
.assertIn(config_key
, self
.CONFIG_KEY_DICT
)
35 db_json
= self
.CONFIG_KEY_DICT
[config_key
]
36 db
= json
.loads(db_json
)
39 # The DB is written to persistent storage the first time it is saved.
40 # However, should an operation fail due to <reasons>, we may end up in
41 # a state where we have a completely empty CONFIG_KEY_DICT (our mock
42 # equivalent to the persistent state). While this works for most of the
43 # tests in this class, that would prevent us from testing things like
44 # "run a command that is expected to fail, and then ensure nothing
45 # happened", because we'd be asserting in `load_persistent_db()` due to
46 # the map being empty.
48 # This function will therefore force state to be written to our mock
49 # persistent state. We could have added this extra step to
50 # `load_persistent_db()` directly, but that would conflict with the
51 # upgrade tests. This way, we can selectively enforce this requirement
52 # where we believe it to be necessary; generically speaking, this should
53 # not be needed unless we're testing very specific behaviors.
55 def setup_and_load_persistent_db(self
):
56 mgr
.ACCESS_CTRL_DB
.save()
57 self
.load_persistent_db()
59 def validate_persistent_role(self
, rolename
, scopes_permissions
,
61 db
= self
.load_persistent_db()
62 self
.assertIn('roles', db
)
63 self
.assertIn(rolename
, db
['roles'])
64 self
.assertEqual(db
['roles'][rolename
]['name'], rolename
)
65 self
.assertEqual(db
['roles'][rolename
]['description'], description
)
66 self
.assertDictEqual(db
['roles'][rolename
]['scopes_permissions'],
69 def validate_persistent_no_role(self
, rolename
):
70 db
= self
.load_persistent_db()
71 self
.assertIn('roles', db
)
72 self
.assertNotIn(rolename
, db
['roles'])
74 def validate_persistent_user(self
, username
, roles
, password
=None,
75 name
=None, email
=None, last_update
=None,
76 enabled
=True, pwdExpirationDate
=None):
77 db
= self
.load_persistent_db()
78 self
.assertIn('users', db
)
79 self
.assertIn(username
, db
['users'])
80 self
.assertEqual(db
['users'][username
]['username'], username
)
81 self
.assertListEqual(db
['users'][username
]['roles'], roles
)
83 self
.assertEqual(db
['users'][username
]['password'], password
)
85 self
.assertEqual(db
['users'][username
]['name'], name
)
87 self
.assertEqual(db
['users'][username
]['email'], email
)
89 self
.assertEqual(db
['users'][username
]['lastUpdate'], last_update
)
91 self
.assertEqual(db
['users'][username
]['pwdExpirationDate'], pwdExpirationDate
)
92 self
.assertEqual(db
['users'][username
]['enabled'], enabled
)
94 def validate_persistent_no_user(self
, username
):
95 db
= self
.load_persistent_db()
96 self
.assertIn('users', db
)
97 self
.assertNotIn(username
, db
['users'])
99 def test_create_role(self
):
100 role
= self
.exec_cmd('ac-role-create', rolename
='test_role')
101 self
.assertDictEqual(role
, {'name': 'test_role', 'description': None,
102 'scopes_permissions': {}})
103 self
.validate_persistent_role('test_role', {})
105 def test_create_role_with_desc(self
):
106 role
= self
.exec_cmd('ac-role-create', rolename
='test_role',
107 description
='Test Role')
108 self
.assertDictEqual(role
, {'name': 'test_role',
109 'description': 'Test Role',
110 'scopes_permissions': {}})
111 self
.validate_persistent_role('test_role', {}, 'Test Role')
113 def test_create_duplicate_role(self
):
114 self
.test_create_role()
116 with self
.assertRaises(CmdException
) as ctx
:
117 self
.exec_cmd('ac-role-create', rolename
='test_role')
119 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EEXIST
)
120 self
.assertEqual(str(ctx
.exception
), "Role 'test_role' already exists")
122 def test_delete_role(self
):
123 self
.test_create_role()
124 out
= self
.exec_cmd('ac-role-delete', rolename
='test_role')
125 self
.assertEqual(out
, "Role 'test_role' deleted")
126 self
.validate_persistent_no_role('test_role')
128 def test_delete_nonexistent_role(self
):
129 with self
.assertRaises(CmdException
) as ctx
:
130 self
.exec_cmd('ac-role-delete', rolename
='test_role')
132 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
133 self
.assertEqual(str(ctx
.exception
), "Role 'test_role' does not exist")
135 def test_show_single_role(self
):
136 self
.test_create_role()
137 role
= self
.exec_cmd('ac-role-show', rolename
='test_role')
138 self
.assertDictEqual(role
, {'name': 'test_role', 'description': None,
139 'scopes_permissions': {}})
141 def test_show_nonexistent_role(self
):
142 with self
.assertRaises(CmdException
) as ctx
:
143 self
.exec_cmd('ac-role-show', rolename
='test_role')
145 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
146 self
.assertEqual(str(ctx
.exception
), "Role 'test_role' does not exist")
148 def test_show_system_roles(self
):
149 roles
= self
.exec_cmd('ac-role-show')
150 self
.assertEqual(len(roles
), len(SYSTEM_ROLES
))
152 self
.assertIn(role
, SYSTEM_ROLES
)
154 def test_show_system_role(self
):
155 role
= self
.exec_cmd('ac-role-show', rolename
="read-only")
156 self
.assertEqual(role
['name'], 'read-only')
157 self
.assertEqual(role
['description'], 'Read-Only')
159 def test_delete_system_role(self
):
160 with self
.assertRaises(CmdException
) as ctx
:
161 self
.exec_cmd('ac-role-delete', rolename
='administrator')
163 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EPERM
)
164 self
.assertEqual(str(ctx
.exception
),
165 "Cannot delete system role 'administrator'")
167 def test_add_role_scope_perms(self
):
168 self
.test_create_role()
169 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
170 scopename
=Scope
.POOL
,
171 permissions
=[Permission
.READ
, Permission
.DELETE
])
172 role
= self
.exec_cmd('ac-role-show', rolename
='test_role')
173 self
.assertDictEqual(role
, {'name': 'test_role',
175 'scopes_permissions': {
176 Scope
.POOL
: [Permission
.DELETE
,
179 self
.validate_persistent_role('test_role', {
180 Scope
.POOL
: [Permission
.DELETE
, Permission
.READ
]
183 def test_del_role_scope_perms(self
):
184 self
.test_add_role_scope_perms()
185 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
186 scopename
=Scope
.MONITOR
,
187 permissions
=[Permission
.READ
, Permission
.CREATE
])
188 self
.validate_persistent_role('test_role', {
189 Scope
.POOL
: [Permission
.DELETE
, Permission
.READ
],
190 Scope
.MONITOR
: [Permission
.CREATE
, Permission
.READ
]
192 self
.exec_cmd('ac-role-del-scope-perms', rolename
='test_role',
193 scopename
=Scope
.POOL
)
194 role
= self
.exec_cmd('ac-role-show', rolename
='test_role')
195 self
.assertDictEqual(role
, {'name': 'test_role',
197 'scopes_permissions': {
198 Scope
.MONITOR
: [Permission
.CREATE
,
201 self
.validate_persistent_role('test_role', {
202 Scope
.MONITOR
: [Permission
.CREATE
, Permission
.READ
]
205 def test_add_role_scope_perms_nonexistent_role(self
):
207 with self
.assertRaises(CmdException
) as ctx
:
208 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
210 permissions
=['read', 'delete'])
212 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
213 self
.assertEqual(str(ctx
.exception
), "Role 'test_role' does not exist")
215 def test_add_role_invalid_scope_perms(self
):
216 self
.test_create_role()
218 with self
.assertRaises(CmdException
) as ctx
:
219 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
220 scopename
='invalidscope',
221 permissions
=['read', 'delete'])
223 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EINVAL
)
224 self
.assertEqual(str(ctx
.exception
),
225 "Scope 'invalidscope' is not valid\n Possible values: "
226 "{}".format(Scope
.all_scopes()))
228 def test_add_role_scope_invalid_perms(self
):
229 self
.test_create_role()
231 with self
.assertRaises(CmdException
) as ctx
:
232 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
233 scopename
='pool', permissions
=['invalidperm'])
235 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EINVAL
)
236 self
.assertEqual(str(ctx
.exception
),
237 "Permission 'invalidperm' is not valid\n Possible "
238 "values: {}".format(Permission
.all_permissions()))
240 def test_del_role_scope_perms_nonexistent_role(self
):
242 with self
.assertRaises(CmdException
) as ctx
:
243 self
.exec_cmd('ac-role-del-scope-perms', rolename
='test_role',
246 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
247 self
.assertEqual(str(ctx
.exception
), "Role 'test_role' does not exist")
249 def test_del_role_nonexistent_scope_perms(self
):
250 self
.test_add_role_scope_perms()
252 with self
.assertRaises(CmdException
) as ctx
:
253 self
.exec_cmd('ac-role-del-scope-perms', rolename
='test_role',
254 scopename
='nonexistentscope')
256 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
257 self
.assertEqual(str(ctx
.exception
),
258 "There are no permissions for scope 'nonexistentscope' "
259 "in role 'test_role'")
261 def test_not_permitted_add_role_scope_perms(self
):
262 with self
.assertRaises(CmdException
) as ctx
:
263 self
.exec_cmd('ac-role-add-scope-perms', rolename
='read-only',
264 scopename
='pool', permissions
=['read', 'delete'])
266 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EPERM
)
267 self
.assertEqual(str(ctx
.exception
),
268 "Cannot update system role 'read-only'")
270 def test_not_permitted_del_role_scope_perms(self
):
271 with self
.assertRaises(CmdException
) as ctx
:
272 self
.exec_cmd('ac-role-del-scope-perms', rolename
='read-only',
275 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EPERM
)
276 self
.assertEqual(str(ctx
.exception
),
277 "Cannot update system role 'read-only'")
279 def test_create_user(self
, username
='admin', rolename
=None, enabled
=True,
280 pwdExpirationDate
=None):
281 user
= self
.exec_cmd('ac-user-create', username
=username
,
282 rolename
=rolename
, password
='admin',
283 name
='{} User'.format(username
),
284 email
='{}@user.com'.format(username
),
285 enabled
=enabled
, force_password
=True,
286 pwd_expiration_date
=pwdExpirationDate
)
288 pass_hash
= password_hash('admin', user
['password'])
289 self
.assertDictEqual(user
, {
290 'username': username
,
291 'password': pass_hash
,
292 'pwdExpirationDate': pwdExpirationDate
,
293 'pwdUpdateRequired': False,
294 'lastUpdate': user
['lastUpdate'],
295 'name': '{} User'.format(username
),
296 'email': '{}@user.com'.format(username
),
297 'roles': [rolename
] if rolename
else [],
300 self
.validate_persistent_user(username
, [rolename
] if rolename
else [],
301 pass_hash
, '{} User'.format(username
),
302 '{}@user.com'.format(username
),
303 user
['lastUpdate'], enabled
)
306 def test_create_disabled_user(self
):
307 self
.test_create_user(enabled
=False)
309 def test_create_user_pwd_expiration_date(self
):
310 expiration_date
= datetime
.utcnow() + timedelta(days
=10)
311 expiration_date
= int(time
.mktime(expiration_date
.timetuple()))
312 self
.test_create_user(pwdExpirationDate
=expiration_date
)
314 def test_create_user_with_role(self
):
315 self
.test_add_role_scope_perms()
316 self
.test_create_user(rolename
='test_role')
318 def test_create_user_with_system_role(self
):
319 self
.test_create_user(rolename
='administrator')
321 def test_delete_user(self
):
322 self
.test_create_user()
323 out
= self
.exec_cmd('ac-user-delete', username
='admin')
324 self
.assertEqual(out
, "User 'admin' deleted")
325 users
= self
.exec_cmd('ac-user-show')
326 self
.assertEqual(len(users
), 0)
327 self
.validate_persistent_no_user('admin')
329 def test_create_duplicate_user(self
):
330 self
.test_create_user()
331 ret
= self
.exec_cmd('ac-user-create', username
='admin', password
='admin',
333 self
.assertEqual(ret
, "User 'admin' already exists")
335 def test_create_users_with_dne_role(self
):
336 # one time call to setup our persistent db
337 self
.setup_and_load_persistent_db()
339 # create a user with a role that does not exist; expect a failure
341 self
.exec_cmd('ac-user-create', username
='foo',
342 rolename
='dne_role', password
='foopass',
343 name
='foo User', email
='foo@user.com',
345 except CmdException
as e
:
346 self
.assertEqual(e
.retcode
, -errno
.ENOENT
)
348 db
= self
.load_persistent_db()
350 self
.assertNotIn('foo', db
['users'])
352 # We could just finish our test here, given we ensured that the user
353 # with a non-existent role is not in persistent storage. However,
354 # we're going to test the database's consistency, making sure that
355 # side-effects are not written to persistent storage once we commit
356 # an unrelated operation. To ensure this, we'll issue another
357 # operation that is sharing the same code path, and will check whether
358 # the next operation commits dirty state.
360 # create a role (this will be 'test_role')
361 self
.test_create_role()
362 self
.exec_cmd('ac-user-create', username
='bar',
363 rolename
='test_role', password
='barpass',
364 name
='bar User', email
='bar@user.com',
368 # user 'foo' should not exist
369 # user 'bar' should exist and have role 'test_role'
370 self
.validate_persistent_user('bar', ['test_role'])
372 db
= self
.load_persistent_db()
373 self
.assertIn('users', db
)
374 self
.assertNotIn('foo', db
['users'])
376 def test_delete_nonexistent_user(self
):
377 with self
.assertRaises(CmdException
) as ctx
:
378 self
.exec_cmd('ac-user-delete', username
='admin')
380 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
381 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
383 def test_add_user_roles(self
, username
='admin',
384 roles
=['pool-manager', 'block-manager']):
385 user_orig
= self
.test_create_user(username
)
390 user
= self
.exec_cmd('ac-user-add-roles', username
=username
,
392 self
.assertDictContainsSubset({'roles': uroles
}, user
)
393 self
.validate_persistent_user(username
, uroles
)
394 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
396 def test_add_user_roles2(self
):
397 user_orig
= self
.test_create_user()
398 user
= self
.exec_cmd('ac-user-add-roles', username
="admin",
399 roles
=['pool-manager', 'block-manager'])
400 self
.assertDictContainsSubset(
401 {'roles': ['block-manager', 'pool-manager']}, user
)
402 self
.validate_persistent_user('admin', ['block-manager',
404 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
406 def test_add_user_roles_not_existent_user(self
):
407 with self
.assertRaises(CmdException
) as ctx
:
408 self
.exec_cmd('ac-user-add-roles', username
="admin",
409 roles
=['pool-manager', 'block-manager'])
411 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
412 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
414 def test_add_user_roles_not_existent_role(self
):
415 self
.test_create_user()
416 with self
.assertRaises(CmdException
) as ctx
:
417 self
.exec_cmd('ac-user-add-roles', username
="admin",
418 roles
=['Invalid Role'])
420 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
421 self
.assertEqual(str(ctx
.exception
),
422 "Role 'Invalid Role' does not exist")
424 def test_set_user_roles(self
):
425 user_orig
= self
.test_create_user()
426 user
= self
.exec_cmd('ac-user-add-roles', username
="admin",
427 roles
=['pool-manager'])
428 self
.assertDictContainsSubset(
429 {'roles': ['pool-manager']}, user
)
430 self
.validate_persistent_user('admin', ['pool-manager'])
431 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
432 user2
= self
.exec_cmd('ac-user-set-roles', username
="admin",
433 roles
=['rgw-manager', 'block-manager'])
434 self
.assertDictContainsSubset(
435 {'roles': ['block-manager', 'rgw-manager']}, user2
)
436 self
.validate_persistent_user('admin', ['block-manager',
438 self
.assertGreaterEqual(user2
['lastUpdate'], user
['lastUpdate'])
440 def test_set_user_roles_not_existent_user(self
):
441 with self
.assertRaises(CmdException
) as ctx
:
442 self
.exec_cmd('ac-user-set-roles', username
="admin",
443 roles
=['pool-manager', 'block-manager'])
445 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
446 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
448 def test_set_user_roles_not_existent_role(self
):
449 self
.test_create_user()
450 with self
.assertRaises(CmdException
) as ctx
:
451 self
.exec_cmd('ac-user-set-roles', username
="admin",
452 roles
=['Invalid Role'])
454 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
455 self
.assertEqual(str(ctx
.exception
),
456 "Role 'Invalid Role' does not exist")
458 def test_del_user_roles(self
):
459 self
.test_add_user_roles()
460 user
= self
.exec_cmd('ac-user-del-roles', username
="admin",
461 roles
=['pool-manager'])
462 self
.assertDictContainsSubset(
463 {'roles': ['block-manager']}, user
)
464 self
.validate_persistent_user('admin', ['block-manager'])
466 def test_del_user_roles_not_existent_user(self
):
467 with self
.assertRaises(CmdException
) as ctx
:
468 self
.exec_cmd('ac-user-del-roles', username
="admin",
469 roles
=['pool-manager', 'block-manager'])
471 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
472 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
474 def test_del_user_roles_not_existent_role(self
):
475 self
.test_create_user()
476 with self
.assertRaises(CmdException
) as ctx
:
477 self
.exec_cmd('ac-user-del-roles', username
="admin",
478 roles
=['Invalid Role'])
480 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
481 self
.assertEqual(str(ctx
.exception
),
482 "Role 'Invalid Role' does not exist")
484 def test_del_user_roles_not_associated_role(self
):
485 self
.test_create_user()
486 with self
.assertRaises(CmdException
) as ctx
:
487 self
.exec_cmd('ac-user-del-roles', username
="admin",
488 roles
=['rgw-manager'])
490 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
491 self
.assertEqual(str(ctx
.exception
),
492 "Role 'rgw-manager' is not associated with user "
495 def test_show_user(self
):
496 self
.test_add_user_roles()
497 user
= self
.exec_cmd('ac-user-show', username
='admin')
498 pass_hash
= password_hash('admin', user
['password'])
499 self
.assertDictEqual(user
, {
501 'lastUpdate': user
['lastUpdate'],
502 'password': pass_hash
,
503 'pwdExpirationDate': None,
504 'pwdUpdateRequired': False,
505 'name': 'admin User',
506 'email': 'admin@user.com',
507 'roles': ['block-manager', 'pool-manager'],
511 def test_show_nonexistent_user(self
):
512 with self
.assertRaises(CmdException
) as ctx
:
513 self
.exec_cmd('ac-user-show', username
='admin')
515 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
516 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
518 def test_show_all_users(self
):
519 self
.test_add_user_roles('admin', ['administrator'])
520 self
.test_add_user_roles('guest', ['read-only'])
521 users
= self
.exec_cmd('ac-user-show')
522 self
.assertEqual(len(users
), 2)
524 self
.assertIn(user
, ['admin', 'guest'])
526 def test_del_role_associated_with_user(self
):
527 self
.test_create_role()
528 self
.test_add_user_roles('guest', ['test_role'])
530 with self
.assertRaises(CmdException
) as ctx
:
531 self
.exec_cmd('ac-role-delete', rolename
='test_role')
533 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EPERM
)
534 self
.assertEqual(str(ctx
.exception
),
535 "Role 'test_role' is still associated with user "
538 def test_set_user_info(self
):
539 user_orig
= self
.test_create_user()
540 user
= self
.exec_cmd('ac-user-set-info', username
='admin',
541 name
='Admin Name', email
='admin@admin.com')
542 pass_hash
= password_hash('admin', user
['password'])
543 self
.assertDictEqual(user
, {
545 'password': pass_hash
,
546 'pwdExpirationDate': None,
547 'pwdUpdateRequired': False,
548 'name': 'Admin Name',
549 'email': 'admin@admin.com',
550 'lastUpdate': user
['lastUpdate'],
554 self
.validate_persistent_user('admin', [], pass_hash
, 'Admin Name',
556 self
.assertEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
558 def test_set_user_info_nonexistent_user(self
):
559 with self
.assertRaises(CmdException
) as ctx
:
560 self
.exec_cmd('ac-user-set-info', username
='admin',
561 name
='Admin Name', email
='admin@admin.com')
563 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
564 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
566 def test_set_user_password(self
):
567 user_orig
= self
.test_create_user()
568 user
= self
.exec_cmd('ac-user-set-password', username
='admin',
569 password
='newpass', force_password
=True)
570 pass_hash
= password_hash('newpass', user
['password'])
571 self
.assertDictEqual(user
, {
573 'password': pass_hash
,
574 'pwdExpirationDate': None,
575 'pwdUpdateRequired': False,
576 'name': 'admin User',
577 'email': 'admin@user.com',
578 'lastUpdate': user
['lastUpdate'],
582 self
.validate_persistent_user('admin', [], pass_hash
, 'admin User',
584 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
586 def test_set_user_password_nonexistent_user(self
):
587 with self
.assertRaises(CmdException
) as ctx
:
588 self
.exec_cmd('ac-user-set-password', username
='admin',
589 password
='newpass', force_password
=True)
591 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
592 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
594 def test_set_user_password_hash(self
):
595 user_orig
= self
.test_create_user()
596 user
= self
.exec_cmd('ac-user-set-password-hash', username
='admin',
597 hashed_password
='$2b$12$Pt3Vq/rDt2y9glTPSV.'
598 'VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
599 pass_hash
= password_hash('newpass', user
['password'])
600 self
.assertDictEqual(user
, {
602 'password': pass_hash
,
603 'pwdExpirationDate': None,
604 'pwdUpdateRequired': False,
605 'name': 'admin User',
606 'email': 'admin@user.com',
607 'lastUpdate': user
['lastUpdate'],
611 self
.validate_persistent_user('admin', [], pass_hash
, 'admin User',
613 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
615 def test_set_user_password_hash_nonexistent_user(self
):
616 with self
.assertRaises(CmdException
) as ctx
:
617 self
.exec_cmd('ac-user-set-password-hash', username
='admin',
618 hashed_password
='$2b$12$Pt3Vq/rDt2y9glTPSV.'
619 'VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
621 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
622 self
.assertEqual(str(ctx
.exception
), "User 'admin' does not exist")
624 def test_set_user_password_hash_broken_hash(self
):
625 self
.test_create_user()
626 with self
.assertRaises(CmdException
) as ctx
:
627 self
.exec_cmd('ac-user-set-password-hash', username
='admin',
630 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EINVAL
)
631 self
.assertEqual(str(ctx
.exception
), 'Invalid password hash')
633 def test_set_login_credentials(self
):
634 self
.exec_cmd('set-login-credentials', username
='admin',
636 user
= self
.exec_cmd('ac-user-show', username
='admin')
637 pass_hash
= password_hash('admin', user
['password'])
638 self
.assertDictEqual(user
, {
640 'password': pass_hash
,
641 'pwdExpirationDate': None,
642 'pwdUpdateRequired': False,
645 'lastUpdate': user
['lastUpdate'],
646 'roles': ['administrator'],
649 self
.validate_persistent_user('admin', ['administrator'], pass_hash
,
652 def test_set_login_credentials_for_existing_user(self
):
653 self
.test_add_user_roles('admin', ['read-only'])
654 self
.exec_cmd('set-login-credentials', username
='admin',
656 user
= self
.exec_cmd('ac-user-show', username
='admin')
657 pass_hash
= password_hash('admin2', user
['password'])
658 self
.assertDictEqual(user
, {
660 'password': pass_hash
,
661 'pwdExpirationDate': None,
662 'pwdUpdateRequired': False,
663 'name': 'admin User',
664 'email': 'admin@user.com',
665 'lastUpdate': user
['lastUpdate'],
666 'roles': ['read-only'],
669 self
.validate_persistent_user('admin', ['read-only'], pass_hash
,
670 'admin User', 'admin@user.com')
672 def test_load_v1(self
):
673 self
.CONFIG_KEY_DICT
['accessdb_v1'] = '''
679 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
680 "roles": ["block-manager", "test_role"],
681 "name": "admin User",
682 "email": "admin@user.com",
689 "description": "Test Role",
690 "scopes_permissions": {{
698 '''.format(int(round(time
.time())), Scope
.ISCSI
, Permission
.READ
,
699 Permission
.UPDATE
, Scope
.POOL
, Permission
.CREATE
)
701 load_access_control_db()
702 role
= self
.exec_cmd('ac-role-show', rolename
="test_role")
703 self
.assertDictEqual(role
, {
705 'description': "Test Role",
706 'scopes_permissions': {
707 Scope
.ISCSI
: [Permission
.READ
, Permission
.UPDATE
],
708 Scope
.POOL
: [Permission
.CREATE
]
711 user
= self
.exec_cmd('ac-user-show', username
="admin")
712 self
.assertDictEqual(user
, {
714 'lastUpdate': user
['lastUpdate'],
716 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
717 'pwdExpirationDate': None,
718 'pwdUpdateRequired': False,
719 'name': 'admin User',
720 'email': 'admin@user.com',
721 'roles': ['block-manager', 'test_role'],
725 def test_load_v2(self
):
726 self
.CONFIG_KEY_DICT
['accessdb_v2'] = '''
732 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
733 "pwdExpirationDate": null,
734 "pwdUpdateRequired": false,
735 "roles": ["block-manager", "test_role"],
736 "name": "admin User",
737 "email": "admin@user.com",
745 "description": "Test Role",
746 "scopes_permissions": {{
754 '''.format(int(round(time
.time())), Scope
.ISCSI
, Permission
.READ
,
755 Permission
.UPDATE
, Scope
.POOL
, Permission
.CREATE
)
757 load_access_control_db()
758 role
= self
.exec_cmd('ac-role-show', rolename
="test_role")
759 self
.assertDictEqual(role
, {
761 'description': "Test Role",
762 'scopes_permissions': {
763 Scope
.ISCSI
: [Permission
.READ
, Permission
.UPDATE
],
764 Scope
.POOL
: [Permission
.CREATE
]
767 user
= self
.exec_cmd('ac-user-show', username
="admin")
768 self
.assertDictEqual(user
, {
770 'lastUpdate': user
['lastUpdate'],
772 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
773 'pwdExpirationDate': None,
774 'pwdUpdateRequired': False,
775 'name': 'admin User',
776 'email': 'admin@user.com',
777 'roles': ['block-manager', 'test_role'],
781 def test_update_from_previous_version_v1(self
):
782 self
.CONFIG_KEY_DICT
['username'] = 'admin'
783 self
.CONFIG_KEY_DICT
['password'] = \
784 '$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK'
785 load_access_control_db()
786 user
= self
.exec_cmd('ac-user-show', username
="admin")
787 self
.assertDictEqual(user
, {
789 'lastUpdate': user
['lastUpdate'],
791 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
792 'pwdExpirationDate': None,
793 'pwdUpdateRequired': False,
796 'roles': ['administrator'],
800 def test_password_policy_pw_length(self
):
801 Settings
.PWD_POLICY_CHECK_LENGTH_ENABLED
= True
802 Settings
.PWD_POLICY_MIN_LENGTH
= 3
803 pw_policy
= PasswordPolicy('foo')
804 self
.assertTrue(pw_policy
.check_password_length())
806 def test_password_policy_pw_length_fail(self
):
807 Settings
.PWD_POLICY_CHECK_LENGTH_ENABLED
= True
808 pw_policy
= PasswordPolicy('bar')
809 self
.assertFalse(pw_policy
.check_password_length())
811 def test_password_policy_credits_too_weak(self
):
812 Settings
.PWD_POLICY_CHECK_COMPLEXITY_ENABLED
= True
813 pw_policy
= PasswordPolicy('foo')
814 pw_credits
= pw_policy
.check_password_complexity()
815 self
.assertEqual(pw_credits
, 3)
817 def test_password_policy_credits_weak(self
):
818 Settings
.PWD_POLICY_CHECK_COMPLEXITY_ENABLED
= True
819 pw_policy
= PasswordPolicy('mypassword1')
820 pw_credits
= pw_policy
.check_password_complexity()
821 self
.assertEqual(pw_credits
, 11)
823 def test_password_policy_credits_ok(self
):
824 Settings
.PWD_POLICY_CHECK_COMPLEXITY_ENABLED
= True
825 pw_policy
= PasswordPolicy('mypassword1!@')
826 pw_credits
= pw_policy
.check_password_complexity()
827 self
.assertEqual(pw_credits
, 17)
829 def test_password_policy_credits_strong(self
):
830 Settings
.PWD_POLICY_CHECK_COMPLEXITY_ENABLED
= True
831 pw_policy
= PasswordPolicy('testpassword0047!@')
832 pw_credits
= pw_policy
.check_password_complexity()
833 self
.assertEqual(pw_credits
, 22)
835 def test_password_policy_credits_very_strong(self
):
836 Settings
.PWD_POLICY_CHECK_COMPLEXITY_ENABLED
= True
837 pw_policy
= PasswordPolicy('testpassword#!$!@$')
838 pw_credits
= pw_policy
.check_password_complexity()
839 self
.assertEqual(pw_credits
, 30)
841 def test_password_policy_forbidden_words(self
):
842 Settings
.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED
= True
843 pw_policy
= PasswordPolicy('!@$testdashboard#!$')
844 self
.assertTrue(pw_policy
.check_if_contains_forbidden_words())
846 def test_password_policy_forbidden_words_custom(self
):
847 Settings
.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED
= True
848 Settings
.PWD_POLICY_EXCLUSION_LIST
= 'foo,bar'
849 pw_policy
= PasswordPolicy('foo123bar')
850 self
.assertTrue(pw_policy
.check_if_contains_forbidden_words())
852 def test_password_policy_sequential_chars(self
):
853 Settings
.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED
= True
854 pw_policy
= PasswordPolicy('!@$test123#!$')
855 self
.assertTrue(pw_policy
.check_if_sequential_characters())
857 def test_password_policy_repetitive_chars(self
):
858 Settings
.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED
= True
859 pw_policy
= PasswordPolicy('!@$testfooo#!$')
860 self
.assertTrue(pw_policy
.check_if_repetitive_characters())
862 def test_password_policy_contain_username(self
):
863 Settings
.PWD_POLICY_CHECK_USERNAME_ENABLED
= True
864 pw_policy
= PasswordPolicy('%admin135)', 'admin')
865 self
.assertTrue(pw_policy
.check_if_contains_username())
867 def test_password_policy_is_old_pwd(self
):
868 Settings
.PWD_POLICY_CHECK_OLDPWD_ENABLED
= True
869 pw_policy
= PasswordPolicy('foo', old_password
='foo')
870 self
.assertTrue(pw_policy
.check_is_old_password())