from base64 import b64encode
import json
+import logging
import os
import threading
import time
import jwt
from .access_control import LocalAuthenticator, UserDoesNotExist
-from .. import mgr, logger
+from .. import mgr
class JwtManager(object):
@classmethod
def init(cls):
+ cls.logger = logging.getLogger('jwt') # type: ignore
# generate a new secret if it does not exist
secret = mgr.get_store('jwt_secret')
if secret is None:
'iat': now,
'username': username
}
- return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM)
+ return jwt.encode(payload, cls._secret, algorithm=cls.JWT_ALGORITHM) # type: ignore
@classmethod
def decode_token(cls, token):
if not cls._secret:
cls.init()
- return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM)
+ return jwt.decode(token, cls._secret, algorithms=cls.JWT_ALGORITHM) # type: ignore
@classmethod
def get_token_from_header(cls):
return None
@classmethod
- def set_user(cls, token):
- cls.LOCAL_USER.username = token['username']
+ def set_user(cls, username):
+ cls.LOCAL_USER.username = username
@classmethod
def reset_user(cls):
- cls.set_user({'username': None, 'permissions': None})
+ cls.set_user(None)
@classmethod
def get_username(cls):
return getattr(cls.LOCAL_USER, 'username', None)
+ @classmethod
+ def get_user(cls, token):
+ try:
+ dtoken = JwtManager.decode_token(token)
+ if not JwtManager.is_blacklisted(dtoken['jti']):
+ user = AuthManager.get_user(dtoken['username'])
+ if user.last_update <= dtoken['iat']:
+ return user
+ cls.logger.debug( # type: ignore
+ "user info changed after token was issued, iat=%s last_update=%s",
+ dtoken['iat'], user.last_update
+ )
+ else:
+ cls.logger.debug('Token is black-listed') # type: ignore
+ except jwt.ExpiredSignatureError:
+ cls.logger.debug("Token has expired") # type: ignore
+ except jwt.InvalidTokenError:
+ cls.logger.debug("Failed to decode token") # type: ignore
+ except UserDoesNotExist:
+ cls.logger.debug( # type: ignore
+ "Invalid token: user %s does not exist", dtoken['username']
+ )
+ return None
+
@classmethod
def blacklist_token(cls, token):
token = jwt.decode(token, verify=False)
@classmethod
def get_user(cls, username):
- return cls.AUTH_PROVIDER.get_user(username)
+ return cls.AUTH_PROVIDER.get_user(username) # type: ignore
@classmethod
def authenticate(cls, username, password):
- return cls.AUTH_PROVIDER.authenticate(username, password)
+ return cls.AUTH_PROVIDER.authenticate(username, password) # type: ignore
@classmethod
def authorize(cls, username, scope, permissions):
- return cls.AUTH_PROVIDER.authorize(username, scope, permissions)
+ return cls.AUTH_PROVIDER.authorize(username, scope, permissions) # type: ignore
class AuthManagerTool(cherrypy.Tool):
def __init__(self):
super(AuthManagerTool, self).__init__(
'before_handler', self._check_authentication, priority=20)
+ self.logger = logging.getLogger('auth')
def _check_authentication(self):
JwtManager.reset_user()
token = JwtManager.get_token_from_header()
- logger.debug("AMT: token: %s", token)
+ self.logger.debug("token: %s", token)
if token:
- try:
- token = JwtManager.decode_token(token)
- if not JwtManager.is_blacklisted(token['jti']):
- user = AuthManager.get_user(token['username'])
- if user.lastUpdate <= token['iat']:
- self._check_authorization(token)
- return
-
- logger.debug("AMT: user info changed after token was"
- " issued, iat=%s lastUpdate=%s",
- token['iat'], user.lastUpdate)
- else:
- logger.debug('AMT: Token is black-listed')
- except jwt.exceptions.ExpiredSignatureError:
- logger.debug("AMT: Token has expired")
- except jwt.exceptions.InvalidTokenError:
- logger.debug("AMT: Failed to decode token")
- except UserDoesNotExist:
- logger.debug("AMT: Invalid token: user %s does not exist",
- token['username'])
-
- logger.debug('AMT: Unauthorized access to %s',
- cherrypy.url(relative='server'))
+ user = JwtManager.get_user(token)
+ if user:
+ self._check_authorization(user.username)
+ return
+ self.logger.debug('Unauthorized access to %s',
+ cherrypy.url(relative='server'))
raise cherrypy.HTTPError(401, 'You are not authorized to access '
'that resource')
- def _check_authorization(self, token):
- logger.debug("AMT: checking authorization...")
- username = token['username']
+ def _check_authorization(self, username):
+ self.logger.debug("checking authorization...")
+ username = username
handler = cherrypy.request.handler.callable
controller = handler.__self__
sec_scope = getattr(controller, '_security_scope', None)
sec_perms = getattr(handler, '_security_permissions', None)
- JwtManager.set_user(token)
+ JwtManager.set_user(username)
if not sec_scope:
# controller does not define any authorization restrictions
return
- logger.debug("AMT: checking '%s' access to '%s' scope", sec_perms,
- sec_scope)
+ self.logger.debug("checking '%s' access to '%s' scope", sec_perms,
+ sec_scope)
if not sec_perms:
- logger.debug("Fail to check permission on: %s:%s", controller,
- handler)
+ self.logger.debug("Fail to check permission on: %s:%s", controller,
+ handler)
raise cherrypy.HTTPError(403, "You don't have permissions to "
"access that resource")