]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/auth.py
1 # -*- coding: utf-8 -*-
8 from ..exceptions
import InvalidCredentialsError
, UserDoesNotExist
9 from ..services
.auth
import AuthManager
, JwtManager
10 from ..services
.cluster
import ClusterModel
11 from ..settings
import Settings
12 from . import APIDoc
, APIRouter
, ControllerAuthMixin
, EndpointDoc
, RESTController
, allow_empty_body
14 # Python 3.8 introduced `samesite` attribute:
15 # https://docs.python.org/3/library/http.cookies.html#morsel-objects
16 if sys
.version_info
< (3, 8):
17 http
.cookies
.Morsel
._reserved
["samesite"] = "SameSite" # type: ignore # pylint: disable=W0212
19 logger
= logging
.getLogger('controllers.auth')
22 "username": (str, "Username"),
25 }, "List of permissions acquired"),
26 "sso": (bool, "Uses single sign on?"),
27 "pwdUpdateRequired": (bool, "Is password update required?")
31 @APIRouter('/auth', secure
=False)
32 @APIDoc("Initiate a session with Ceph", "Auth")
33 class Auth(RESTController
, ControllerAuthMixin
):
35 Provide authenticates and returns JWT token.
38 def create(self
, username
, password
):
39 user_data
= AuthManager
.authenticate(username
, password
)
40 user_perms
, pwd_expiration_date
, pwd_update_required
= None, None, None
41 max_attempt
= Settings
.ACCOUNT_LOCKOUT_ATTEMPTS
42 if max_attempt
== 0 or mgr
.ACCESS_CTRL_DB
.get_attempt(username
) < max_attempt
:
44 user_perms
= user_data
.get('permissions')
45 pwd_expiration_date
= user_data
.get('pwdExpirationDate', None)
46 pwd_update_required
= user_data
.get('pwdUpdateRequired', False)
48 if user_perms
is not None:
49 url_prefix
= 'https' if mgr
.get_localized_module_option('ssl') else 'http'
51 logger
.info('Login successful: %s', username
)
52 mgr
.ACCESS_CTRL_DB
.reset_attempt(username
)
53 mgr
.ACCESS_CTRL_DB
.save()
54 token
= JwtManager
.gen_token(username
)
56 # For backward-compatibility: PyJWT versions < 2.0.0 return bytes.
57 token
= token
.decode('utf-8') if isinstance(token
, bytes
) else token
59 self
._set
_token
_cookie
(url_prefix
, token
)
63 'permissions': user_perms
,
64 'pwdExpirationDate': pwd_expiration_date
,
65 'sso': mgr
.SSO_DB
.protocol
== 'saml2',
66 'pwdUpdateRequired': pwd_update_required
68 mgr
.ACCESS_CTRL_DB
.increment_attempt(username
)
69 mgr
.ACCESS_CTRL_DB
.save()
72 user
= mgr
.ACCESS_CTRL_DB
.get_user(username
)
74 mgr
.ACCESS_CTRL_DB
.save()
75 logging
.warning('Maximum number of unsuccessful log-in attempts '
77 'username "%s" so the account was blocked. '
78 'An administrator will need to re-enable the account',
79 max_attempt
, username
)
80 raise InvalidCredentialsError
81 except UserDoesNotExist
:
82 raise InvalidCredentialsError
83 logger
.info('Login failed: %s', username
)
84 raise InvalidCredentialsError
86 @RESTController.Collection('POST')
89 logger
.debug('Logout successful')
90 token
= JwtManager
.get_token_from_header()
91 JwtManager
.blocklist_token(token
)
92 self
._delete
_token
_cookie
(token
)
93 redirect_url
= '#/login'
94 if mgr
.SSO_DB
.protocol
== 'saml2':
95 redirect_url
= 'auth/saml2/slo'
97 'redirect_url': redirect_url
100 def _get_login_url(self
):
101 if mgr
.SSO_DB
.protocol
== 'saml2':
102 return 'auth/saml2/login'
105 @RESTController.Collection('POST', query_params
=['token'])
106 @EndpointDoc("Check token Authentication",
107 parameters
={'token': (str, 'Authentication Token')},
108 responses
={201: AUTH_CHECK_SCHEMA
})
109 def check(self
, token
):
111 user
= JwtManager
.get_user(token
)
114 'username': user
.username
,
115 'permissions': user
.permissions_dict(),
116 'sso': mgr
.SSO_DB
.protocol
== 'saml2',
117 'pwdUpdateRequired': user
.pwd_update_required
120 'login_url': self
._get
_login
_url
(),
121 'cluster_status': ClusterModel
.from_db().dict()['status']