1 # -*- coding: utf-8 -*-
2 # pylint: disable=dangerous-default-value,too-many-public-methods
3 from __future__
import absolute_import
9 from datetime
import datetime
, timedelta
11 from mgr_module
import ERROR_MSG_EMPTY_INPUT_FILE
14 from ..security
import Permission
, Scope
15 from ..services
.access_control
import SYSTEM_ROLES
, AccessControlDB
, \
16 PasswordPolicy
, load_access_control_db
, password_hash
17 from ..settings
import Settings
18 from . import CLICommandTestMixin
, CmdException
# pylint: disable=no-name-in-module
21 class AccessControlTest(unittest
.TestCase
, CLICommandTestMixin
):
26 mgr
.ACCESS_CONTROL_DB
= None
29 self
.CONFIG_KEY_DICT
.clear()
30 load_access_control_db()
def load_persistent_db(self):
    """Return the access-control DB decoded from the mock persistent store.

    Asserts that the config key reported by
    ``AccessControlDB.accessdb_config_key()`` is present in
    ``self.CONFIG_KEY_DICT`` and returns the JSON-decoded value stored
    under that key.
    """
    config_key = AccessControlDB.accessdb_config_key()
    self.assertIn(config_key, self.CONFIG_KEY_DICT)
    db_json = self.CONFIG_KEY_DICT[config_key]
    # The validate_* helpers index into the result, so the decoded document
    # has to be handed back to the caller.
    return json.loads(db_json)
39 # The DB is written to persistent storage the first time it is saved.
40 # However, should an operation fail due to <reasons>, we may end up in
41 # a state where we have a completely empty CONFIG_KEY_DICT (our mock
42 # equivalent to the persistent state). While this works for most of the
43 # tests in this class, that would prevent us from testing things like
44 # "run a command that is expected to fail, and then ensure nothing
45 # happened", because we'd be asserting in `load_persistent_db()` due to
46 # the map being empty.
48 # This function will therefore force state to be written to our mock
49 # persistent state. We could have added this extra step to
50 # `load_persistent_db()` directly, but that would conflict with the
51 # upgrade tests. This way, we can selectively enforce this requirement
52 # where we believe it to be necessary; generically speaking, this should
53 # not be needed unless we're testing very specific behaviors.
def setup_and_load_persistent_db(self):
    """Save the access-control DB, then load it via load_persistent_db()."""
    access_db = mgr.ACCESS_CTRL_DB
    access_db.save()
    self.load_persistent_db()
59 def validate_persistent_role(self
, rolename
, scopes_permissions
,
61 db
= self
.load_persistent_db()
62 self
.assertIn('roles', db
)
63 self
.assertIn(rolename
, db
['roles'])
64 self
.assertEqual(db
['roles'][rolename
]['name'], rolename
)
65 self
.assertEqual(db
['roles'][rolename
]['description'], description
)
66 self
.assertDictEqual(db
['roles'][rolename
]['scopes_permissions'],
def validate_persistent_no_role(self, rolename):
    """Assert that the persisted DB has a 'roles' map without ``rolename``."""
    persisted = self.load_persistent_db()
    self.assertIn('roles', persisted)
    stored_roles = persisted['roles']
    self.assertNotIn(rolename, stored_roles)
def validate_persistent_user(self, username, roles, password=None,
                             name=None, email=None, last_update=None,
                             enabled=True, pwdExpirationDate=None):
    """Assert that ``username`` is persisted with the expected attributes.

    ``username``, ``roles`` and ``enabled`` are always compared against the
    persisted record. ``password``, ``name``, ``email``, ``last_update`` and
    ``pwdExpirationDate`` are optional: each one is compared only when the
    caller supplies a value other than ``None``, so callers that care only
    about the role list can pass just ``username`` and ``roles``.
    """
    db = self.load_persistent_db()
    self.assertIn('users', db)
    self.assertIn(username, db['users'])
    user = db['users'][username]

    self.assertEqual(user['username'], username)
    self.assertListEqual(user['roles'], roles)

    # Persisted key -> expected value, for the fields that are optional.
    optional_fields = (
        ('password', password),
        ('name', name),
        ('email', email),
        ('lastUpdate', last_update),
        ('pwdExpirationDate', pwdExpirationDate),
    )
    for key, expected in optional_fields:
        # A default of None means "not specified", not "must be None".
        if expected is not None:
            self.assertEqual(user[key], expected)

    self.assertEqual(user['enabled'], enabled)
def validate_persistent_no_user(self, username):
    """Assert that the persisted DB has a 'users' map without ``username``."""
    persisted = self.load_persistent_db()
    self.assertIn('users', persisted)
    stored_users = persisted['users']
    self.assertNotIn(username, stored_users)
def test_create_role(self):
    # A role created without a description is reported with a None
    # description and no scope permissions, and is persisted the same way.
    expected = {'name': 'test_role', 'description': None,
                'scopes_permissions': {}}
    created = self.exec_cmd('ac-role-create', rolename='test_role')
    self.assertDictEqual(created, expected)
    self.validate_persistent_role('test_role', {})
def test_create_role_with_desc(self):
    # The description given at creation time shows up in the reply and in
    # the persisted role.
    expected = {'name': 'test_role',
                'description': 'Test Role',
                'scopes_permissions': {}}
    created = self.exec_cmd('ac-role-create', rolename='test_role',
                            description='Test Role')
    self.assertDictEqual(created, expected)
    self.validate_persistent_role('test_role', {}, 'Test Role')
def test_create_duplicate_role(self):
    # Creating 'test_role' a second time must fail with EEXIST.
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-create', rolename='test_role')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.EEXIST)
    self.assertEqual(str(failure), "Role 'test_role' already exists")
def test_delete_role(self):
    # Deleting an existing role reports success and removes it from the
    # persisted DB.
    self.test_create_role()
    reply = self.exec_cmd('ac-role-delete', rolename='test_role')
    self.assertEqual(reply, "Role 'test_role' deleted")
    self.validate_persistent_no_role('test_role')
def test_delete_nonexistent_role(self):
    # Deleting a role that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='test_role')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "Role 'test_role' does not exist")
def test_show_single_role(self):
    # Showing a freshly created role returns the same document that the
    # create command returned.
    self.test_create_role()
    expected = {'name': 'test_role', 'description': None,
                'scopes_permissions': {}}
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    self.assertDictEqual(shown, expected)
def test_show_nonexistent_role(self):
    # Showing a role that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-show', rolename='test_role')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "Role 'test_role' does not exist")
def test_show_system_roles(self):
    # With no user-defined roles, 'ac-role-show' without a role name is
    # expected to list exactly the entries of SYSTEM_ROLES.
    roles = self.exec_cmd('ac-role-show')
    self.assertEqual(len(roles), len(SYSTEM_ROLES))
    # Check every listed role individually; the membership assertion needs
    # a bound loop variable to refer to.
    for role in roles:
        self.assertIn(role, SYSTEM_ROLES)
154 def test_show_system_role(self
):
155 role
= self
.exec_cmd('ac-role-show', rolename
="read-only")
156 self
.assertEqual(role
['name'], 'read-only')
159 'allows read permission for all security scope except dashboard settings and config-opt'
def test_delete_system_role(self):
    # The 'administrator' role cannot be deleted; the command must fail
    # with EPERM.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='administrator')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.EPERM)
    self.assertEqual(str(failure),
                     "Cannot delete system role 'administrator'")
170 def test_add_role_scope_perms(self
):
171 self
.test_create_role()
172 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
173 scopename
=Scope
.POOL
,
174 permissions
=[Permission
.READ
, Permission
.DELETE
])
175 role
= self
.exec_cmd('ac-role-show', rolename
='test_role')
176 self
.assertDictEqual(role
, {'name': 'test_role',
178 'scopes_permissions': {
179 Scope
.POOL
: [Permission
.DELETE
,
182 self
.validate_persistent_role('test_role', {
183 Scope
.POOL
: [Permission
.DELETE
, Permission
.READ
]
186 def test_del_role_scope_perms(self
):
187 self
.test_add_role_scope_perms()
188 self
.exec_cmd('ac-role-add-scope-perms', rolename
='test_role',
189 scopename
=Scope
.MONITOR
,
190 permissions
=[Permission
.READ
, Permission
.CREATE
])
191 self
.validate_persistent_role('test_role', {
192 Scope
.POOL
: [Permission
.DELETE
, Permission
.READ
],
193 Scope
.MONITOR
: [Permission
.CREATE
, Permission
.READ
]
195 self
.exec_cmd('ac-role-del-scope-perms', rolename
='test_role',
196 scopename
=Scope
.POOL
)
197 role
= self
.exec_cmd('ac-role-show', rolename
='test_role')
198 self
.assertDictEqual(role
, {'name': 'test_role',
200 'scopes_permissions': {
201 Scope
.MONITOR
: [Permission
.CREATE
,
204 self
.validate_persistent_role('test_role', {
205 Scope
.MONITOR
: [Permission
.CREATE
, Permission
.READ
]
def test_add_role_scope_perms_nonexistent_role(self):
    # Adding scope permissions to a role that was never created must fail
    # with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      permissions=['read', 'delete'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "Role 'test_role' does not exist")
def test_add_role_invalid_scope_perms(self):
    # An unknown scope name is rejected with EINVAL and a message listing
    # the valid scopes.
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='invalidscope',
                      permissions=['read', 'delete'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.EINVAL)
    self.assertEqual(
        str(failure),
        "Scope 'invalidscope' is not valid\n Possible values: "
        "{}".format(Scope.all_scopes()))
def test_add_role_scope_invalid_perms(self):
    # An unknown permission name is rejected with EINVAL and a message
    # listing the valid permissions.
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='pool', permissions=['invalidperm'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.EINVAL)
    self.assertEqual(
        str(failure),
        "Permission 'invalidperm' is not valid\n Possible "
        "values: {}".format(Permission.all_permissions()))
243 def test_del_role_scope_perms_nonexistent_role(self
):
245 with self
.assertRaises(CmdException
) as ctx
:
246 self
.exec_cmd('ac-role-del-scope-perms', rolename
='test_role',
249 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
250 self
.assertEqual(str(ctx
.exception
), "Role 'test_role' does not exist")
def test_del_role_nonexistent_scope_perms(self):
    # Removing a scope the role has no permissions for must fail with
    # ENOENT.
    self.test_add_role_scope_perms()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                      scopename='nonexistentscope')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure),
                     "There are no permissions for scope 'nonexistentscope' "
                     "in role 'test_role'")
def test_not_permitted_add_role_scope_perms(self):
    # The 'read-only' role cannot be modified; adding scope permissions
    # must fail with EPERM.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='read-only',
                      scopename='pool', permissions=['read', 'delete'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.EPERM)
    self.assertEqual(str(failure),
                     "Cannot update system role 'read-only'")
273 def test_not_permitted_del_role_scope_perms(self
):
274 with self
.assertRaises(CmdException
) as ctx
:
275 self
.exec_cmd('ac-role-del-scope-perms', rolename
='read-only',
278 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EPERM
)
279 self
.assertEqual(str(ctx
.exception
),
280 "Cannot update system role 'read-only'")
282 def test_create_user(self
, username
='admin', rolename
=None, enabled
=True,
283 pwdExpirationDate
=None):
284 user
= self
.exec_cmd('ac-user-create', username
=username
,
285 rolename
=rolename
, inbuf
='admin',
286 name
='{} User'.format(username
),
287 email
='{}@user.com'.format(username
),
288 enabled
=enabled
, force_password
=True,
289 pwd_expiration_date
=pwdExpirationDate
)
291 pass_hash
= password_hash('admin', user
['password'])
292 self
.assertDictEqual(user
, {
293 'username': username
,
294 'password': pass_hash
,
295 'pwdExpirationDate': pwdExpirationDate
,
296 'pwdUpdateRequired': False,
297 'lastUpdate': user
['lastUpdate'],
298 'name': '{} User'.format(username
),
299 'email': '{}@user.com'.format(username
),
300 'roles': [rolename
] if rolename
else [],
303 self
.validate_persistent_user(username
, [rolename
] if rolename
else [],
304 pass_hash
, '{} User'.format(username
),
305 '{}@user.com'.format(username
),
306 user
['lastUpdate'], enabled
)
def test_create_disabled_user(self):
    # Reuse the user-creation scenario with the account created disabled.
    account_enabled = False
    self.test_create_user(enabled=account_enabled)
312 def test_create_user_pwd_expiration_date(self
):
313 expiration_date
= datetime
.utcnow() + timedelta(days
=10)
314 expiration_date
= int(time
.mktime(expiration_date
.timetuple()))
315 self
.test_create_user(pwdExpirationDate
=expiration_date
)
def test_create_user_with_role(self):
    # Run the scope-permission scenario first (it creates 'test_role'),
    # then create a user that is assigned that role.
    self.test_add_role_scope_perms()
    custom_role = 'test_role'
    self.test_create_user(rolename=custom_role)
def test_create_user_with_system_role(self):
    # Create a user that is assigned the 'administrator' role.
    system_role = 'administrator'
    self.test_create_user(rolename=system_role)
def test_delete_user(self):
    # Deleting the only user reports success, leaves the user listing
    # empty and removes the user from the persisted DB.
    self.test_create_user()

    reply = self.exec_cmd('ac-user-delete', username='admin')
    self.assertEqual(reply, "User 'admin' deleted")

    remaining = self.exec_cmd('ac-user-show')
    self.assertEqual(len(remaining), 0)

    self.validate_persistent_no_user('admin')
332 def test_create_duplicate_user(self
):
333 self
.test_create_user()
334 ret
= self
.exec_cmd('ac-user-create', username
='admin', inbuf
='admin',
336 self
.assertEqual(ret
, "User 'admin' already exists")
338 def test_create_users_with_dne_role(self
):
339 # one time call to setup our persistent db
340 self
.setup_and_load_persistent_db()
342 # create a user with a role that does not exist; expect a failure
344 self
.exec_cmd('ac-user-create', username
='foo',
345 rolename
='dne_role', inbuf
='foopass',
346 name
='foo User', email
='foo@user.com',
348 except CmdException
as e
:
349 self
.assertEqual(e
.retcode
, -errno
.ENOENT
)
351 db
= self
.load_persistent_db()
353 self
.assertNotIn('foo', db
['users'])
355 # We could just finish our test here, given we ensured that the user
356 # with a non-existent role is not in persistent storage. However,
357 # we're going to test the database's consistency, making sure that
358 # side-effects are not written to persistent storage once we commit
359 # an unrelated operation. To ensure this, we'll issue another
360 # operation that is sharing the same code path, and will check whether
361 # the next operation commits dirty state.
363 # create a role (this will be 'test_role')
364 self
.test_create_role()
365 self
.exec_cmd('ac-user-create', username
='bar',
366 rolename
='test_role', inbuf
='barpass',
367 name
='bar User', email
='bar@user.com',
371 # user 'foo' should not exist
372 # user 'bar' should exist and have role 'test_role'
373 self
.validate_persistent_user('bar', ['test_role'])
375 db
= self
.load_persistent_db()
376 self
.assertIn('users', db
)
377 self
.assertNotIn('foo', db
['users'])
def test_delete_nonexistent_user(self):
    # Deleting a user that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-delete', username='admin')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
386 def test_add_user_roles(self
, username
='admin',
387 roles
=['pool-manager', 'block-manager']):
388 user_orig
= self
.test_create_user(username
)
393 user
= self
.exec_cmd('ac-user-add-roles', username
=username
,
395 self
.assertLessEqual(uroles
, user
['roles'])
396 self
.validate_persistent_user(username
, uroles
)
397 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
399 def test_add_user_roles2(self
):
400 user_orig
= self
.test_create_user()
401 user
= self
.exec_cmd('ac-user-add-roles', username
="admin",
402 roles
=['pool-manager', 'block-manager'])
403 self
.assertLessEqual(['block-manager', 'pool-manager'],
405 self
.validate_persistent_user('admin', ['block-manager',
407 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
def test_add_user_roles_not_existent_user(self):
    # Adding roles to a user that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-add-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
def test_add_user_roles_not_existent_role(self):
    # Adding an unknown role to an existing user must fail with ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-add-roles', username="admin",
                      roles=['Invalid Role'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure),
                     "Role 'Invalid Role' does not exist")
427 def test_set_user_roles(self
):
428 user_orig
= self
.test_create_user()
429 user
= self
.exec_cmd('ac-user-add-roles', username
="admin",
430 roles
=['pool-manager'])
431 self
.assertLessEqual(['pool-manager'], user
['roles'])
432 self
.validate_persistent_user('admin', ['pool-manager'])
433 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
434 user2
= self
.exec_cmd('ac-user-set-roles', username
="admin",
435 roles
=['rgw-manager', 'block-manager'])
436 self
.assertLessEqual(['block-manager', 'rgw-manager'],
438 self
.validate_persistent_user('admin', ['block-manager',
440 self
.assertGreaterEqual(user2
['lastUpdate'], user
['lastUpdate'])
def test_set_user_roles_not_existent_user(self):
    # Setting roles on a user that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
def test_set_user_roles_not_existent_role(self):
    # Setting an unknown role on an existing user must fail with ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-roles', username="admin",
                      roles=['Invalid Role'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure),
                     "Role 'Invalid Role' does not exist")
def test_del_user_roles(self):
    # After the add-roles scenario, removing 'pool-manager' leaves
    # 'block-manager' in the reply and as the only persisted role.
    self.test_add_user_roles()
    updated = self.exec_cmd('ac-user-del-roles', username="admin",
                            roles=['pool-manager'])
    self.assertLessEqual(['block-manager'], updated['roles'])
    self.validate_persistent_user('admin', ['block-manager'])
def test_del_user_roles_not_existent_user(self):
    # Removing roles from a user that was never created must fail with
    # ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
def test_del_user_roles_not_existent_role(self):
    # Removing an unknown role from an existing user must fail with ENOENT.
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['Invalid Role'])

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure),
                     "Role 'Invalid Role' does not exist")
485 def test_del_user_roles_not_associated_role(self
):
486 self
.test_create_user()
487 with self
.assertRaises(CmdException
) as ctx
:
488 self
.exec_cmd('ac-user-del-roles', username
="admin",
489 roles
=['rgw-manager'])
491 self
.assertEqual(ctx
.exception
.retcode
, -errno
.ENOENT
)
492 self
.assertEqual(str(ctx
.exception
),
493 "Role 'rgw-manager' is not associated with user "
496 def test_show_user(self
):
497 self
.test_add_user_roles()
498 user
= self
.exec_cmd('ac-user-show', username
='admin')
499 pass_hash
= password_hash('admin', user
['password'])
500 self
.assertDictEqual(user
, {
502 'lastUpdate': user
['lastUpdate'],
503 'password': pass_hash
,
504 'pwdExpirationDate': None,
505 'pwdUpdateRequired': False,
506 'name': 'admin User',
507 'email': 'admin@user.com',
508 'roles': ['block-manager', 'pool-manager'],
def test_show_nonexistent_user(self):
    # Showing a user that was never created must fail with ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-show', username='admin')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
519 def test_show_all_users(self
):
520 self
.test_add_user_roles('admin', ['administrator'])
521 self
.test_add_user_roles('guest', ['read-only'])
522 users
= self
.exec_cmd('ac-user-show')
523 self
.assertEqual(len(users
), 2)
525 self
.assertIn(user
, ['admin', 'guest'])
527 def test_del_role_associated_with_user(self
):
528 self
.test_create_role()
529 self
.test_add_user_roles('guest', ['test_role'])
531 with self
.assertRaises(CmdException
) as ctx
:
532 self
.exec_cmd('ac-role-delete', rolename
='test_role')
534 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EPERM
)
535 self
.assertEqual(str(ctx
.exception
),
536 "Role 'test_role' is still associated with user "
539 def test_set_user_info(self
):
540 user_orig
= self
.test_create_user()
541 user
= self
.exec_cmd('ac-user-set-info', username
='admin',
542 name
='Admin Name', email
='admin@admin.com')
543 pass_hash
= password_hash('admin', user
['password'])
544 self
.assertDictEqual(user
, {
546 'password': pass_hash
,
547 'pwdExpirationDate': None,
548 'pwdUpdateRequired': False,
549 'name': 'Admin Name',
550 'email': 'admin@admin.com',
551 'lastUpdate': user
['lastUpdate'],
555 self
.validate_persistent_user('admin', [], pass_hash
, 'Admin Name',
557 self
.assertEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
def test_set_user_info_nonexistent_user(self):
    # Updating name/email of a user that was never created must fail with
    # ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-info', username='admin',
                      name='Admin Name', email='admin@admin.com')

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
567 def test_set_user_password(self
):
568 user_orig
= self
.test_create_user()
569 user
= self
.exec_cmd('ac-user-set-password', username
='admin',
570 inbuf
='newpass', force_password
=True)
571 pass_hash
= password_hash('newpass', user
['password'])
572 self
.assertDictEqual(user
, {
574 'password': pass_hash
,
575 'pwdExpirationDate': None,
576 'pwdUpdateRequired': False,
577 'name': 'admin User',
578 'email': 'admin@user.com',
579 'lastUpdate': user
['lastUpdate'],
583 self
.validate_persistent_user('admin', [], pass_hash
, 'admin User',
585 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
def test_sanitize_password(self):
    # Feed 'ac-user-set-password' a password read back from a file that
    # ends in extra line separators, and check that the stored hash is the
    # hash of the password without them.
    import tempfile  # local import: only this test needs it

    self.test_create_user()
    password = 'myPass\\n\\r\\n'
    # An anonymous temporary file avoids a fixed, predictable path under
    # /tmp and is removed automatically when closed.
    with tempfile.TemporaryFile(mode='w+') as pwd_file:
        # Add new line separators (like some text editors when a file is saved).
        pwd_file.write('{}{}'.format(password, '\n\r\n\n'))
        # Rewind before reading: after write() the position is at the end
        # of the file, so read() would otherwise return an empty string.
        pwd_file.seek(0)
        user = self.exec_cmd('ac-user-set-password', username='admin',
                             inbuf=pwd_file.read(), force_password=True)
    # presumably the stored hash doubles as the salt argument, as in the
    # other password tests of this class; verify against password_hash()
    pass_hash = password_hash(password, user['password'])
    self.assertEqual(user['password'], pass_hash)
def test_set_user_password_nonexistent_user(self):
    # Setting the password of a user that was never created must fail with
    # ENOENT.
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password', username='admin',
                      inbuf='newpass', force_password=True)

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
607 def test_set_user_password_empty(self
):
608 with self
.assertRaises(CmdException
) as ctx
:
609 self
.exec_cmd('ac-user-set-password', username
='admin', inbuf
='\n',
612 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EINVAL
)
613 self
.assertEqual(str(ctx
.exception
), ERROR_MSG_EMPTY_INPUT_FILE
)
615 def test_set_user_password_hash(self
):
616 user_orig
= self
.test_create_user()
617 user
= self
.exec_cmd('ac-user-set-password-hash', username
='admin',
618 inbuf
='$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2')
619 pass_hash
= password_hash('newpass', user
['password'])
620 self
.assertDictEqual(user
, {
622 'password': pass_hash
,
623 'pwdExpirationDate': None,
624 'pwdUpdateRequired': False,
625 'name': 'admin User',
626 'email': 'admin@user.com',
627 'lastUpdate': user
['lastUpdate'],
631 self
.validate_persistent_user('admin', [], pass_hash
, 'admin User',
633 self
.assertGreaterEqual(user
['lastUpdate'], user_orig
['lastUpdate'])
def test_set_user_password_hash_nonexistent_user(self):
    # Setting a password hash for a user that was never created must fail
    # with ENOENT.
    hashed = '$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2'
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password-hash', username='admin',
                      inbuf=hashed)

    failure = raised.exception
    self.assertEqual(failure.retcode, -errno.ENOENT)
    self.assertEqual(str(failure), "User 'admin' does not exist")
643 def test_set_user_password_hash_broken_hash(self
):
644 self
.test_create_user()
645 with self
.assertRaises(CmdException
) as ctx
:
646 self
.exec_cmd('ac-user-set-password-hash', username
='admin',
649 self
.assertEqual(ctx
.exception
.retcode
, -errno
.EINVAL
)
650 self
.assertEqual(str(ctx
.exception
), 'Invalid password hash')
652 def test_set_login_credentials(self
):
653 self
.exec_cmd('set-login-credentials', username
='admin',
655 user
= self
.exec_cmd('ac-user-show', username
='admin')
656 pass_hash
= password_hash('admin', user
['password'])
657 self
.assertDictEqual(user
, {
659 'password': pass_hash
,
660 'pwdExpirationDate': None,
661 'pwdUpdateRequired': False,
664 'lastUpdate': user
['lastUpdate'],
665 'roles': ['administrator'],
668 self
.validate_persistent_user('admin', ['administrator'], pass_hash
,
671 def test_set_login_credentials_for_existing_user(self
):
672 self
.test_add_user_roles('admin', ['read-only'])
673 self
.exec_cmd('set-login-credentials', username
='admin',
675 user
= self
.exec_cmd('ac-user-show', username
='admin')
676 pass_hash
= password_hash('admin2', user
['password'])
677 self
.assertDictEqual(user
, {
679 'password': pass_hash
,
680 'pwdExpirationDate': None,
681 'pwdUpdateRequired': False,
682 'name': 'admin User',
683 'email': 'admin@user.com',
684 'lastUpdate': user
['lastUpdate'],
685 'roles': ['read-only'],
688 self
.validate_persistent_user('admin', ['read-only'], pass_hash
,
689 'admin User', 'admin@user.com')
691 def test_load_v1(self
):
692 self
.CONFIG_KEY_DICT
['accessdb_v1'] = '''
698 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
699 "roles": ["block-manager", "test_role"],
700 "name": "admin User",
701 "email": "admin@user.com",
708 "description": "Test Role",
709 "scopes_permissions": {{
717 '''.format(int(round(time
.time())), Scope
.ISCSI
, Permission
.READ
,
718 Permission
.UPDATE
, Scope
.POOL
, Permission
.CREATE
)
720 load_access_control_db()
721 role
= self
.exec_cmd('ac-role-show', rolename
="test_role")
722 self
.assertDictEqual(role
, {
724 'description': "Test Role",
725 'scopes_permissions': {
726 Scope
.ISCSI
: [Permission
.READ
, Permission
.UPDATE
],
727 Scope
.POOL
: [Permission
.CREATE
]
730 user
= self
.exec_cmd('ac-user-show', username
="admin")
731 self
.assertDictEqual(user
, {
733 'lastUpdate': user
['lastUpdate'],
735 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
736 'pwdExpirationDate': None,
737 'pwdUpdateRequired': False,
738 'name': 'admin User',
739 'email': 'admin@user.com',
740 'roles': ['block-manager', 'test_role'],
744 def test_load_v2(self
):
745 self
.CONFIG_KEY_DICT
['accessdb_v2'] = '''
751 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
752 "pwdExpirationDate": null,
753 "pwdUpdateRequired": false,
754 "roles": ["block-manager", "test_role"],
755 "name": "admin User",
756 "email": "admin@user.com",
764 "description": "Test Role",
765 "scopes_permissions": {{
773 '''.format(int(round(time
.time())), Scope
.ISCSI
, Permission
.READ
,
774 Permission
.UPDATE
, Scope
.POOL
, Permission
.CREATE
)
776 load_access_control_db()
777 role
= self
.exec_cmd('ac-role-show', rolename
="test_role")
778 self
.assertDictEqual(role
, {
780 'description': "Test Role",
781 'scopes_permissions': {
782 Scope
.ISCSI
: [Permission
.READ
, Permission
.UPDATE
],
783 Scope
.POOL
: [Permission
.CREATE
]
786 user
= self
.exec_cmd('ac-user-show', username
="admin")
787 self
.assertDictEqual(user
, {
789 'lastUpdate': user
['lastUpdate'],
791 "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
792 'pwdExpirationDate': None,
793 'pwdUpdateRequired': False,
794 'name': 'admin User',
795 'email': 'admin@user.com',
796 'roles': ['block-manager', 'test_role'],
def test_password_policy_pw_length(self):
    # With a minimum length of 3, the three-character password 'foo'
    # passes the length check.
    Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
    Settings.PWD_POLICY_MIN_LENGTH = 3
    length_ok = PasswordPolicy('foo').check_password_length()
    self.assertTrue(length_ok)
def test_password_policy_pw_length_fail(self):
    # 'bar' is expected to fail the length check. No minimum length is set
    # here, so the outcome relies on the PWD_POLICY_MIN_LENGTH value in
    # effect when the test runs.
    Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
    length_ok = PasswordPolicy('bar').check_password_length()
    self.assertFalse(length_ok)
def test_password_policy_credits_too_weak(self):
    # The complexity check is expected to award 3 credits to 'foo'.
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    awarded = PasswordPolicy('foo').check_password_complexity()
    self.assertEqual(awarded, 3)
def test_password_policy_credits_weak(self):
    # The complexity check is expected to award 11 credits to
    # 'mypassword1'.
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    awarded = PasswordPolicy('mypassword1').check_password_complexity()
    self.assertEqual(awarded, 11)
def test_password_policy_credits_ok(self):
    # The complexity check is expected to award 17 credits to
    # 'mypassword1!@'.
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    awarded = PasswordPolicy('mypassword1!@').check_password_complexity()
    self.assertEqual(awarded, 17)
def test_password_policy_credits_strong(self):
    # The complexity check is expected to award 22 credits to
    # 'testpassword0047!@'.
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    candidate = PasswordPolicy('testpassword0047!@')
    awarded = candidate.check_password_complexity()
    self.assertEqual(awarded, 22)
def test_password_policy_credits_very_strong(self):
    # The complexity check is expected to award 30 credits to
    # 'testpassword#!$!@$'.
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    candidate = PasswordPolicy('testpassword#!$!@$')
    awarded = candidate.check_password_complexity()
    self.assertEqual(awarded, 30)
def test_password_policy_forbidden_words(self):
    # '!@$testdashboard#!$' is expected to be flagged by the
    # forbidden-words check with the exclusion list left untouched.
    Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
    candidate = PasswordPolicy('!@$testdashboard#!$')
    flagged = candidate.check_if_contains_forbidden_words()
    self.assertTrue(flagged)
def test_password_policy_forbidden_words_custom(self):
    # With the exclusion list set to 'foo,bar', the password 'foo123bar'
    # is expected to be flagged by the forbidden-words check.
    Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
    Settings.PWD_POLICY_EXCLUSION_LIST = 'foo,bar'
    candidate = PasswordPolicy('foo123bar')
    flagged = candidate.check_if_contains_forbidden_words()
    self.assertTrue(flagged)
def test_password_policy_sequential_chars(self):
    # '!@$test123#!$' is expected to be flagged by the
    # sequential-characters check.
    Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = True
    candidate = PasswordPolicy('!@$test123#!$')
    flagged = candidate.check_if_sequential_characters()
    self.assertTrue(flagged)
def test_password_policy_repetitive_chars(self):
    # '!@$testfooo#!$' is expected to be flagged by the
    # repetitive-characters check.
    Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = True
    candidate = PasswordPolicy('!@$testfooo#!$')
    flagged = candidate.check_if_repetitive_characters()
    self.assertTrue(flagged)
def test_password_policy_contain_username(self):
    # '%admin135)' checked for the user 'admin' is expected to be flagged
    # by the username check.
    Settings.PWD_POLICY_CHECK_USERNAME_ENABLED = True
    candidate = PasswordPolicy('%admin135)', 'admin')
    flagged = candidate.check_if_contains_username()
    self.assertTrue(flagged)
def test_password_policy_is_old_pwd(self):
    # A new password equal to the old password is expected to be flagged
    # by the old-password check.
    Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED = True
    candidate = PasswordPolicy('foo', old_password='foo')
    flagged = candidate.check_is_old_password()
    self.assertTrue(flagged)