fernet.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import base64
  5. import binascii
  6. import os
  7. import time
  8. import typing
  9. from cryptography import utils
  10. from cryptography.exceptions import InvalidSignature
  11. from cryptography.hazmat.primitives import hashes, padding
  12. from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
  13. from cryptography.hazmat.primitives.hmac import HMAC
  14. class InvalidToken(Exception):
  15. pass
  16. _MAX_CLOCK_SKEW = 60
  17. class Fernet:
  18. def __init__(
  19. self,
  20. key: typing.Union[bytes, str],
  21. backend: typing.Any = None,
  22. ) -> None:
  23. try:
  24. key = base64.urlsafe_b64decode(key)
  25. except binascii.Error as exc:
  26. raise ValueError(
  27. "Fernet key must be 32 url-safe base64-encoded bytes."
  28. ) from exc
  29. if len(key) != 32:
  30. raise ValueError(
  31. "Fernet key must be 32 url-safe base64-encoded bytes."
  32. )
  33. self._signing_key = key[:16]
  34. self._encryption_key = key[16:]
  35. @classmethod
  36. def generate_key(cls) -> bytes:
  37. return base64.urlsafe_b64encode(os.urandom(32))
  38. def encrypt(self, data: bytes) -> bytes:
  39. return self.encrypt_at_time(data, int(time.time()))
  40. def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
  41. iv = os.urandom(16)
  42. return self._encrypt_from_parts(data, current_time, iv)
  43. def _encrypt_from_parts(
  44. self, data: bytes, current_time: int, iv: bytes
  45. ) -> bytes:
  46. utils._check_bytes("data", data)
  47. padder = padding.PKCS7(algorithms.AES.block_size).padder()
  48. padded_data = padder.update(data) + padder.finalize()
  49. encryptor = Cipher(
  50. algorithms.AES(self._encryption_key),
  51. modes.CBC(iv),
  52. ).encryptor()
  53. ciphertext = encryptor.update(padded_data) + encryptor.finalize()
  54. basic_parts = (
  55. b"\x80"
  56. + current_time.to_bytes(length=8, byteorder="big")
  57. + iv
  58. + ciphertext
  59. )
  60. h = HMAC(self._signing_key, hashes.SHA256())
  61. h.update(basic_parts)
  62. hmac = h.finalize()
  63. return base64.urlsafe_b64encode(basic_parts + hmac)
  64. def decrypt(
  65. self, token: typing.Union[bytes, str], ttl: typing.Optional[int] = None
  66. ) -> bytes:
  67. timestamp, data = Fernet._get_unverified_token_data(token)
  68. if ttl is None:
  69. time_info = None
  70. else:
  71. time_info = (ttl, int(time.time()))
  72. return self._decrypt_data(data, timestamp, time_info)
  73. def decrypt_at_time(
  74. self, token: typing.Union[bytes, str], ttl: int, current_time: int
  75. ) -> bytes:
  76. if ttl is None:
  77. raise ValueError(
  78. "decrypt_at_time() can only be used with a non-None ttl"
  79. )
  80. timestamp, data = Fernet._get_unverified_token_data(token)
  81. return self._decrypt_data(data, timestamp, (ttl, current_time))
  82. def extract_timestamp(self, token: typing.Union[bytes, str]) -> int:
  83. timestamp, data = Fernet._get_unverified_token_data(token)
  84. # Verify the token was not tampered with.
  85. self._verify_signature(data)
  86. return timestamp
  87. @staticmethod
  88. def _get_unverified_token_data(
  89. token: typing.Union[bytes, str]
  90. ) -> typing.Tuple[int, bytes]:
  91. if not isinstance(token, (str, bytes)):
  92. raise TypeError("token must be bytes or str")
  93. try:
  94. data = base64.urlsafe_b64decode(token)
  95. except (TypeError, binascii.Error):
  96. raise InvalidToken
  97. if not data or data[0] != 0x80:
  98. raise InvalidToken
  99. if len(data) < 9:
  100. raise InvalidToken
  101. timestamp = int.from_bytes(data[1:9], byteorder="big")
  102. return timestamp, data
  103. def _verify_signature(self, data: bytes) -> None:
  104. h = HMAC(self._signing_key, hashes.SHA256())
  105. h.update(data[:-32])
  106. try:
  107. h.verify(data[-32:])
  108. except InvalidSignature:
  109. raise InvalidToken
  110. def _decrypt_data(
  111. self,
  112. data: bytes,
  113. timestamp: int,
  114. time_info: typing.Optional[typing.Tuple[int, int]],
  115. ) -> bytes:
  116. if time_info is not None:
  117. ttl, current_time = time_info
  118. if timestamp + ttl < current_time:
  119. raise InvalidToken
  120. if current_time + _MAX_CLOCK_SKEW < timestamp:
  121. raise InvalidToken
  122. self._verify_signature(data)
  123. iv = data[9:25]
  124. ciphertext = data[25:-32]
  125. decryptor = Cipher(
  126. algorithms.AES(self._encryption_key), modes.CBC(iv)
  127. ).decryptor()
  128. plaintext_padded = decryptor.update(ciphertext)
  129. try:
  130. plaintext_padded += decryptor.finalize()
  131. except ValueError:
  132. raise InvalidToken
  133. unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
  134. unpadded = unpadder.update(plaintext_padded)
  135. try:
  136. unpadded += unpadder.finalize()
  137. except ValueError:
  138. raise InvalidToken
  139. return unpadded
  140. class MultiFernet:
  141. def __init__(self, fernets: typing.Iterable[Fernet]):
  142. fernets = list(fernets)
  143. if not fernets:
  144. raise ValueError(
  145. "MultiFernet requires at least one Fernet instance"
  146. )
  147. self._fernets = fernets
  148. def encrypt(self, msg: bytes) -> bytes:
  149. return self.encrypt_at_time(msg, int(time.time()))
  150. def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
  151. return self._fernets[0].encrypt_at_time(msg, current_time)
  152. def rotate(self, msg: typing.Union[bytes, str]) -> bytes:
  153. timestamp, data = Fernet._get_unverified_token_data(msg)
  154. for f in self._fernets:
  155. try:
  156. p = f._decrypt_data(data, timestamp, None)
  157. break
  158. except InvalidToken:
  159. pass
  160. else:
  161. raise InvalidToken
  162. iv = os.urandom(16)
  163. return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)
  164. def decrypt(
  165. self, msg: typing.Union[bytes, str], ttl: typing.Optional[int] = None
  166. ) -> bytes:
  167. for f in self._fernets:
  168. try:
  169. return f.decrypt(msg, ttl)
  170. except InvalidToken:
  171. pass
  172. raise InvalidToken
  173. def decrypt_at_time(
  174. self, msg: typing.Union[bytes, str], ttl: int, current_time: int
  175. ) -> bytes:
  176. for f in self._fernets:
  177. try:
  178. return f.decrypt_at_time(msg, ttl, current_time)
  179. except InvalidToken:
  180. pass
  181. raise InvalidToken