hashes.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import typing
  5. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  6. from cryptography.hazmat.primitives import hashes
  7. if typing.TYPE_CHECKING:
  8. from cryptography.hazmat.backends.openssl.backend import Backend
  9. class _HashContext(hashes.HashContext):
  10. def __init__(
  11. self, backend: "Backend", algorithm: hashes.HashAlgorithm, ctx=None
  12. ) -> None:
  13. self._algorithm = algorithm
  14. self._backend = backend
  15. if ctx is None:
  16. ctx = self._backend._lib.EVP_MD_CTX_new()
  17. ctx = self._backend._ffi.gc(
  18. ctx, self._backend._lib.EVP_MD_CTX_free
  19. )
  20. evp_md = self._backend._evp_md_from_algorithm(algorithm)
  21. if evp_md == self._backend._ffi.NULL:
  22. raise UnsupportedAlgorithm(
  23. "{} is not a supported hash on this backend.".format(
  24. algorithm.name
  25. ),
  26. _Reasons.UNSUPPORTED_HASH,
  27. )
  28. res = self._backend._lib.EVP_DigestInit_ex(
  29. ctx, evp_md, self._backend._ffi.NULL
  30. )
  31. self._backend.openssl_assert(res != 0)
  32. self._ctx = ctx
  33. @property
  34. def algorithm(self) -> hashes.HashAlgorithm:
  35. return self._algorithm
  36. def copy(self) -> "_HashContext":
  37. copied_ctx = self._backend._lib.EVP_MD_CTX_new()
  38. copied_ctx = self._backend._ffi.gc(
  39. copied_ctx, self._backend._lib.EVP_MD_CTX_free
  40. )
  41. res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
  42. self._backend.openssl_assert(res != 0)
  43. return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
  44. def update(self, data: bytes) -> None:
  45. data_ptr = self._backend._ffi.from_buffer(data)
  46. res = self._backend._lib.EVP_DigestUpdate(
  47. self._ctx, data_ptr, len(data)
  48. )
  49. self._backend.openssl_assert(res != 0)
  50. def finalize(self) -> bytes:
  51. if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
  52. # extendable output functions use a different finalize
  53. return self._finalize_xof()
  54. else:
  55. buf = self._backend._ffi.new(
  56. "unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
  57. )
  58. outlen = self._backend._ffi.new("unsigned int *")
  59. res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
  60. self._backend.openssl_assert(res != 0)
  61. self._backend.openssl_assert(
  62. outlen[0] == self.algorithm.digest_size
  63. )
  64. return self._backend._ffi.buffer(buf)[: outlen[0]]
  65. def _finalize_xof(self) -> bytes:
  66. buf = self._backend._ffi.new(
  67. "unsigned char[]", self.algorithm.digest_size
  68. )
  69. res = self._backend._lib.EVP_DigestFinalXOF(
  70. self._ctx, buf, self.algorithm.digest_size
  71. )
  72. self._backend.openssl_assert(res != 0)
  73. return self._backend._ffi.buffer(buf)[: self.algorithm.digest_size]