cmac.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import typing
  5. from cryptography.exceptions import (
  6. InvalidSignature,
  7. UnsupportedAlgorithm,
  8. _Reasons,
  9. )
  10. from cryptography.hazmat.primitives import constant_time
  11. from cryptography.hazmat.primitives.ciphers.modes import CBC
  12. if typing.TYPE_CHECKING:
  13. from cryptography.hazmat.primitives import ciphers
  14. from cryptography.hazmat.backends.openssl.backend import Backend
  15. class _CMACContext:
  16. def __init__(
  17. self,
  18. backend: "Backend",
  19. algorithm: "ciphers.BlockCipherAlgorithm",
  20. ctx=None,
  21. ) -> None:
  22. if not backend.cmac_algorithm_supported(algorithm):
  23. raise UnsupportedAlgorithm(
  24. "This backend does not support CMAC.",
  25. _Reasons.UNSUPPORTED_CIPHER,
  26. )
  27. self._backend = backend
  28. self._key = algorithm.key
  29. self._algorithm = algorithm
  30. self._output_length = algorithm.block_size // 8
  31. if ctx is None:
  32. registry = self._backend._cipher_registry
  33. adapter = registry[type(algorithm), CBC]
  34. evp_cipher = adapter(self._backend, algorithm, CBC)
  35. ctx = self._backend._lib.CMAC_CTX_new()
  36. self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
  37. ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free)
  38. key_ptr = self._backend._ffi.from_buffer(self._key)
  39. res = self._backend._lib.CMAC_Init(
  40. ctx,
  41. key_ptr,
  42. len(self._key),
  43. evp_cipher,
  44. self._backend._ffi.NULL,
  45. )
  46. self._backend.openssl_assert(res == 1)
  47. self._ctx = ctx
  48. def update(self, data: bytes) -> None:
  49. res = self._backend._lib.CMAC_Update(self._ctx, data, len(data))
  50. self._backend.openssl_assert(res == 1)
  51. def finalize(self) -> bytes:
  52. buf = self._backend._ffi.new("unsigned char[]", self._output_length)
  53. length = self._backend._ffi.new("size_t *", self._output_length)
  54. res = self._backend._lib.CMAC_Final(self._ctx, buf, length)
  55. self._backend.openssl_assert(res == 1)
  56. self._ctx = None
  57. return self._backend._ffi.buffer(buf)[:]
  58. def copy(self) -> "_CMACContext":
  59. copied_ctx = self._backend._lib.CMAC_CTX_new()
  60. copied_ctx = self._backend._ffi.gc(
  61. copied_ctx, self._backend._lib.CMAC_CTX_free
  62. )
  63. res = self._backend._lib.CMAC_CTX_copy(copied_ctx, self._ctx)
  64. self._backend.openssl_assert(res == 1)
  65. return _CMACContext(self._backend, self._algorithm, ctx=copied_ctx)
  66. def verify(self, signature: bytes) -> None:
  67. digest = self.finalize()
  68. if not constant_time.bytes_eq(digest, signature):
  69. raise InvalidSignature("Signature did not match digest.")