internal_utils.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. from typing import Any
  2. from flask import current_app
  3. from flask_jwt_extended import JWTManager
  4. from flask_jwt_extended.exceptions import RevokedTokenError
  5. from flask_jwt_extended.exceptions import UserClaimsVerificationError
  6. from flask_jwt_extended.exceptions import WrongTokenError
  7. def get_jwt_manager() -> JWTManager:
  8. try:
  9. return current_app.extensions["flask-jwt-extended"]
  10. except KeyError: # pragma: no cover
  11. raise RuntimeError(
  12. "You must initialize a JWTManager with this flask "
  13. "application before using this method"
  14. ) from None
  15. def has_user_lookup() -> bool:
  16. jwt_manager = get_jwt_manager()
  17. return jwt_manager._user_lookup_callback is not None
  18. def user_lookup(*args, **kwargs) -> Any:
  19. jwt_manager = get_jwt_manager()
  20. return jwt_manager._user_lookup_callback and jwt_manager._user_lookup_callback(
  21. *args, **kwargs
  22. )
  23. def verify_token_type(decoded_token: dict, refresh: bool) -> None:
  24. if not refresh and decoded_token["type"] == "refresh":
  25. raise WrongTokenError("Only non-refresh tokens are allowed")
  26. elif refresh and decoded_token["type"] != "refresh":
  27. raise WrongTokenError("Only refresh tokens are allowed")
  28. def verify_token_not_blocklisted(jwt_header: dict, jwt_data: dict) -> None:
  29. jwt_manager = get_jwt_manager()
  30. if jwt_manager._token_in_blocklist_callback(jwt_header, jwt_data):
  31. raise RevokedTokenError(jwt_header, jwt_data)
  32. def custom_verification_for_token(jwt_header: dict, jwt_data: dict) -> None:
  33. jwt_manager = get_jwt_manager()
  34. if not jwt_manager._token_verification_callback(jwt_header, jwt_data):
  35. error_msg = "User claims verification failed"
  36. raise UserClaimsVerificationError(error_msg, jwt_header, jwt_data)