backends.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from django.utils.translation import gettext_lazy as _
  2. import jwt
  3. from jwt import InvalidAlgorithmError, InvalidTokenError, PyJWKClient, algorithms
  4. from .exceptions import TokenBackendError
  5. from .utils import format_lazy
  6. ALLOWED_ALGORITHMS = (
  7. 'HS256',
  8. 'HS384',
  9. 'HS512',
  10. 'RS256',
  11. 'RS384',
  12. 'RS512',
  13. )
  14. class TokenBackend:
  15. def __init__(
  16. self,
  17. algorithm,
  18. signing_key=None,
  19. verifying_key=None,
  20. audience=None,
  21. issuer=None,
  22. jwk_url: str = None,
  23. leeway=0,
  24. ):
  25. self._validate_algorithm(algorithm)
  26. self.algorithm = algorithm
  27. self.signing_key = signing_key
  28. self.audience = audience
  29. self.issuer = issuer
  30. self.jwks_client = PyJWKClient(jwk_url) if jwk_url else None
  31. self.leeway = leeway
  32. if algorithm.startswith("HS"):
  33. self.verifying_key = signing_key
  34. else:
  35. self.verifying_key = verifying_key
  36. def _validate_algorithm(self, algorithm):
  37. """
  38. Ensure that the nominated algorithm is recognized, and that cryptography is installed for those
  39. algorithms that require it
  40. """
  41. if algorithm not in ALLOWED_ALGORITHMS:
  42. raise TokenBackendError(format_lazy(_("Unrecognized algorithm type '{}'"), algorithm))
  43. if algorithm in algorithms.requires_cryptography and not algorithms.has_crypto:
  44. raise TokenBackendError(format_lazy(_("You must have cryptography installed to use {}."), algorithm))
  45. def get_verifying_key(self, token):
  46. if self.algorithm.startswith("HS"):
  47. return self.signing_key
  48. if self.jwks_client:
  49. return self.jwks_client.get_signing_key_from_jwt(token).key
  50. return self.verifying_key
  51. def encode(self, payload):
  52. """
  53. Returns an encoded token for the given payload dictionary.
  54. """
  55. jwt_payload = payload.copy()
  56. if self.audience is not None:
  57. jwt_payload['aud'] = self.audience
  58. if self.issuer is not None:
  59. jwt_payload['iss'] = self.issuer
  60. token = jwt.encode(jwt_payload, self.signing_key, algorithm=self.algorithm)
  61. if isinstance(token, bytes):
  62. # For PyJWT <= 1.7.1
  63. return token.decode('utf-8')
  64. # For PyJWT >= 2.0.0a1
  65. return token
  66. def decode(self, token, verify=True):
  67. """
  68. Performs a validation of the given token and returns its payload
  69. dictionary.
  70. Raises a `TokenBackendError` if the token is malformed, if its
  71. signature check fails, or if its 'exp' claim indicates it has expired.
  72. """
  73. try:
  74. return jwt.decode(
  75. token,
  76. self.get_verifying_key(token),
  77. algorithms=[self.algorithm],
  78. verify=verify,
  79. audience=self.audience,
  80. issuer=self.issuer,
  81. leeway=self.leeway,
  82. options={
  83. 'verify_aud': self.audience is not None,
  84. 'verify_signature': verify,
  85. },
  86. )
  87. except InvalidAlgorithmError as ex:
  88. raise TokenBackendError(_('Invalid algorithm specified')) from ex
  89. except InvalidTokenError:
  90. raise TokenBackendError(_('Token is invalid or expired'))