]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/auth.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / dashboard / controllers / auth.py
1 # -*- coding: utf-8 -*-
2 from __future__ import absolute_import
3
4 import http.cookies
5 import logging
6 import sys
7
8 from .. import mgr
9 from ..exceptions import InvalidCredentialsError, UserDoesNotExist
10 from ..services.auth import AuthManager, JwtManager
11 from ..settings import Settings
12 from . import ApiController, ControllerAuthMixin, ControllerDoc, EndpointDoc, \
13 RESTController, allow_empty_body
14
15 # Python 3.8 introduced `samesite` attribute:
16 # https://docs.python.org/3/library/http.cookies.html#morsel-objects
17 if sys.version_info < (3, 8):
18 http.cookies.Morsel._reserved["samesite"] = "SameSite" # type: ignore # pylint: disable=W0212
19
20 logger = logging.getLogger('controllers.auth')
21
22 AUTH_CHECK_SCHEMA = {
23 "username": (str, "Username"),
24 "permissions": ({
25 "cephfs": ([str], "")
26 }, "List of permissions acquired"),
27 "sso": (bool, "Uses single sign on?"),
28 "pwdUpdateRequired": (bool, "Is password update required?")
29 }
30
31
32 @ApiController('/auth', secure=False)
33 @ControllerDoc("Initiate a session with Ceph", "Auth")
34 class Auth(RESTController, ControllerAuthMixin):
35 """
36 Provide authenticates and returns JWT token.
37 """
38
39 def create(self, username, password):
40 user_data = AuthManager.authenticate(username, password)
41 user_perms, pwd_expiration_date, pwd_update_required = None, None, None
42 max_attempt = Settings.ACCOUNT_LOCKOUT_ATTEMPTS
43 if max_attempt == 0 or mgr.ACCESS_CTRL_DB.get_attempt(username) < max_attempt:
44 if user_data:
45 user_perms = user_data.get('permissions')
46 pwd_expiration_date = user_data.get('pwdExpirationDate', None)
47 pwd_update_required = user_data.get('pwdUpdateRequired', False)
48
49 if user_perms is not None:
50 url_prefix = 'https' if mgr.get_localized_module_option('ssl') else 'http'
51
52 logger.info('Login successful: %s', username)
53 mgr.ACCESS_CTRL_DB.reset_attempt(username)
54 mgr.ACCESS_CTRL_DB.save()
55 token = JwtManager.gen_token(username)
56
57 # For backward-compatibility: PyJWT versions < 2.0.0 return bytes.
58 token = token.decode('utf-8') if isinstance(token, bytes) else token
59
60 self._set_token_cookie(url_prefix, token)
61 return {
62 'token': token,
63 'username': username,
64 'permissions': user_perms,
65 'pwdExpirationDate': pwd_expiration_date,
66 'sso': mgr.SSO_DB.protocol == 'saml2',
67 'pwdUpdateRequired': pwd_update_required
68 }
69 mgr.ACCESS_CTRL_DB.increment_attempt(username)
70 mgr.ACCESS_CTRL_DB.save()
71 else:
72 try:
73 user = mgr.ACCESS_CTRL_DB.get_user(username)
74 user.enabled = False
75 mgr.ACCESS_CTRL_DB.save()
76 logging.warning('Maximum number of unsuccessful log-in attempts '
77 '(%d) reached for '
78 'username "%s" so the account was blocked. '
79 'An administrator will need to re-enable the account',
80 max_attempt, username)
81 raise InvalidCredentialsError
82 except UserDoesNotExist:
83 raise InvalidCredentialsError
84 logger.info('Login failed: %s', username)
85 raise InvalidCredentialsError
86
87 @RESTController.Collection('POST')
88 @allow_empty_body
89 def logout(self):
90 logger.debug('Logout successful')
91 token = JwtManager.get_token_from_header()
92 JwtManager.blocklist_token(token)
93 self._delete_token_cookie(token)
94 redirect_url = '#/login'
95 if mgr.SSO_DB.protocol == 'saml2':
96 redirect_url = 'auth/saml2/slo'
97 return {
98 'redirect_url': redirect_url
99 }
100
101 def _get_login_url(self):
102 if mgr.SSO_DB.protocol == 'saml2':
103 return 'auth/saml2/login'
104 return '#/login'
105
106 @RESTController.Collection('POST', query_params=['token'])
107 @EndpointDoc("Check token Authentication",
108 parameters={'token': (str, 'Authentication Token')},
109 responses={201: AUTH_CHECK_SCHEMA})
110 def check(self, token):
111 if token:
112 user = JwtManager.get_user(token)
113 if user:
114 return {
115 'username': user.username,
116 'permissions': user.permissions_dict(),
117 'sso': mgr.SSO_DB.protocol == 'saml2',
118 'pwdUpdateRequired': user.pwd_update_required
119 }
120 return {
121 'login_url': self._get_login_url(),
122 }