]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/auth.py
1 # -*- coding: utf-8 -*-
2 from __future__
import absolute_import
9 from ..exceptions
import InvalidCredentialsError
, UserDoesNotExist
10 from ..services
.auth
import AuthManager
, JwtManager
11 from ..settings
import Settings
12 from . import ApiController
, ControllerAuthMixin
, ControllerDoc
, EndpointDoc
, \
13 RESTController
, allow_empty_body
15 # Python 3.8 introduced `samesite` attribute:
16 # https://docs.python.org/3/library/http.cookies.html#morsel-objects
17 if sys
.version_info
< (3, 8):
18 http
.cookies
.Morsel
._reserved
["samesite"] = "SameSite" # type: ignore # pylint: disable=W0212
20 logger
= logging
.getLogger('controllers.auth')
23 "username": (str, "Username"),
26 }, "List of permissions acquired"),
27 "sso": (bool, "Uses single sign on?"),
28 "pwdUpdateRequired": (bool, "Is password update required?")
32 @ApiController('/auth', secure
=False)
33 @ControllerDoc("Initiate a session with Ceph", "Auth")
34 class Auth(RESTController
, ControllerAuthMixin
):
36 Provide authenticates and returns JWT token.
39 def create(self
, username
, password
):
40 user_data
= AuthManager
.authenticate(username
, password
)
41 user_perms
, pwd_expiration_date
, pwd_update_required
= None, None, None
42 max_attempt
= Settings
.ACCOUNT_LOCKOUT_ATTEMPTS
43 if max_attempt
== 0 or mgr
.ACCESS_CTRL_DB
.get_attempt(username
) < max_attempt
:
45 user_perms
= user_data
.get('permissions')
46 pwd_expiration_date
= user_data
.get('pwdExpirationDate', None)
47 pwd_update_required
= user_data
.get('pwdUpdateRequired', False)
49 if user_perms
is not None:
50 url_prefix
= 'https' if mgr
.get_localized_module_option('ssl') else 'http'
52 logger
.info('Login successful: %s', username
)
53 mgr
.ACCESS_CTRL_DB
.reset_attempt(username
)
54 mgr
.ACCESS_CTRL_DB
.save()
55 token
= JwtManager
.gen_token(username
)
57 # For backward-compatibility: PyJWT versions < 2.0.0 return bytes.
58 token
= token
.decode('utf-8') if isinstance(token
, bytes
) else token
60 self
._set
_token
_cookie
(url_prefix
, token
)
64 'permissions': user_perms
,
65 'pwdExpirationDate': pwd_expiration_date
,
66 'sso': mgr
.SSO_DB
.protocol
== 'saml2',
67 'pwdUpdateRequired': pwd_update_required
69 mgr
.ACCESS_CTRL_DB
.increment_attempt(username
)
70 mgr
.ACCESS_CTRL_DB
.save()
73 user
= mgr
.ACCESS_CTRL_DB
.get_user(username
)
75 mgr
.ACCESS_CTRL_DB
.save()
76 logging
.warning('Maximum number of unsuccessful log-in attempts '
78 'username "%s" so the account was blocked. '
79 'An administrator will need to re-enable the account',
80 max_attempt
, username
)
81 raise InvalidCredentialsError
82 except UserDoesNotExist
:
83 raise InvalidCredentialsError
84 logger
.info('Login failed: %s', username
)
85 raise InvalidCredentialsError
87 @RESTController.Collection('POST')
90 logger
.debug('Logout successful')
91 token
= JwtManager
.get_token_from_header()
92 JwtManager
.blocklist_token(token
)
93 self
._delete
_token
_cookie
(token
)
94 redirect_url
= '#/login'
95 if mgr
.SSO_DB
.protocol
== 'saml2':
96 redirect_url
= 'auth/saml2/slo'
98 'redirect_url': redirect_url
101 def _get_login_url(self
):
102 if mgr
.SSO_DB
.protocol
== 'saml2':
103 return 'auth/saml2/login'
106 @RESTController.Collection('POST', query_params
=['token'])
107 @EndpointDoc("Check token Authentication",
108 parameters
={'token': (str, 'Authentication Token')},
109 responses
={201: AUTH_CHECK_SCHEMA
})
110 def check(self
, token
):
112 user
= JwtManager
.get_user(token
)
115 'username': user
.username
,
116 'permissions': user
.permissions_dict(),
117 'sso': mgr
.SSO_DB
.protocol
== 'saml2',
118 'pwdUpdateRequired': user
.pwd_update_required
121 'login_url': self
._get
_login
_url
(),