]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/dashboard/tests/test_access_control.py
import ceph pacific 16.2.5
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_access_control.py
CommitLineData
11fdf7f2
TL
1# -*- coding: utf-8 -*-
2# pylint: disable=dangerous-default-value,too-many-public-methods
3from __future__ import absolute_import
4
5import errno
6import json
7import time
8import unittest
9f95a23c
TL
9from datetime import datetime, timedelta
10
cd265ab1
TL
11from mgr_module import ERROR_MSG_EMPTY_INPUT_FILE
12
11fdf7f2 13from .. import mgr
f67539c2
TL
14from ..security import Permission, Scope
15from ..services.access_control import SYSTEM_ROLES, AccessControlDB, \
16 PasswordPolicy, load_access_control_db, password_hash
9f95a23c 17from ..settings import Settings
f67539c2 18from . import CLICommandTestMixin, CmdException # pylint: disable=no-name-in-module
11fdf7f2
TL
19
20
21class AccessControlTest(unittest.TestCase, CLICommandTestMixin):
22
@classmethod
def setUpClass(cls):
    """Prepare class-wide state: call ``mock_kv_store()`` and clear the DB handle."""
    cls.mock_kv_store()
    # The handle the tests save through is ``mgr.ACCESS_CTRL_DB`` (see
    # ``setup_and_load_persistent_db``). The previous code assigned to
    # ``mgr.ACCESS_CONTROL_DB``, a name nothing else in this file reads,
    # so the intended reset never touched the real handle.
    mgr.ACCESS_CTRL_DB = None
27
def setUp(self):
    """Empty ``CONFIG_KEY_DICT`` and reload the access-control DB before each test."""
    self.CONFIG_KEY_DICT.clear()
    load_access_control_db()
31
def load_persistent_db(self):
    """Return the JSON-decoded DB stored in ``CONFIG_KEY_DICT``.

    Fails the test when the DB's config key is not present in the dict.
    """
    key = AccessControlDB.accessdb_config_key()
    self.assertIn(key, self.CONFIG_KEY_DICT)
    return json.loads(self.CONFIG_KEY_DICT[key])
38
39 # The DB is written to persistent storage the first time it is saved.
40 # However, should an operation fail due to <reasons>, we may end up in
41 # a state where we have a completely empty CONFIG_KEY_DICT (our mock
42 # equivalent to the persistent state). While this works for most of the
43 # tests in this class, that would prevent us from testing things like
44 # "run a command that is expected to fail, and then ensure nothing
45 # happened", because we'd be asserting in `load_persistent_db()` due to
46 # the map being empty.
47 #
48 # This function will therefore force state to be written to our mock
49 # persistent state. We could have added this extra step to
50 # `load_persistent_db()` directly, but that would conflict with the
51 # upgrade tests. This way, we can selectively enforce this requirement
52 # where we believe it to be necessary; generically speaking, this should
53 # not be needed unless we're testing very specific behaviors.
54 #
def setup_and_load_persistent_db(self):
    """Save the current DB, then assert it is readable from ``CONFIG_KEY_DICT``."""
    mgr.ACCESS_CTRL_DB.save()
    self.load_persistent_db()
58
def validate_persistent_role(self, rolename, scopes_permissions,
                             description=None):
    """Assert that role ``rolename`` is persisted with the given attributes.

    :param rolename: key expected under ``db['roles']``.
    :param scopes_permissions: expected ``scopes_permissions`` dict.
    :param description: expected ``description`` value (``None`` by default).
    """
    db = self.load_persistent_db()
    self.assertIn('roles', db)
    self.assertIn(rolename, db['roles'])
    stored = db['roles'][rolename]
    self.assertEqual(stored['name'], rolename)
    self.assertEqual(stored['description'], description)
    self.assertDictEqual(stored['scopes_permissions'], scopes_permissions)
68
def validate_persistent_no_role(self, rolename):
    """Assert that the persisted DB has a ``roles`` map without ``rolename``."""
    persisted = self.load_persistent_db()
    self.assertIn('roles', persisted)
    self.assertNotIn(rolename, persisted['roles'])
73
def validate_persistent_user(self, username, roles, password=None,
                             name=None, email=None, last_update=None,
                             enabled=True, pwdExpirationDate=None):
    """Assert that user ``username`` is persisted with the given attributes.

    ``username``, ``roles`` and ``enabled`` are always compared. The
    remaining arguments are compared only when they are truthy, so a
    ``None`` (or otherwise falsy) value means "do not check this field".
    """
    db = self.load_persistent_db()
    self.assertIn('users', db)
    self.assertIn(username, db['users'])
    stored = db['users'][username]
    self.assertEqual(stored['username'], username)
    self.assertListEqual(stored['roles'], roles)
    # (persisted key, expected value) pairs, checked in the original order.
    optional_fields = (
        ('password', password),
        ('name', name),
        ('email', email),
        ('lastUpdate', last_update),
        ('pwdExpirationDate', pwdExpirationDate),
    )
    for field, expected in optional_fields:
        if expected:
            self.assertEqual(stored[field], expected)
    self.assertEqual(stored['enabled'], enabled)
11fdf7f2
TL
93
def validate_persistent_no_user(self, username):
    """Assert that the persisted DB has a ``users`` map without ``username``."""
    persisted = self.load_persistent_db()
    self.assertIn('users', persisted)
    self.assertNotIn(username, persisted['users'])
98
def test_create_role(self):
    """``ac-role-create`` returns and persists a role with no description or scopes."""
    expected = {
        'name': 'test_role',
        'description': None,
        'scopes_permissions': {},
    }
    created = self.exec_cmd('ac-role-create', rolename='test_role')
    self.assertDictEqual(created, expected)
    self.validate_persistent_role('test_role', {})
104
def test_create_role_with_desc(self):
    """``ac-role-create`` with a description returns and persists that description."""
    expected = {
        'name': 'test_role',
        'description': 'Test Role',
        'scopes_permissions': {},
    }
    created = self.exec_cmd('ac-role-create', rolename='test_role',
                            description='Test Role')
    self.assertDictEqual(created, expected)
    self.validate_persistent_role('test_role', {}, 'Test Role')
112
def test_create_duplicate_role(self):
    """Creating ``test_role`` a second time fails with ``-EEXIST``."""
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-create', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EEXIST)
    self.assertEqual(str(error), "Role 'test_role' already exists")
121
def test_delete_role(self):
    """``ac-role-delete`` reports the deletion and removes the persisted role."""
    self.test_create_role()
    message = self.exec_cmd('ac-role-delete', rolename='test_role')
    self.assertEqual(message, "Role 'test_role' deleted")
    self.validate_persistent_no_role('test_role')
127
def test_delete_nonexistent_role(self):
    """Deleting a role that was never created fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
134
def test_show_single_role(self):
    """``ac-role-show`` with a role name returns that role's dict."""
    self.test_create_role()
    expected = {
        'name': 'test_role',
        'description': None,
        'scopes_permissions': {},
    }
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    self.assertDictEqual(shown, expected)
140
def test_show_nonexistent_role(self):
    """Showing a role that was never created fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-show', rolename='test_role')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
147
def test_show_system_roles(self):
    """Without a role name, ``ac-role-show`` lists exactly the ``SYSTEM_ROLES`` entries."""
    listed = self.exec_cmd('ac-role-show')
    self.assertEqual(len(listed), len(SYSTEM_ROLES))
    for rolename in listed:
        self.assertIn(rolename, SYSTEM_ROLES)
153
def test_show_system_role(self):
    """``ac-role-show`` returns the name and description of the ``read-only`` role."""
    expected_description = (
        'allows read permission for all security scope except dashboard settings and config-opt'
    )
    shown = self.exec_cmd('ac-role-show', rolename="read-only")
    self.assertEqual(shown['name'], 'read-only')
    self.assertEqual(shown['description'], expected_description)
11fdf7f2
TL
161
def test_delete_system_role(self):
    """Deleting the ``administrator`` system role is refused with ``-EPERM``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='administrator')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error), "Cannot delete system role 'administrator'")
169
def test_add_role_scope_perms(self):
    """Adding READ and DELETE on the pool scope is shown and persisted as [DELETE, READ]."""
    self.test_create_role()
    self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                  scopename=Scope.POOL,
                  permissions=[Permission.READ, Permission.DELETE])

    # The expected list order differs from the order the permissions were
    # passed in above.
    expected_scopes = {Scope.POOL: [Permission.DELETE, Permission.READ]}
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    self.assertDictEqual(shown, {
        'name': 'test_role',
        'description': None,
        'scopes_permissions': expected_scopes,
    })
    self.validate_persistent_role('test_role', expected_scopes)
185
def test_del_role_scope_perms(self):
    """Removing the pool scope from a role leaves only its monitor scope."""
    self.test_add_role_scope_perms()

    # Give the role a second scope so that one remains after the deletion.
    self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                  scopename=Scope.MONITOR,
                  permissions=[Permission.READ, Permission.CREATE])
    both_scopes = {
        Scope.POOL: [Permission.DELETE, Permission.READ],
        Scope.MONITOR: [Permission.CREATE, Permission.READ],
    }
    self.validate_persistent_role('test_role', both_scopes)

    self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                  scopename=Scope.POOL)
    monitor_only = {Scope.MONITOR: [Permission.CREATE, Permission.READ]}
    shown = self.exec_cmd('ac-role-show', rolename='test_role')
    self.assertDictEqual(shown, {
        'name': 'test_role',
        'description': None,
        'scopes_permissions': monitor_only,
    })
    self.validate_persistent_role('test_role', monitor_only)
207
def test_add_role_scope_perms_nonexistent_role(self):
    """Adding scope permissions to a role that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='pool',
                      permissions=['read', 'delete'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
217
def test_add_role_invalid_scope_perms(self):
    """An unknown scope name is rejected with ``-EINVAL`` and the list of valid scopes."""
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='invalidscope',
                      permissions=['read', 'delete'])

    error = raised.exception
    expected_message = (
        "Scope 'invalidscope' is not valid\n Possible values: {}"
    ).format(Scope.all_scopes())
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), expected_message)
230
def test_add_role_scope_invalid_perms(self):
    """An unknown permission is rejected with ``-EINVAL`` and the list of valid ones."""
    self.test_create_role()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='test_role',
                      scopename='pool', permissions=['invalidperm'])

    error = raised.exception
    expected_message = (
        "Permission 'invalidperm' is not valid\n Possible values: {}"
    ).format(Permission.all_permissions())
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), expected_message)
242
def test_del_role_scope_perms_nonexistent_role(self):
    """Removing a scope from a role that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                      scopename='pool')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'test_role' does not exist")
251
def test_del_role_nonexistent_scope_perms(self):
    """Removing a scope the role has no permissions for fails with ``-ENOENT``."""
    self.test_add_role_scope_perms()

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='test_role',
                      scopename='nonexistentscope')

    error = raised.exception
    expected_message = ("There are no permissions for scope "
                        "'nonexistentscope' in role 'test_role'")
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), expected_message)
263
def test_not_permitted_add_role_scope_perms(self):
    """Adding scope permissions to the ``read-only`` system role fails with ``-EPERM``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-add-scope-perms', rolename='read-only',
                      scopename='pool', permissions=['read', 'delete'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error), "Cannot update system role 'read-only'")
272
def test_not_permitted_del_role_scope_perms(self):
    """Removing a scope from the ``read-only`` system role fails with ``-EPERM``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-del-scope-perms', rolename='read-only',
                      scopename='pool')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error), "Cannot update system role 'read-only'")
281
9f95a23c
TL
def test_create_user(self, username='admin', rolename=None, enabled=True,
                     pwdExpirationDate=None):
    """Create a user with password ``'admin'`` and verify the result.

    Also used as a fixture by other tests in this class.

    :param username: user to create; also used to derive name and email.
    :param rolename: optional single role to assign at creation.
    :param enabled: value passed as the ``enabled`` argument.
    :param pwdExpirationDate: value passed as ``pwd_expiration_date``.
    :return: the user dict returned by ``ac-user-create``.
    """
    display_name = '{} User'.format(username)
    email = '{}@user.com'.format(username)
    expected_roles = [rolename] if rolename else []

    user = self.exec_cmd('ac-user-create', username=username,
                         rolename=rolename, inbuf='admin',
                         name=display_name, email=email,
                         enabled=enabled, force_password=True,
                         pwd_expiration_date=pwdExpirationDate)

    # Re-hashing 'admin' with the returned hash as second argument must
    # reproduce the returned hash (checked via the 'password' key below).
    pass_hash = password_hash('admin', user['password'])
    self.assertDictEqual(user, {
        'username': username,
        'password': pass_hash,
        'pwdExpirationDate': pwdExpirationDate,
        'pwdUpdateRequired': False,
        'lastUpdate': user['lastUpdate'],
        'name': display_name,
        'email': email,
        'roles': expected_roles,
        'enabled': enabled,
    })
    # The expiration date is now forwarded as well; previously the
    # persisted value was never compared although the helper supports it.
    self.validate_persistent_user(username, expected_roles, pass_hash,
                                  display_name, email,
                                  user['lastUpdate'], enabled,
                                  pwdExpirationDate)
    return user
308
9f95a23c
TL
def test_create_disabled_user(self):
    """Run the user-creation checks with ``enabled=False``."""
    self.test_create_user(enabled=False)
311
def test_create_user_pwd_expiration_date(self):
    """Run the user-creation checks with an expiration date ten days ahead."""
    # Work directly on the epoch clock. The previous code fed a naive
    # ``datetime.utcnow()`` value to ``time.mktime()``, which interprets
    # its argument as local time, so the result was shifted by the local
    # UTC offset.
    ten_days = timedelta(days=10)
    expiration_date = int(time.time() + ten_days.total_seconds())
    self.test_create_user(pwdExpirationDate=expiration_date)
316
11fdf7f2
TL
def test_create_user_with_role(self):
    """Create ``test_role`` with pool permissions, then a user holding that role."""
    self.test_add_role_scope_perms()
    self.test_create_user(rolename='test_role')
320
def test_create_user_with_system_role(self):
    """Run the user-creation checks with the ``administrator`` role."""
    self.test_create_user(rolename='administrator')
323
def test_delete_user(self):
    """``ac-user-delete`` reports the deletion and leaves no users behind."""
    self.test_create_user()
    message = self.exec_cmd('ac-user-delete', username='admin')
    self.assertEqual(message, "User 'admin' deleted")
    remaining = self.exec_cmd('ac-user-show')
    self.assertEqual(len(remaining), 0)
    self.validate_persistent_no_user('admin')
331
def test_create_duplicate_user(self):
    """Creating ``admin`` twice returns an "already exists" message without raising."""
    self.test_create_user()
    message = self.exec_cmd('ac-user-create', username='admin',
                            inbuf='admin', force_password=True)
    self.assertEqual(message, "User 'admin' already exists")
11fdf7f2
TL
337
def test_create_users_with_dne_role(self):
    """A failed user creation must leave no trace in persistent storage.

    First creates user ``foo`` with a role that does not exist and
    requires the command to fail, then performs a successful, unrelated
    creation and checks that ``foo`` was not written as a side effect.
    """
    # one time call to setup our persistent db
    self.setup_and_load_persistent_db()

    # Create a user with a role that does not exist. ``assertRaises``
    # makes the failure mandatory; the previous try/except let the test
    # pass silently if the command succeeded.
    with self.assertRaises(CmdException) as ctx:
        self.exec_cmd('ac-user-create', username='foo',
                      rolename='dne_role', inbuf='foopass',
                      name='foo User', email='foo@user.com',
                      force_password=True)
    self.assertEqual(ctx.exception.retcode, -errno.ENOENT)

    db = self.load_persistent_db()
    if 'users' in db:
        self.assertNotIn('foo', db['users'])

    # We could just finish our test here, given we ensured that the user
    # with a non-existent role is not in persistent storage. However,
    # we're going to test the database's consistency, making sure that
    # side-effects are not written to persistent storage once we commit
    # an unrelated operation. To ensure this, we'll issue another
    # operation that is sharing the same code path, and will check whether
    # the next operation commits dirty state.

    # create a role (this will be 'test_role')
    self.test_create_role()
    self.exec_cmd('ac-user-create', username='bar',
                  rolename='test_role', inbuf='barpass',
                  name='bar User', email='bar@user.com',
                  force_password=True)

    # validate db:
    #   user 'foo' should not exist
    #   user 'bar' should exist and have role 'test_role'
    self.validate_persistent_user('bar', ['test_role'])

    db = self.load_persistent_db()
    self.assertIn('users', db)
    self.assertNotIn('foo', db['users'])
378
def test_delete_nonexistent_user(self):
    """Deleting a user that was never created fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-delete', username='admin')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
385
def test_add_user_roles(self, username='admin',
                        roles=('pool-manager', 'block-manager')):
    """Create ``username`` and add ``roles`` one at a time, checking each step.

    Also used as a fixture by other tests in this class.

    :param username: user to create and extend.
    :param roles: non-empty iterable of role names to add, one command
        per role. The default is a tuple, so no mutable object is shared
        between calls.
    """
    user_orig = self.test_create_user(username)
    uroles = []
    for role in roles:
        uroles.append(role)
        uroles.sort()
        user = self.exec_cmd('ac-user-add-roles', username=username,
                             roles=[role])
        # Require the exact sorted role list. ``assertLessEqual`` on two
        # lists is only a lexicographic ordering check and would accept
        # a wrong list that merely sorts after the expected one.
        self.assertListEqual(uroles, user['roles'])
        self.validate_persistent_user(username, uroles)
    self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
398
def test_add_user_roles2(self):
    """Adding two roles in a single command yields both roles, sorted."""
    user_orig = self.test_create_user()
    user = self.exec_cmd('ac-user-add-roles', username="admin",
                         roles=['pool-manager', 'block-manager'])
    expected_roles = ['block-manager', 'pool-manager']
    # Exact comparison: ``assertLessEqual`` on lists only checks
    # lexicographic ordering, not that the roles match.
    self.assertListEqual(expected_roles, user['roles'])
    self.validate_persistent_user('admin', expected_roles)
    self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])
408
def test_add_user_roles_not_existent_user(self):
    """Adding roles to a user that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-add-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
416
def test_add_user_roles_not_existent_role(self):
    """Adding an unknown role to an existing user fails with ``-ENOENT``."""
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-add-roles', username="admin",
                      roles=['Invalid Role'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'Invalid Role' does not exist")
426
def test_set_user_roles(self):
    """``ac-user-set-roles`` replaces the roles added earlier with the new set."""
    user_orig = self.test_create_user()

    user = self.exec_cmd('ac-user-add-roles', username="admin",
                         roles=['pool-manager'])
    # Exact comparisons throughout: ``assertLessEqual`` on lists is only
    # a lexicographic ordering check and cannot detect a wrong role list.
    self.assertListEqual(['pool-manager'], user['roles'])
    self.validate_persistent_user('admin', ['pool-manager'])
    self.assertGreaterEqual(user['lastUpdate'], user_orig['lastUpdate'])

    user2 = self.exec_cmd('ac-user-set-roles', username="admin",
                          roles=['rgw-manager', 'block-manager'])
    replaced_roles = ['block-manager', 'rgw-manager']
    self.assertListEqual(replaced_roles, user2['roles'])
    self.validate_persistent_user('admin', replaced_roles)
    self.assertGreaterEqual(user2['lastUpdate'], user['lastUpdate'])
441
def test_set_user_roles_not_existent_user(self):
    """Setting roles on a user that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
449
def test_set_user_roles_not_existent_role(self):
    """Setting an unknown role on an existing user fails with ``-ENOENT``."""
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-roles', username="admin",
                      roles=['Invalid Role'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'Invalid Role' does not exist")
459
def test_del_user_roles(self):
    """Removing ``pool-manager`` from a user with two roles leaves ``block-manager``."""
    self.test_add_user_roles()
    user = self.exec_cmd('ac-user-del-roles', username="admin",
                         roles=['pool-manager'])
    # Exact comparison: ``assertLessEqual`` on lists only checks
    # lexicographic ordering and would accept leftover roles.
    self.assertListEqual(['block-manager'], user['roles'])
    self.validate_persistent_user('admin', ['block-manager'])
466
def test_del_user_roles_not_existent_user(self):
    """Removing roles from a user that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['pool-manager', 'block-manager'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
474
def test_del_user_roles_not_existent_role(self):
    """Removing an unknown role from an existing user fails with ``-ENOENT``."""
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['Invalid Role'])

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "Role 'Invalid Role' does not exist")
484
def test_del_user_roles_not_associated_role(self):
    """Removing a role the user does not hold fails with ``-ENOENT``."""
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-del-roles', username="admin",
                      roles=['rgw-manager'])

    error = raised.exception
    expected_message = ("Role 'rgw-manager' is not associated with user "
                        "'admin'")
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), expected_message)
495
def test_show_user(self):
    """``ac-user-show`` returns the full record of a user with two roles."""
    self.test_add_user_roles()
    shown = self.exec_cmd('ac-user-show', username='admin')
    expected = {
        'username': 'admin',
        'lastUpdate': shown['lastUpdate'],
        'password': password_hash('admin', shown['password']),
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'roles': ['block-manager', 'pool-manager'],
        'enabled': True,
    }
    self.assertDictEqual(shown, expected)
511
def test_show_nonexistent_user(self):
    """Showing a user that was never created fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-show', username='admin')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
518
def test_show_all_users(self):
    """Without a username, ``ac-user-show`` lists both created users."""
    self.test_add_user_roles('admin', ['administrator'])
    self.test_add_user_roles('guest', ['read-only'])
    known_users = ['admin', 'guest']
    listed = self.exec_cmd('ac-user-show')
    self.assertEqual(len(listed), 2)
    for entry in listed:
        self.assertIn(entry, known_users)
526
def test_del_role_associated_with_user(self):
    """A role still held by a user cannot be deleted (``-EPERM``)."""
    self.test_create_role()
    self.test_add_user_roles('guest', ['test_role'])

    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-role-delete', rolename='test_role')

    error = raised.exception
    expected_message = ("Role 'test_role' is still associated with user "
                        "'guest'")
    self.assertEqual(error.retcode, -errno.EPERM)
    self.assertEqual(str(error), expected_message)
538
def test_set_user_info(self):
    """``ac-user-set-info`` changes name and email and keeps ``lastUpdate`` unchanged."""
    before = self.test_create_user()
    user = self.exec_cmd('ac-user-set-info', username='admin',
                         name='Admin Name', email='admin@admin.com')
    pass_hash = password_hash('admin', user['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'Admin Name',
        'email': 'admin@admin.com',
        'lastUpdate': user['lastUpdate'],
        'roles': [],
        'enabled': True,
    }
    self.assertDictEqual(user, expected)
    self.validate_persistent_user('admin', [], pass_hash, 'Admin Name',
                                  'admin@admin.com')
    # Unlike the password and role tests, this one requires the
    # timestamp to be exactly equal to the one from creation.
    self.assertEqual(user['lastUpdate'], before['lastUpdate'])
558
def test_set_user_info_nonexistent_user(self):
    """Setting info on a user that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-info', username='admin',
                      name='Admin Name', email='admin@admin.com')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
566
def test_set_user_password(self):
    """``ac-user-set-password`` stores a hash of the new password."""
    before = self.test_create_user()
    user = self.exec_cmd('ac-user-set-password', username='admin',
                         inbuf='newpass', force_password=True)
    pass_hash = password_hash('newpass', user['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'lastUpdate': user['lastUpdate'],
        'roles': [],
        'enabled': True,
    }
    self.assertDictEqual(user, expected)
    self.validate_persistent_user('admin', [], pass_hash, 'admin User',
                                  'admin@user.com')
    self.assertGreaterEqual(user['lastUpdate'], before['lastUpdate'])
586
cd265ab1
TL
def test_sanitize_password(self):
    """Trailing newline separators read from a file are not part of the password.

    The password itself ends in literal backslash sequences (``\\n`` as
    two characters). Only the real newline characters appended after it
    are expected to be ignored when the hash is computed.
    """
    # Local import: ``tempfile`` is not among the file-level imports.
    import tempfile

    self.test_create_user()
    password = 'myPass\\n\\r\\n'
    # An anonymous temporary file replaces the fixed
    # '/tmp/test_sanitize_password.txt' path, which was never removed,
    # is not portable, and could collide between parallel test runs.
    with tempfile.TemporaryFile(mode='w+') as pwd_file:
        # Add new line separators (like some text editors when a file is saved).
        pwd_file.write('{}{}'.format(password, '\n\r\n\n'))
        pwd_file.seek(0)
        user = self.exec_cmd('ac-user-set-password', username='admin',
                             inbuf=pwd_file.read(), force_password=True)
    pass_hash = password_hash(password, user['password'])
    self.assertEqual(user['password'], pass_hash)
598
11fdf7f2
TL
def test_set_user_password_nonexistent_user(self):
    """Setting the password of a user that does not exist fails with ``-ENOENT``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password', username='admin',
                      inbuf='newpass', force_password=True)

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
606
cd265ab1
TL
def test_set_user_password_empty(self):
    """An input buffer holding only a newline is rejected with ``-EINVAL``."""
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password', username='admin', inbuf='\n',
                      force_password=True)

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), ERROR_MSG_EMPTY_INPUT_FILE)
614
9f95a23c
TL
def test_set_user_password_hash(self):
    """``ac-user-set-password-hash`` stores the supplied hash, which matches ``'newpass'``."""
    before = self.test_create_user()
    supplied_hash = '$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2'
    user = self.exec_cmd('ac-user-set-password-hash', username='admin',
                         inbuf=supplied_hash)
    pass_hash = password_hash('newpass', user['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'lastUpdate': user['lastUpdate'],
        'roles': [],
        'enabled': True,
    }
    self.assertDictEqual(user, expected)
    self.validate_persistent_user('admin', [], pass_hash, 'admin User',
                                  'admin@user.com')
    self.assertGreaterEqual(user['lastUpdate'], before['lastUpdate'])
634
def test_set_user_password_hash_nonexistent_user(self):
    """Setting a password hash for a user that does not exist fails with ``-ENOENT``."""
    supplied_hash = '$2b$12$Pt3Vq/rDt2y9glTPSV.VFegiLkQeIpddtkhoFetNApYmIJOY8gau2'
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password-hash', username='admin',
                      inbuf=supplied_hash)

    error = raised.exception
    self.assertEqual(error.retcode, -errno.ENOENT)
    self.assertEqual(str(error), "User 'admin' does not exist")
642
9f95a23c
TL
def test_set_user_password_hash_broken_hash(self):
    """The input ``'1'`` is rejected as an invalid password hash (``-EINVAL``)."""
    self.test_create_user()
    with self.assertRaises(CmdException) as raised:
        self.exec_cmd('ac-user-set-password-hash', username='admin',
                      inbuf='1')

    error = raised.exception
    self.assertEqual(error.retcode, -errno.EINVAL)
    self.assertEqual(str(error), 'Invalid password hash')
651
11fdf7f2
TL
def test_set_login_credentials(self):
    """``set-login-credentials`` for a new user yields an ``administrator`` account."""
    self.exec_cmd('set-login-credentials', username='admin',
                  inbuf='admin')
    shown = self.exec_cmd('ac-user-show', username='admin')
    pass_hash = password_hash('admin', shown['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': None,
        'email': None,
        'lastUpdate': shown['lastUpdate'],
        'roles': ['administrator'],
        'enabled': True,
    }
    self.assertDictEqual(shown, expected)
    self.validate_persistent_user('admin', ['administrator'], pass_hash,
                                  None, None)
670
def test_set_login_credentials_for_existing_user(self):
    """For an existing user, only the password changes; roles, name and email stay."""
    self.test_add_user_roles('admin', ['read-only'])
    self.exec_cmd('set-login-credentials', username='admin',
                  inbuf='admin2')
    shown = self.exec_cmd('ac-user-show', username='admin')
    pass_hash = password_hash('admin2', shown['password'])
    expected = {
        'username': 'admin',
        'password': pass_hash,
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'lastUpdate': shown['lastUpdate'],
        'roles': ['read-only'],
        'enabled': True,
    }
    self.assertDictEqual(shown, expected)
    self.validate_persistent_user('admin', ['read-only'], pass_hash,
                                  'admin User', 'admin@user.com')
690
b3b6e05e
TL
def test_load_v1(self):
    """Load a hand-written ``"version": 1`` DB and check the role and user it yields.

    The stored user record has no ``pwdExpirationDate``,
    ``pwdUpdateRequired`` or ``enabled`` keys; the user shown after
    loading is expected to carry them as ``None``, ``False`` and ``True``.
    """
    # Doubled braces are literal JSON braces. The bare ``{}`` slots are
    # filled by ``str.format`` below, in order: the lastUpdate timestamp,
    # then the scope and permission names.
    self.CONFIG_KEY_DICT['accessdb_v1'] = '''
        {{
            "users": {{
                "admin": {{
                    "username": "admin",
                    "password":
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
                    "roles": ["block-manager", "test_role"],
                    "name": "admin User",
                    "email": "admin@user.com",
                    "lastUpdate": {}
                }}
            }},
            "roles": {{
                "test_role": {{
                    "name": "test_role",
                    "description": "Test Role",
                    "scopes_permissions": {{
                        "{}": ["{}", "{}"],
                        "{}": ["{}"]
                    }}
                }}
            }},
            "version": 1
        }}
    '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
               Permission.UPDATE, Scope.POOL, Permission.CREATE)

    # Reload so the DB is built from the record written above.
    load_access_control_db()
    role = self.exec_cmd('ac-role-show', rolename="test_role")
    self.assertDictEqual(role, {
        'name': 'test_role',
        'description': "Test Role",
        'scopes_permissions': {
            Scope.ISCSI: [Permission.READ, Permission.UPDATE],
            Scope.POOL: [Permission.CREATE]
        }
    })
    user = self.exec_cmd('ac-user-show', username="admin")
    self.assertDictEqual(user, {
        'username': 'admin',
        'lastUpdate': user['lastUpdate'],
        'password':
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'roles': ['block-manager', 'test_role'],
        'enabled': True
    })
743
def test_load_v2(self):
    """Load a hand-written ``"version": 2`` DB and check the role and user it yields.

    The stored user record already contains ``pwdExpirationDate``,
    ``pwdUpdateRequired`` and ``enabled``; the user shown after loading
    is expected to return the same values.
    """
    # Doubled braces are literal JSON braces. The bare ``{}`` slots are
    # filled by ``str.format`` below, in order: the lastUpdate timestamp,
    # then the scope and permission names.
    self.CONFIG_KEY_DICT['accessdb_v2'] = '''
        {{
            "users": {{
                "admin": {{
                    "username": "admin",
                    "password":
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
                    "pwdExpirationDate": null,
                    "pwdUpdateRequired": false,
                    "roles": ["block-manager", "test_role"],
                    "name": "admin User",
                    "email": "admin@user.com",
                    "lastUpdate": {},
                    "enabled": true
                }}
            }},
            "roles": {{
                "test_role": {{
                    "name": "test_role",
                    "description": "Test Role",
                    "scopes_permissions": {{
                        "{}": ["{}", "{}"],
                        "{}": ["{}"]
                    }}
                }}
            }},
            "version": 2
        }}
    '''.format(int(round(time.time())), Scope.ISCSI, Permission.READ,
               Permission.UPDATE, Scope.POOL, Permission.CREATE)

    # Reload so the DB is built from the record written above.
    load_access_control_db()
    role = self.exec_cmd('ac-role-show', rolename="test_role")
    self.assertDictEqual(role, {
        'name': 'test_role',
        'description': "Test Role",
        'scopes_permissions': {
            Scope.ISCSI: [Permission.READ, Permission.UPDATE],
            Scope.POOL: [Permission.CREATE]
        }
    })
    user = self.exec_cmd('ac-user-show', username="admin")
    self.assertDictEqual(user, {
        'username': 'admin',
        'lastUpdate': user['lastUpdate'],
        'password':
            "$2b$12$sd0Az7mm3FaJl8kN3b/xwOuztaN0sWUwC1SJqjM4wcDw/s5cmGbLK",
        'pwdExpirationDate': None,
        'pwdUpdateRequired': False,
        'name': 'admin User',
        'email': 'admin@user.com',
        'roles': ['block-manager', 'test_role'],
        'enabled': True
    })
799
9f95a23c
TL
def test_password_policy_pw_length(self):
    """With a minimum length of 3, the password ``'foo'`` passes the length check."""
    Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
    Settings.PWD_POLICY_MIN_LENGTH = 3
    policy = PasswordPolicy('foo')
    length_ok = policy.check_password_length()
    self.assertTrue(length_ok)
805
def test_password_policy_pw_length_fail(self):
    """With the minimum length not set by this test, ``'bar'`` fails the length check."""
    Settings.PWD_POLICY_CHECK_LENGTH_ENABLED = True
    policy = PasswordPolicy('bar')
    length_ok = policy.check_password_length()
    self.assertFalse(length_ok)
810
def test_password_policy_credits_too_weak(self):
    """The password ``'foo'`` scores 3 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    score = PasswordPolicy('foo').check_password_complexity()
    self.assertEqual(score, 3)
816
def test_password_policy_credits_weak(self):
    """The password ``'mypassword1'`` scores 11 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    score = PasswordPolicy('mypassword1').check_password_complexity()
    self.assertEqual(score, 11)
822
def test_password_policy_credits_ok(self):
    """The password ``'mypassword1!@'`` scores 17 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    score = PasswordPolicy('mypassword1!@').check_password_complexity()
    self.assertEqual(score, 17)
828
def test_password_policy_credits_strong(self):
    """The password ``'testpassword0047!@'`` scores 22 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    score = PasswordPolicy('testpassword0047!@').check_password_complexity()
    self.assertEqual(score, 22)
834
def test_password_policy_credits_very_strong(self):
    """The password ``'testpassword#!$!@$'`` scores 30 complexity credits."""
    Settings.PWD_POLICY_CHECK_COMPLEXITY_ENABLED = True
    score = PasswordPolicy('testpassword#!$!@$').check_password_complexity()
    self.assertEqual(score, 30)
840
def test_password_policy_forbidden_words(self):
    """The forbidden-words check flags the password ``'!@$testdashboard#!$'``."""
    Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
    policy = PasswordPolicy('!@$testdashboard#!$')
    flagged = policy.check_if_contains_forbidden_words()
    self.assertTrue(flagged)
845
def test_password_policy_forbidden_words_custom(self):
    """With the exclusion list set to ``'foo,bar'``, ``'foo123bar'`` is flagged."""
    Settings.PWD_POLICY_CHECK_EXCLUSION_LIST_ENABLED = True
    Settings.PWD_POLICY_EXCLUSION_LIST = 'foo,bar'
    policy = PasswordPolicy('foo123bar')
    flagged = policy.check_if_contains_forbidden_words()
    self.assertTrue(flagged)
851
def test_password_policy_sequential_chars(self):
    """The sequential-characters check flags the password ``'!@$test123#!$'``."""
    Settings.PWD_POLICY_CHECK_SEQUENTIAL_CHARS_ENABLED = True
    policy = PasswordPolicy('!@$test123#!$')
    flagged = policy.check_if_sequential_characters()
    self.assertTrue(flagged)
856
def test_password_policy_repetitive_chars(self):
    """The repetitive-characters check flags the password ``'!@$testfooo#!$'``."""
    Settings.PWD_POLICY_CHECK_REPETITIVE_CHARS_ENABLED = True
    policy = PasswordPolicy('!@$testfooo#!$')
    flagged = policy.check_if_repetitive_characters()
    self.assertTrue(flagged)
861
def test_password_policy_contain_username(self):
    """The username check flags ``'%admin135)'`` for the user ``'admin'``."""
    Settings.PWD_POLICY_CHECK_USERNAME_ENABLED = True
    policy = PasswordPolicy('%admin135)', 'admin')
    flagged = policy.check_if_contains_username()
    self.assertTrue(flagged)
866
def test_password_policy_is_old_pwd(self):
    """The old-password check flags a new password equal to ``old_password``."""
    Settings.PWD_POLICY_CHECK_OLDPWD_ENABLED = True
    policy = PasswordPolicy('foo', old_password='foo')
    flagged = policy.check_is_old_password()
    self.assertTrue(flagged)