dh.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import typing
  5. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  6. from cryptography.hazmat.primitives import serialization
  7. from cryptography.hazmat.primitives.asymmetric import dh
  8. if typing.TYPE_CHECKING:
  9. from cryptography.hazmat.backends.openssl.backend import Backend
  10. def _dh_params_dup(dh_cdata, backend: "Backend"):
  11. lib = backend._lib
  12. ffi = backend._ffi
  13. param_cdata = lib.DHparams_dup(dh_cdata)
  14. backend.openssl_assert(param_cdata != ffi.NULL)
  15. param_cdata = ffi.gc(param_cdata, lib.DH_free)
  16. if lib.CRYPTOGRAPHY_IS_LIBRESSL:
  17. # In libressl DHparams_dup don't copy q
  18. q = ffi.new("BIGNUM **")
  19. lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
  20. q_dup = lib.BN_dup(q[0])
  21. res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
  22. backend.openssl_assert(res == 1)
  23. return param_cdata
  24. def _dh_cdata_to_parameters(dh_cdata, backend: "Backend") -> "_DHParameters":
  25. param_cdata = _dh_params_dup(dh_cdata, backend)
  26. return _DHParameters(backend, param_cdata)
  27. class _DHParameters(dh.DHParameters):
  28. def __init__(self, backend: "Backend", dh_cdata):
  29. self._backend = backend
  30. self._dh_cdata = dh_cdata
  31. def parameter_numbers(self) -> dh.DHParameterNumbers:
  32. p = self._backend._ffi.new("BIGNUM **")
  33. g = self._backend._ffi.new("BIGNUM **")
  34. q = self._backend._ffi.new("BIGNUM **")
  35. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  36. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  37. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  38. q_val: typing.Optional[int]
  39. if q[0] == self._backend._ffi.NULL:
  40. q_val = None
  41. else:
  42. q_val = self._backend._bn_to_int(q[0])
  43. return dh.DHParameterNumbers(
  44. p=self._backend._bn_to_int(p[0]),
  45. g=self._backend._bn_to_int(g[0]),
  46. q=q_val,
  47. )
  48. def generate_private_key(self) -> dh.DHPrivateKey:
  49. return self._backend.generate_dh_private_key(self)
  50. def parameter_bytes(
  51. self,
  52. encoding: serialization.Encoding,
  53. format: serialization.ParameterFormat,
  54. ) -> bytes:
  55. if encoding is serialization.Encoding.OpenSSH:
  56. raise TypeError("OpenSSH encoding is not supported")
  57. if format is not serialization.ParameterFormat.PKCS3:
  58. raise ValueError("Only PKCS3 serialization is supported")
  59. q = self._backend._ffi.new("BIGNUM **")
  60. self._backend._lib.DH_get0_pqg(
  61. self._dh_cdata, self._backend._ffi.NULL, q, self._backend._ffi.NULL
  62. )
  63. if (
  64. q[0] != self._backend._ffi.NULL
  65. and not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX
  66. ):
  67. raise UnsupportedAlgorithm(
  68. "DH X9.42 serialization is not supported",
  69. _Reasons.UNSUPPORTED_SERIALIZATION,
  70. )
  71. if encoding is serialization.Encoding.PEM:
  72. if q[0] != self._backend._ffi.NULL:
  73. write_bio = self._backend._lib.PEM_write_bio_DHxparams
  74. else:
  75. write_bio = self._backend._lib.PEM_write_bio_DHparams
  76. elif encoding is serialization.Encoding.DER:
  77. if q[0] != self._backend._ffi.NULL:
  78. write_bio = self._backend._lib.Cryptography_i2d_DHxparams_bio
  79. else:
  80. write_bio = self._backend._lib.i2d_DHparams_bio
  81. else:
  82. raise TypeError("encoding must be an item from the Encoding enum")
  83. bio = self._backend._create_mem_bio_gc()
  84. res = write_bio(bio, self._dh_cdata)
  85. self._backend.openssl_assert(res == 1)
  86. return self._backend._read_mem_bio(bio)
  87. def _get_dh_num_bits(backend, dh_cdata) -> int:
  88. p = backend._ffi.new("BIGNUM **")
  89. backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
  90. backend.openssl_assert(p[0] != backend._ffi.NULL)
  91. return backend._lib.BN_num_bits(p[0])
  92. class _DHPrivateKey(dh.DHPrivateKey):
  93. def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
  94. self._backend = backend
  95. self._dh_cdata = dh_cdata
  96. self._evp_pkey = evp_pkey
  97. self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)
  98. @property
  99. def key_size(self) -> int:
  100. return _get_dh_num_bits(self._backend, self._dh_cdata)
  101. def private_numbers(self) -> dh.DHPrivateNumbers:
  102. p = self._backend._ffi.new("BIGNUM **")
  103. g = self._backend._ffi.new("BIGNUM **")
  104. q = self._backend._ffi.new("BIGNUM **")
  105. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  106. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  107. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  108. if q[0] == self._backend._ffi.NULL:
  109. q_val = None
  110. else:
  111. q_val = self._backend._bn_to_int(q[0])
  112. pub_key = self._backend._ffi.new("BIGNUM **")
  113. priv_key = self._backend._ffi.new("BIGNUM **")
  114. self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
  115. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  116. self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
  117. return dh.DHPrivateNumbers(
  118. public_numbers=dh.DHPublicNumbers(
  119. parameter_numbers=dh.DHParameterNumbers(
  120. p=self._backend._bn_to_int(p[0]),
  121. g=self._backend._bn_to_int(g[0]),
  122. q=q_val,
  123. ),
  124. y=self._backend._bn_to_int(pub_key[0]),
  125. ),
  126. x=self._backend._bn_to_int(priv_key[0]),
  127. )
  128. def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
  129. if not isinstance(peer_public_key, _DHPublicKey):
  130. raise TypeError("peer_public_key must be a DHPublicKey")
  131. ctx = self._backend._lib.EVP_PKEY_CTX_new(
  132. self._evp_pkey, self._backend._ffi.NULL
  133. )
  134. self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
  135. ctx = self._backend._ffi.gc(ctx, self._backend._lib.EVP_PKEY_CTX_free)
  136. res = self._backend._lib.EVP_PKEY_derive_init(ctx)
  137. self._backend.openssl_assert(res == 1)
  138. res = self._backend._lib.EVP_PKEY_derive_set_peer(
  139. ctx, peer_public_key._evp_pkey
  140. )
  141. # Invalid kex errors here in OpenSSL 3.0 because checks were moved
  142. # to EVP_PKEY_derive_set_peer
  143. self._exchange_assert(res == 1)
  144. keylen = self._backend._ffi.new("size_t *")
  145. res = self._backend._lib.EVP_PKEY_derive(
  146. ctx, self._backend._ffi.NULL, keylen
  147. )
  148. # Invalid kex errors here in OpenSSL < 3
  149. self._exchange_assert(res == 1)
  150. self._backend.openssl_assert(keylen[0] > 0)
  151. buf = self._backend._ffi.new("unsigned char[]", keylen[0])
  152. res = self._backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
  153. self._backend.openssl_assert(res == 1)
  154. key = self._backend._ffi.buffer(buf, keylen[0])[:]
  155. pad = self._key_size_bytes - len(key)
  156. if pad > 0:
  157. key = (b"\x00" * pad) + key
  158. return key
  159. def _exchange_assert(self, ok: bool) -> None:
  160. if not ok:
  161. errors_with_text = self._backend._consume_errors_with_text()
  162. raise ValueError(
  163. "Error computing shared key.",
  164. errors_with_text,
  165. )
  166. def public_key(self) -> dh.DHPublicKey:
  167. dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
  168. pub_key = self._backend._ffi.new("BIGNUM **")
  169. self._backend._lib.DH_get0_key(
  170. self._dh_cdata, pub_key, self._backend._ffi.NULL
  171. )
  172. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  173. pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
  174. self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)
  175. res = self._backend._lib.DH_set0_key(
  176. dh_cdata, pub_key_dup, self._backend._ffi.NULL
  177. )
  178. self._backend.openssl_assert(res == 1)
  179. evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
  180. return _DHPublicKey(self._backend, dh_cdata, evp_pkey)
  181. def parameters(self) -> dh.DHParameters:
  182. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  183. def private_bytes(
  184. self,
  185. encoding: serialization.Encoding,
  186. format: serialization.PrivateFormat,
  187. encryption_algorithm: serialization.KeySerializationEncryption,
  188. ) -> bytes:
  189. if format is not serialization.PrivateFormat.PKCS8:
  190. raise ValueError(
  191. "DH private keys support only PKCS8 serialization"
  192. )
  193. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  194. q = self._backend._ffi.new("BIGNUM **")
  195. self._backend._lib.DH_get0_pqg(
  196. self._dh_cdata,
  197. self._backend._ffi.NULL,
  198. q,
  199. self._backend._ffi.NULL,
  200. )
  201. if q[0] != self._backend._ffi.NULL:
  202. raise UnsupportedAlgorithm(
  203. "DH X9.42 serialization is not supported",
  204. _Reasons.UNSUPPORTED_SERIALIZATION,
  205. )
  206. return self._backend._private_key_bytes(
  207. encoding,
  208. format,
  209. encryption_algorithm,
  210. self,
  211. self._evp_pkey,
  212. self._dh_cdata,
  213. )
  214. class _DHPublicKey(dh.DHPublicKey):
  215. def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
  216. self._backend = backend
  217. self._dh_cdata = dh_cdata
  218. self._evp_pkey = evp_pkey
  219. self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)
  220. @property
  221. def key_size(self) -> int:
  222. return self._key_size_bits
  223. def public_numbers(self) -> dh.DHPublicNumbers:
  224. p = self._backend._ffi.new("BIGNUM **")
  225. g = self._backend._ffi.new("BIGNUM **")
  226. q = self._backend._ffi.new("BIGNUM **")
  227. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  228. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  229. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  230. if q[0] == self._backend._ffi.NULL:
  231. q_val = None
  232. else:
  233. q_val = self._backend._bn_to_int(q[0])
  234. pub_key = self._backend._ffi.new("BIGNUM **")
  235. self._backend._lib.DH_get0_key(
  236. self._dh_cdata, pub_key, self._backend._ffi.NULL
  237. )
  238. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  239. return dh.DHPublicNumbers(
  240. parameter_numbers=dh.DHParameterNumbers(
  241. p=self._backend._bn_to_int(p[0]),
  242. g=self._backend._bn_to_int(g[0]),
  243. q=q_val,
  244. ),
  245. y=self._backend._bn_to_int(pub_key[0]),
  246. )
  247. def parameters(self) -> dh.DHParameters:
  248. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  249. def public_bytes(
  250. self,
  251. encoding: serialization.Encoding,
  252. format: serialization.PublicFormat,
  253. ) -> bytes:
  254. if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
  255. raise ValueError(
  256. "DH public keys support only "
  257. "SubjectPublicKeyInfo serialization"
  258. )
  259. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  260. q = self._backend._ffi.new("BIGNUM **")
  261. self._backend._lib.DH_get0_pqg(
  262. self._dh_cdata,
  263. self._backend._ffi.NULL,
  264. q,
  265. self._backend._ffi.NULL,
  266. )
  267. if q[0] != self._backend._ffi.NULL:
  268. raise UnsupportedAlgorithm(
  269. "DH X9.42 serialization is not supported",
  270. _Reasons.UNSUPPORTED_SERIALIZATION,
  271. )
  272. return self._backend._public_key_bytes(
  273. encoding, format, self, self._evp_pkey, None
  274. )