base.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import abc
  5. import typing
  6. from cryptography.exceptions import (
  7. AlreadyFinalized,
  8. AlreadyUpdated,
  9. NotYetFinalized,
  10. )
  11. from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
  12. from cryptography.hazmat.primitives.ciphers import modes
  13. if typing.TYPE_CHECKING:
  14. from cryptography.hazmat.backends.openssl.ciphers import (
  15. _CipherContext as _BackendCipherContext,
  16. )
  17. class CipherContext(metaclass=abc.ABCMeta):
  18. @abc.abstractmethod
  19. def update(self, data: bytes) -> bytes:
  20. """
  21. Processes the provided bytes through the cipher and returns the results
  22. as bytes.
  23. """
  24. @abc.abstractmethod
  25. def update_into(self, data: bytes, buf: bytes) -> int:
  26. """
  27. Processes the provided bytes and writes the resulting data into the
  28. provided buffer. Returns the number of bytes written.
  29. """
  30. @abc.abstractmethod
  31. def finalize(self) -> bytes:
  32. """
  33. Returns the results of processing the final block as bytes.
  34. """
  35. class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
  36. @abc.abstractmethod
  37. def authenticate_additional_data(self, data: bytes) -> None:
  38. """
  39. Authenticates the provided bytes.
  40. """
  41. class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
  42. @abc.abstractmethod
  43. def finalize_with_tag(self, tag: bytes) -> bytes:
  44. """
  45. Returns the results of processing the final block as bytes and allows
  46. delayed passing of the authentication tag.
  47. """
  48. class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
  49. @abc.abstractproperty
  50. def tag(self) -> bytes:
  51. """
  52. Returns tag bytes. This is only available after encryption is
  53. finalized.
  54. """
  55. Mode = typing.TypeVar(
  56. "Mode", bound=typing.Optional[modes.Mode], covariant=True
  57. )
  58. class Cipher(typing.Generic[Mode]):
  59. def __init__(
  60. self,
  61. algorithm: CipherAlgorithm,
  62. mode: Mode,
  63. backend: typing.Any = None,
  64. ):
  65. if not isinstance(algorithm, CipherAlgorithm):
  66. raise TypeError("Expected interface of CipherAlgorithm.")
  67. if mode is not None:
  68. # mypy needs this assert to narrow the type from our generic
  69. # type. Maybe it won't some time in the future.
  70. assert isinstance(mode, modes.Mode)
  71. mode.validate_for_algorithm(algorithm)
  72. self.algorithm = algorithm
  73. self.mode = mode
  74. @typing.overload
  75. def encryptor(
  76. self: "Cipher[modes.ModeWithAuthenticationTag]",
  77. ) -> AEADEncryptionContext:
  78. ...
  79. @typing.overload
  80. def encryptor(
  81. self: "_CIPHER_TYPE",
  82. ) -> CipherContext:
  83. ...
  84. def encryptor(self):
  85. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  86. if self.mode.tag is not None:
  87. raise ValueError(
  88. "Authentication tag must be None when encrypting."
  89. )
  90. from cryptography.hazmat.backends.openssl.backend import backend
  91. ctx = backend.create_symmetric_encryption_ctx(
  92. self.algorithm, self.mode
  93. )
  94. return self._wrap_ctx(ctx, encrypt=True)
  95. @typing.overload
  96. def decryptor(
  97. self: "Cipher[modes.ModeWithAuthenticationTag]",
  98. ) -> AEADDecryptionContext:
  99. ...
  100. @typing.overload
  101. def decryptor(
  102. self: "_CIPHER_TYPE",
  103. ) -> CipherContext:
  104. ...
  105. def decryptor(self):
  106. from cryptography.hazmat.backends.openssl.backend import backend
  107. ctx = backend.create_symmetric_decryption_ctx(
  108. self.algorithm, self.mode
  109. )
  110. return self._wrap_ctx(ctx, encrypt=False)
  111. def _wrap_ctx(
  112. self, ctx: "_BackendCipherContext", encrypt: bool
  113. ) -> typing.Union[
  114. AEADEncryptionContext, AEADDecryptionContext, CipherContext
  115. ]:
  116. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  117. if encrypt:
  118. return _AEADEncryptionContext(ctx)
  119. else:
  120. return _AEADDecryptionContext(ctx)
  121. else:
  122. return _CipherContext(ctx)
  123. _CIPHER_TYPE = Cipher[
  124. typing.Union[
  125. modes.ModeWithNonce,
  126. modes.ModeWithTweak,
  127. None,
  128. modes.ECB,
  129. modes.ModeWithInitializationVector,
  130. ]
  131. ]
  132. class _CipherContext(CipherContext):
  133. _ctx: typing.Optional["_BackendCipherContext"]
  134. def __init__(self, ctx: "_BackendCipherContext") -> None:
  135. self._ctx = ctx
  136. def update(self, data: bytes) -> bytes:
  137. if self._ctx is None:
  138. raise AlreadyFinalized("Context was already finalized.")
  139. return self._ctx.update(data)
  140. def update_into(self, data: bytes, buf: bytes) -> int:
  141. if self._ctx is None:
  142. raise AlreadyFinalized("Context was already finalized.")
  143. return self._ctx.update_into(data, buf)
  144. def finalize(self) -> bytes:
  145. if self._ctx is None:
  146. raise AlreadyFinalized("Context was already finalized.")
  147. data = self._ctx.finalize()
  148. self._ctx = None
  149. return data
  150. class _AEADCipherContext(AEADCipherContext):
  151. _ctx: typing.Optional["_BackendCipherContext"]
  152. _tag: typing.Optional[bytes]
  153. def __init__(self, ctx: "_BackendCipherContext") -> None:
  154. self._ctx = ctx
  155. self._bytes_processed = 0
  156. self._aad_bytes_processed = 0
  157. self._tag = None
  158. self._updated = False
  159. def _check_limit(self, data_size: int) -> None:
  160. if self._ctx is None:
  161. raise AlreadyFinalized("Context was already finalized.")
  162. self._updated = True
  163. self._bytes_processed += data_size
  164. if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES:
  165. raise ValueError(
  166. "{} has a maximum encrypted byte limit of {}".format(
  167. self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES
  168. )
  169. )
  170. def update(self, data: bytes) -> bytes:
  171. self._check_limit(len(data))
  172. # mypy needs this assert even though _check_limit already checked
  173. assert self._ctx is not None
  174. return self._ctx.update(data)
  175. def update_into(self, data: bytes, buf: bytes) -> int:
  176. self._check_limit(len(data))
  177. # mypy needs this assert even though _check_limit already checked
  178. assert self._ctx is not None
  179. return self._ctx.update_into(data, buf)
  180. def finalize(self) -> bytes:
  181. if self._ctx is None:
  182. raise AlreadyFinalized("Context was already finalized.")
  183. data = self._ctx.finalize()
  184. self._tag = self._ctx.tag
  185. self._ctx = None
  186. return data
  187. def authenticate_additional_data(self, data: bytes) -> None:
  188. if self._ctx is None:
  189. raise AlreadyFinalized("Context was already finalized.")
  190. if self._updated:
  191. raise AlreadyUpdated("Update has been called on this context.")
  192. self._aad_bytes_processed += len(data)
  193. if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES:
  194. raise ValueError(
  195. "{} has a maximum AAD byte limit of {}".format(
  196. self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES
  197. )
  198. )
  199. self._ctx.authenticate_additional_data(data)
  200. class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
  201. def finalize_with_tag(self, tag: bytes) -> bytes:
  202. if self._ctx is None:
  203. raise AlreadyFinalized("Context was already finalized.")
  204. data = self._ctx.finalize_with_tag(tag)
  205. self._tag = self._ctx.tag
  206. self._ctx = None
  207. return data
  208. class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
  209. @property
  210. def tag(self) -> bytes:
  211. if self._ctx is not None:
  212. raise NotYetFinalized(
  213. "You must finalize encryption before " "getting the tag."
  214. )
  215. assert self._tag is not None
  216. return self._tag