1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- from typing import Any
- from flask import current_app
- from flask_jwt_extended import JWTManager
- from flask_jwt_extended.exceptions import RevokedTokenError
- from flask_jwt_extended.exceptions import UserClaimsVerificationError
- from flask_jwt_extended.exceptions import WrongTokenError
- def get_jwt_manager() -> JWTManager:
- try:
- return current_app.extensions["flask-jwt-extended"]
- except KeyError: # pragma: no cover
- raise RuntimeError(
- "You must initialize a JWTManager with this flask "
- "application before using this method"
- ) from None
- def has_user_lookup() -> bool:
- jwt_manager = get_jwt_manager()
- return jwt_manager._user_lookup_callback is not None
- def user_lookup(*args, **kwargs) -> Any:
- jwt_manager = get_jwt_manager()
- return jwt_manager._user_lookup_callback and jwt_manager._user_lookup_callback(
- *args, **kwargs
- )
- def verify_token_type(decoded_token: dict, refresh: bool) -> None:
- if not refresh and decoded_token["type"] == "refresh":
- raise WrongTokenError("Only non-refresh tokens are allowed")
- elif refresh and decoded_token["type"] != "refresh":
- raise WrongTokenError("Only refresh tokens are allowed")
- def verify_token_not_blocklisted(jwt_header: dict, jwt_data: dict) -> None:
- jwt_manager = get_jwt_manager()
- if jwt_manager._token_in_blocklist_callback(jwt_header, jwt_data):
- raise RevokedTokenError(jwt_header, jwt_data)
- def custom_verification_for_token(jwt_header: dict, jwt_data: dict) -> None:
- jwt_manager = get_jwt_manager()
- if not jwt_manager._token_verification_callback(jwt_header, jwt_data):
- error_msg = "User claims verification failed"
- raise UserClaimsVerificationError(error_msg, jwt_header, jwt_data)
|