]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/auth.py
d6dd12d6bda522eb8d00c56b888972269f4510ac
[ceph.git] / ceph / src / pybind / mgr / dashboard / controllers / auth.py
1 # -*- coding: utf-8 -*-
2 from __future__ import absolute_import
3
4 import http.cookies
5 import logging
6 import sys
7
8 from . import ApiController, RESTController, \
9 allow_empty_body, set_cookies
10 from .. import mgr
11 from ..exceptions import InvalidCredentialsError, UserDoesNotExist
12 from ..services.auth import AuthManager, JwtManager
13 from ..settings import Settings
14
# The ``samesite`` cookie attribute is only known to ``http.cookies.Morsel``
# from Python 3.8 onwards, see
# https://docs.python.org/3/library/http.cookies.html#morsel-objects
# On older interpreters the attribute is registered by hand.
if not sys.version_info >= (3, 8):
    http.cookies.Morsel._reserved.update(  # type: ignore # pylint: disable=W0212
        samesite="SameSite")

# Logger shared by the code in this module.
logger = logging.getLogger('controllers.auth')
21
22
@ApiController('/auth', secure=False)
class Auth(RESTController):
    """
    Provide authentication and return a JWT token.
    """

    def create(self, username, password):
        """
        Authenticate a user and hand out a JWT token.

        The number of failed attempts recorded for ``username`` is compared
        against ``Settings.ACCOUNT_LOCKOUT_ATTEMPTS``; a setting of ``0``
        skips that comparison. Once the limit has been reached the user
        object is flagged as disabled and the login is rejected.

        :param username: name of the account that tries to log in.
        :param password: password supplied for that account.
        :return: dict with the keys ``token``, ``username``, ``permissions``,
            ``pwdExpirationDate``, ``sso`` and ``pwdUpdateRequired``.
        :raises InvalidCredentialsError: if the login is rejected for any
            reason (bad credentials, unknown user, attempt limit reached).
        """
        user_data = AuthManager.authenticate(username, password)
        user_perms, pwd_expiration_date, pwd_update_required = None, None, None
        max_attempt = Settings.ACCOUNT_LOCKOUT_ATTEMPTS
        if max_attempt == 0 or mgr.ACCESS_CTRL_DB.get_attempt(username) < max_attempt:
            if user_data:
                user_perms = user_data.get('permissions')
                pwd_expiration_date = user_data.get('pwdExpirationDate')
                pwd_update_required = user_data.get('pwdUpdateRequired', False)

            if user_perms is not None:
                url_prefix = 'https' if mgr.get_localized_module_option('ssl') else 'http'
                logger.info('Login successful: %s', username)
                mgr.ACCESS_CTRL_DB.reset_attempt(username)
                mgr.ACCESS_CTRL_DB.save()
                token = JwtManager.gen_token(username)
                # The token may arrive as bytes or as str: PyJWT < 2.0.0
                # encodes to bytes, newer releases to str (presumably
                # gen_token wraps jwt.encode -- verify against JwtManager).
                # Decoding unconditionally breaks the login on a str token.
                if isinstance(token, bytes):
                    token = token.decode('utf-8')
                set_cookies(url_prefix, token)
                return {
                    'token': token,
                    'username': username,
                    'permissions': user_perms,
                    'pwdExpirationDate': pwd_expiration_date,
                    'sso': mgr.SSO_DB.protocol == 'saml2',
                    'pwdUpdateRequired': pwd_update_required
                }
            # Authentication failed while still below the limit: count it.
            mgr.ACCESS_CTRL_DB.increment_attempt(username)
            mgr.ACCESS_CTRL_DB.save()
        else:
            try:
                user = mgr.ACCESS_CTRL_DB.get_user(username)
                user.enabled = False
                mgr.ACCESS_CTRL_DB.save()
                # Use the module logger (not the root logger) so that this
                # message is emitted under the same name as the other
                # messages of this controller.
                logger.warning('Maximum number of unsuccessful log-in attempts '
                               '(%d) reached for '
                               'username "%s" so the account was blocked. '
                               'An administrator will need to re-enable the account',
                               max_attempt, username)
                raise InvalidCredentialsError
            except UserDoesNotExist:
                # Unknown user names get the same error as a bad password.
                raise InvalidCredentialsError
        logger.info('Login failed: %s', username)
        raise InvalidCredentialsError

    @RESTController.Collection('POST')
    @allow_empty_body
    def logout(self):
        """
        Blacklist the token of the current request.

        :return: dict with the key ``redirect_url``; it is
            ``'auth/saml2/slo'`` when the SSO protocol is ``saml2`` and
            ``'#/login'`` otherwise.
        """
        token = JwtManager.get_token_from_header()
        JwtManager.blacklist_token(token)
        # Report success only once the token has actually been blacklisted.
        logger.debug('Logout successful')
        redirect_url = '#/login'
        if mgr.SSO_DB.protocol == 'saml2':
            redirect_url = 'auth/saml2/slo'
        return {
            'redirect_url': redirect_url
        }

    def _get_login_url(self):
        """
        Return the login URL matching the configured SSO protocol.

        :return: ``'auth/saml2/login'`` when the SSO protocol is ``saml2``,
            ``'#/login'`` otherwise.
        """
        if mgr.SSO_DB.protocol == 'saml2':
            return 'auth/saml2/login'
        return '#/login'

    @RESTController.Collection('POST')
    def check(self, token):
        """
        Look up the user that belongs to a token.

        :param token: token to look up; a falsy value skips the lookup.
        :return: dict with ``username``, ``permissions``, ``sso`` and
            ``pwdUpdateRequired`` when ``JwtManager.get_user`` yields a user
            for the token, otherwise a dict with only ``login_url``.
        """
        if token:
            user = JwtManager.get_user(token)
            if user:
                return {
                    'username': user.username,
                    'permissions': user.permissions_dict(),
                    'sso': mgr.SSO_DB.protocol == 'saml2',
                    'pwdUpdateRequired': user.pwd_update_required
                }
        return {
            'login_url': self._get_login_url(),
        }