backend.py 94 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import collections
  5. import contextlib
  6. import itertools
  7. import typing
  8. import warnings
  9. from contextlib import contextmanager
  10. from cryptography import utils, x509
  11. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  12. from cryptography.hazmat.backends.openssl import aead
  13. from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
  14. from cryptography.hazmat.backends.openssl.cmac import _CMACContext
  15. from cryptography.hazmat.backends.openssl.dh import (
  16. _DHParameters,
  17. _DHPrivateKey,
  18. _DHPublicKey,
  19. _dh_params_dup,
  20. )
  21. from cryptography.hazmat.backends.openssl.dsa import (
  22. _DSAParameters,
  23. _DSAPrivateKey,
  24. _DSAPublicKey,
  25. )
  26. from cryptography.hazmat.backends.openssl.ec import (
  27. _EllipticCurvePrivateKey,
  28. _EllipticCurvePublicKey,
  29. )
  30. from cryptography.hazmat.backends.openssl.ed25519 import (
  31. _Ed25519PrivateKey,
  32. _Ed25519PublicKey,
  33. )
  34. from cryptography.hazmat.backends.openssl.ed448 import (
  35. _ED448_KEY_SIZE,
  36. _Ed448PrivateKey,
  37. _Ed448PublicKey,
  38. )
  39. from cryptography.hazmat.backends.openssl.hashes import _HashContext
  40. from cryptography.hazmat.backends.openssl.hmac import _HMACContext
  41. from cryptography.hazmat.backends.openssl.poly1305 import (
  42. _POLY1305_KEY_SIZE,
  43. _Poly1305Context,
  44. )
  45. from cryptography.hazmat.backends.openssl.rsa import (
  46. _RSAPrivateKey,
  47. _RSAPublicKey,
  48. )
  49. from cryptography.hazmat.backends.openssl.x25519 import (
  50. _X25519PrivateKey,
  51. _X25519PublicKey,
  52. )
  53. from cryptography.hazmat.backends.openssl.x448 import (
  54. _X448PrivateKey,
  55. _X448PublicKey,
  56. )
  57. from cryptography.hazmat.bindings._rust import (
  58. x509 as rust_x509,
  59. )
  60. from cryptography.hazmat.bindings.openssl import binding
  61. from cryptography.hazmat.primitives import hashes, serialization
  62. from cryptography.hazmat.primitives._asymmetric import AsymmetricPadding
  63. from cryptography.hazmat.primitives.asymmetric import (
  64. dh,
  65. dsa,
  66. ec,
  67. ed25519,
  68. ed448,
  69. rsa,
  70. x25519,
  71. x448,
  72. )
  73. from cryptography.hazmat.primitives.asymmetric.padding import (
  74. MGF1,
  75. OAEP,
  76. PKCS1v15,
  77. PSS,
  78. )
  79. from cryptography.hazmat.primitives.asymmetric.types import (
  80. CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
  81. PRIVATE_KEY_TYPES,
  82. PUBLIC_KEY_TYPES,
  83. )
  84. from cryptography.hazmat.primitives.ciphers import (
  85. BlockCipherAlgorithm,
  86. CipherAlgorithm,
  87. )
  88. from cryptography.hazmat.primitives.ciphers.algorithms import (
  89. AES,
  90. ARC4,
  91. Camellia,
  92. ChaCha20,
  93. SM4,
  94. TripleDES,
  95. _BlowfishInternal,
  96. _CAST5Internal,
  97. _IDEAInternal,
  98. _SEEDInternal,
  99. )
  100. from cryptography.hazmat.primitives.ciphers.modes import (
  101. CBC,
  102. CFB,
  103. CFB8,
  104. CTR,
  105. ECB,
  106. GCM,
  107. Mode,
  108. OFB,
  109. XTS,
  110. )
  111. from cryptography.hazmat.primitives.kdf import scrypt
  112. from cryptography.hazmat.primitives.serialization import pkcs7, ssh
  113. from cryptography.hazmat.primitives.serialization.pkcs12 import (
  114. PKCS12Certificate,
  115. PKCS12KeyAndCertificates,
  116. _ALLOWED_PKCS12_TYPES,
  117. _PKCS12_CAS_TYPES,
  118. )
  119. _MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
  120. # Not actually supported, just used as a marker for some serialization tests.
  121. class _RC2:
  122. pass
  123. class Backend:
  124. """
  125. OpenSSL API binding interfaces.
  126. """
  127. name = "openssl"
  128. # FIPS has opinions about acceptable algorithms and key sizes, but the
  129. # disallowed algorithms are still present in OpenSSL. They just error if
  130. # you try to use them. To avoid that we allowlist the algorithms in
  131. # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
  132. _fips_aead = {
  133. b"aes-128-ccm",
  134. b"aes-192-ccm",
  135. b"aes-256-ccm",
  136. b"aes-128-gcm",
  137. b"aes-192-gcm",
  138. b"aes-256-gcm",
  139. }
  140. # TripleDES encryption is disallowed/deprecated throughout 2023 in
  141. # FIPS 140-3. To keep it simple we denylist any use of TripleDES (TDEA).
  142. _fips_ciphers = (AES,)
  143. # Sometimes SHA1 is still permissible. That logic is contained
  144. # within the various *_supported methods.
  145. _fips_hashes = (
  146. hashes.SHA224,
  147. hashes.SHA256,
  148. hashes.SHA384,
  149. hashes.SHA512,
  150. hashes.SHA512_224,
  151. hashes.SHA512_256,
  152. hashes.SHA3_224,
  153. hashes.SHA3_256,
  154. hashes.SHA3_384,
  155. hashes.SHA3_512,
  156. hashes.SHAKE128,
  157. hashes.SHAKE256,
  158. )
  159. _fips_ecdh_curves = (
  160. ec.SECP224R1,
  161. ec.SECP256R1,
  162. ec.SECP384R1,
  163. ec.SECP521R1,
  164. )
  165. _fips_rsa_min_key_size = 2048
  166. _fips_rsa_min_public_exponent = 65537
  167. _fips_dsa_min_modulus = 1 << 2048
  168. _fips_dh_min_key_size = 2048
  169. _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
  170. def __init__(self):
  171. self._binding = binding.Binding()
  172. self._ffi = self._binding.ffi
  173. self._lib = self._binding.lib
  174. self._rsa_skip_check_key = False
  175. self._fips_enabled = self._is_fips_enabled()
  176. self._cipher_registry = {}
  177. self._register_default_ciphers()
  178. if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
  179. warnings.warn(
  180. "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
  181. UserWarning,
  182. )
  183. else:
  184. self.activate_osrandom_engine()
  185. self._dh_types = [self._lib.EVP_PKEY_DH]
  186. if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
  187. self._dh_types.append(self._lib.EVP_PKEY_DHX)
  188. def __repr__(self) -> str:
  189. return "<OpenSSLBackend(version: {}, FIPS: {})>".format(
  190. self.openssl_version_text(), self._fips_enabled
  191. )
  192. def openssl_assert(
  193. self,
  194. ok: bool,
  195. errors: typing.Optional[typing.List[binding._OpenSSLError]] = None,
  196. ) -> None:
  197. return binding._openssl_assert(self._lib, ok, errors=errors)
  198. def _is_fips_enabled(self) -> bool:
  199. if self._lib.Cryptography_HAS_300_FIPS:
  200. mode = self._lib.EVP_default_properties_is_fips_enabled(
  201. self._ffi.NULL
  202. )
  203. else:
  204. mode = getattr(self._lib, "FIPS_mode", lambda: 0)()
  205. if mode == 0:
  206. # OpenSSL without FIPS pushes an error on the error stack
  207. self._lib.ERR_clear_error()
  208. return bool(mode)
  209. def _enable_fips(self) -> None:
  210. # This function enables FIPS mode for OpenSSL 3.0.0 on installs that
  211. # have the FIPS provider installed properly.
  212. self._binding._enable_fips()
  213. assert self._is_fips_enabled()
  214. self._fips_enabled = self._is_fips_enabled()
  215. def activate_builtin_random(self) -> None:
  216. if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
  217. # Obtain a new structural reference.
  218. e = self._lib.ENGINE_get_default_RAND()
  219. if e != self._ffi.NULL:
  220. self._lib.ENGINE_unregister_RAND(e)
  221. # Reset the RNG to use the built-in.
  222. res = self._lib.RAND_set_rand_method(self._ffi.NULL)
  223. self.openssl_assert(res == 1)
  224. # decrement the structural reference from get_default_RAND
  225. res = self._lib.ENGINE_finish(e)
  226. self.openssl_assert(res == 1)
  227. @contextlib.contextmanager
  228. def _get_osurandom_engine(self):
  229. # Fetches an engine by id and returns it. This creates a structural
  230. # reference.
  231. e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
  232. self.openssl_assert(e != self._ffi.NULL)
  233. # Initialize the engine for use. This adds a functional reference.
  234. res = self._lib.ENGINE_init(e)
  235. self.openssl_assert(res == 1)
  236. try:
  237. yield e
  238. finally:
  239. # Decrement the structural ref incremented by ENGINE_by_id.
  240. res = self._lib.ENGINE_free(e)
  241. self.openssl_assert(res == 1)
  242. # Decrement the functional ref incremented by ENGINE_init.
  243. res = self._lib.ENGINE_finish(e)
  244. self.openssl_assert(res == 1)
  245. def activate_osrandom_engine(self) -> None:
  246. if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
  247. # Unregister and free the current engine.
  248. self.activate_builtin_random()
  249. with self._get_osurandom_engine() as e:
  250. # Set the engine as the default RAND provider.
  251. res = self._lib.ENGINE_set_default_RAND(e)
  252. self.openssl_assert(res == 1)
  253. # Reset the RNG to use the engine
  254. res = self._lib.RAND_set_rand_method(self._ffi.NULL)
  255. self.openssl_assert(res == 1)
  256. def osrandom_engine_implementation(self) -> str:
  257. buf = self._ffi.new("char[]", 64)
  258. with self._get_osurandom_engine() as e:
  259. res = self._lib.ENGINE_ctrl_cmd(
  260. e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0
  261. )
  262. self.openssl_assert(res > 0)
  263. return self._ffi.string(buf).decode("ascii")
  264. def openssl_version_text(self) -> str:
  265. """
  266. Friendly string name of the loaded OpenSSL library. This is not
  267. necessarily the same version as it was compiled against.
  268. Example: OpenSSL 1.1.1d 10 Sep 2019
  269. """
  270. return self._ffi.string(
  271. self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
  272. ).decode("ascii")
  273. def openssl_version_number(self) -> int:
  274. return self._lib.OpenSSL_version_num()
  275. def create_hmac_ctx(
  276. self, key: bytes, algorithm: hashes.HashAlgorithm
  277. ) -> _HMACContext:
  278. return _HMACContext(self, key, algorithm)
  279. def _evp_md_from_algorithm(self, algorithm: hashes.HashAlgorithm):
  280. if algorithm.name == "blake2b" or algorithm.name == "blake2s":
  281. alg = "{}{}".format(
  282. algorithm.name, algorithm.digest_size * 8
  283. ).encode("ascii")
  284. else:
  285. alg = algorithm.name.encode("ascii")
  286. evp_md = self._lib.EVP_get_digestbyname(alg)
  287. return evp_md
  288. def _evp_md_non_null_from_algorithm(self, algorithm: hashes.HashAlgorithm):
  289. evp_md = self._evp_md_from_algorithm(algorithm)
  290. self.openssl_assert(evp_md != self._ffi.NULL)
  291. return evp_md
  292. def hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
  293. if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
  294. return False
  295. evp_md = self._evp_md_from_algorithm(algorithm)
  296. return evp_md != self._ffi.NULL
  297. def signature_hash_supported(
  298. self, algorithm: hashes.HashAlgorithm
  299. ) -> bool:
  300. # Dedicated check for hashing algorithm use in message digest for
  301. # signatures, e.g. RSA PKCS#1 v1.5 SHA1 (sha1WithRSAEncryption).
  302. if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
  303. return False
  304. return self.hash_supported(algorithm)
  305. def scrypt_supported(self) -> bool:
  306. if self._fips_enabled:
  307. return False
  308. else:
  309. return self._lib.Cryptography_HAS_SCRYPT == 1
  310. def hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
  311. # FIPS mode still allows SHA1 for HMAC
  312. if self._fips_enabled and isinstance(algorithm, hashes.SHA1):
  313. return True
  314. return self.hash_supported(algorithm)
  315. def create_hash_ctx(
  316. self, algorithm: hashes.HashAlgorithm
  317. ) -> hashes.HashContext:
  318. return _HashContext(self, algorithm)
  319. def cipher_supported(self, cipher: CipherAlgorithm, mode: Mode) -> bool:
  320. if self._fips_enabled:
  321. # FIPS mode requires AES. TripleDES is disallowed/deprecated in
  322. # FIPS 140-3.
  323. if not isinstance(cipher, self._fips_ciphers):
  324. return False
  325. try:
  326. adapter = self._cipher_registry[type(cipher), type(mode)]
  327. except KeyError:
  328. return False
  329. evp_cipher = adapter(self, cipher, mode)
  330. return self._ffi.NULL != evp_cipher
  331. def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
  332. if (cipher_cls, mode_cls) in self._cipher_registry:
  333. raise ValueError(
  334. "Duplicate registration for: {} {}.".format(
  335. cipher_cls, mode_cls
  336. )
  337. )
  338. self._cipher_registry[cipher_cls, mode_cls] = adapter
  339. def _register_default_ciphers(self) -> None:
  340. for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
  341. self.register_cipher_adapter(
  342. AES,
  343. mode_cls,
  344. GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
  345. )
  346. for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
  347. self.register_cipher_adapter(
  348. Camellia,
  349. mode_cls,
  350. GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
  351. )
  352. for mode_cls in [CBC, CFB, CFB8, OFB]:
  353. self.register_cipher_adapter(
  354. TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
  355. )
  356. self.register_cipher_adapter(
  357. TripleDES, ECB, GetCipherByName("des-ede3")
  358. )
  359. for mode_cls in [CBC, CFB, OFB, ECB]:
  360. self.register_cipher_adapter(
  361. _BlowfishInternal, mode_cls, GetCipherByName("bf-{mode.name}")
  362. )
  363. for mode_cls in [CBC, CFB, OFB, ECB]:
  364. self.register_cipher_adapter(
  365. _SEEDInternal, mode_cls, GetCipherByName("seed-{mode.name}")
  366. )
  367. for cipher_cls, mode_cls in itertools.product(
  368. [_CAST5Internal, _IDEAInternal],
  369. [CBC, OFB, CFB, ECB],
  370. ):
  371. self.register_cipher_adapter(
  372. cipher_cls,
  373. mode_cls,
  374. GetCipherByName("{cipher.name}-{mode.name}"),
  375. )
  376. self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4"))
  377. # We don't actually support RC2, this is just used by some tests.
  378. self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2"))
  379. self.register_cipher_adapter(
  380. ChaCha20, type(None), GetCipherByName("chacha20")
  381. )
  382. self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
  383. for mode_cls in [ECB, CBC, OFB, CFB, CTR]:
  384. self.register_cipher_adapter(
  385. SM4, mode_cls, GetCipherByName("sm4-{mode.name}")
  386. )
  387. def create_symmetric_encryption_ctx(
  388. self, cipher: CipherAlgorithm, mode: Mode
  389. ) -> _CipherContext:
  390. return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
  391. def create_symmetric_decryption_ctx(
  392. self, cipher: CipherAlgorithm, mode: Mode
  393. ) -> _CipherContext:
  394. return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
  395. def pbkdf2_hmac_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
  396. return self.hmac_supported(algorithm)
  397. def derive_pbkdf2_hmac(
  398. self,
  399. algorithm: hashes.HashAlgorithm,
  400. length: int,
  401. salt: bytes,
  402. iterations: int,
  403. key_material: bytes,
  404. ) -> bytes:
  405. buf = self._ffi.new("unsigned char[]", length)
  406. evp_md = self._evp_md_non_null_from_algorithm(algorithm)
  407. key_material_ptr = self._ffi.from_buffer(key_material)
  408. res = self._lib.PKCS5_PBKDF2_HMAC(
  409. key_material_ptr,
  410. len(key_material),
  411. salt,
  412. len(salt),
  413. iterations,
  414. evp_md,
  415. length,
  416. buf,
  417. )
  418. self.openssl_assert(res == 1)
  419. return self._ffi.buffer(buf)[:]
  420. def _consume_errors(self) -> typing.List[binding._OpenSSLError]:
  421. return binding._consume_errors(self._lib)
  422. def _consume_errors_with_text(
  423. self,
  424. ) -> typing.List[binding._OpenSSLErrorWithText]:
  425. return binding._consume_errors_with_text(self._lib)
  426. def _bn_to_int(self, bn) -> int:
  427. assert bn != self._ffi.NULL
  428. self.openssl_assert(not self._lib.BN_is_negative(bn))
  429. bn_num_bytes = self._lib.BN_num_bytes(bn)
  430. bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
  431. bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
  432. # A zero length means the BN has value 0
  433. self.openssl_assert(bin_len >= 0)
  434. val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
  435. return val
  436. def _int_to_bn(self, num: int, bn=None):
  437. """
  438. Converts a python integer to a BIGNUM. The returned BIGNUM will not
  439. be garbage collected (to support adding them to structs that take
  440. ownership of the object). Be sure to register it for GC if it will
  441. be discarded after use.
  442. """
  443. assert bn is None or bn != self._ffi.NULL
  444. if bn is None:
  445. bn = self._ffi.NULL
  446. binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
  447. bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
  448. self.openssl_assert(bn_ptr != self._ffi.NULL)
  449. return bn_ptr
  450. def generate_rsa_private_key(
  451. self, public_exponent: int, key_size: int
  452. ) -> rsa.RSAPrivateKey:
  453. rsa._verify_rsa_parameters(public_exponent, key_size)
  454. rsa_cdata = self._lib.RSA_new()
  455. self.openssl_assert(rsa_cdata != self._ffi.NULL)
  456. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  457. bn = self._int_to_bn(public_exponent)
  458. bn = self._ffi.gc(bn, self._lib.BN_free)
  459. res = self._lib.RSA_generate_key_ex(
  460. rsa_cdata, key_size, bn, self._ffi.NULL
  461. )
  462. self.openssl_assert(res == 1)
  463. evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
  464. return _RSAPrivateKey(
  465. self, rsa_cdata, evp_pkey, self._rsa_skip_check_key
  466. )
  467. def generate_rsa_parameters_supported(
  468. self, public_exponent: int, key_size: int
  469. ) -> bool:
  470. return (
  471. public_exponent >= 3
  472. and public_exponent & 1 != 0
  473. and key_size >= 512
  474. )
  475. def load_rsa_private_numbers(
  476. self, numbers: rsa.RSAPrivateNumbers
  477. ) -> rsa.RSAPrivateKey:
  478. rsa._check_private_key_components(
  479. numbers.p,
  480. numbers.q,
  481. numbers.d,
  482. numbers.dmp1,
  483. numbers.dmq1,
  484. numbers.iqmp,
  485. numbers.public_numbers.e,
  486. numbers.public_numbers.n,
  487. )
  488. rsa_cdata = self._lib.RSA_new()
  489. self.openssl_assert(rsa_cdata != self._ffi.NULL)
  490. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  491. p = self._int_to_bn(numbers.p)
  492. q = self._int_to_bn(numbers.q)
  493. d = self._int_to_bn(numbers.d)
  494. dmp1 = self._int_to_bn(numbers.dmp1)
  495. dmq1 = self._int_to_bn(numbers.dmq1)
  496. iqmp = self._int_to_bn(numbers.iqmp)
  497. e = self._int_to_bn(numbers.public_numbers.e)
  498. n = self._int_to_bn(numbers.public_numbers.n)
  499. res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
  500. self.openssl_assert(res == 1)
  501. res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
  502. self.openssl_assert(res == 1)
  503. res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
  504. self.openssl_assert(res == 1)
  505. evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
  506. return _RSAPrivateKey(
  507. self, rsa_cdata, evp_pkey, self._rsa_skip_check_key
  508. )
  509. def load_rsa_public_numbers(
  510. self, numbers: rsa.RSAPublicNumbers
  511. ) -> rsa.RSAPublicKey:
  512. rsa._check_public_key_components(numbers.e, numbers.n)
  513. rsa_cdata = self._lib.RSA_new()
  514. self.openssl_assert(rsa_cdata != self._ffi.NULL)
  515. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  516. e = self._int_to_bn(numbers.e)
  517. n = self._int_to_bn(numbers.n)
  518. res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
  519. self.openssl_assert(res == 1)
  520. evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
  521. return _RSAPublicKey(self, rsa_cdata, evp_pkey)
  522. def _create_evp_pkey_gc(self):
  523. evp_pkey = self._lib.EVP_PKEY_new()
  524. self.openssl_assert(evp_pkey != self._ffi.NULL)
  525. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  526. return evp_pkey
  527. def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
  528. evp_pkey = self._create_evp_pkey_gc()
  529. res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
  530. self.openssl_assert(res == 1)
  531. return evp_pkey
  532. def _bytes_to_bio(self, data: bytes):
  533. """
  534. Return a _MemoryBIO namedtuple of (BIO, char*).
  535. The char* is the storage for the BIO and it must stay alive until the
  536. BIO is finished with.
  537. """
  538. data_ptr = self._ffi.from_buffer(data)
  539. bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
  540. self.openssl_assert(bio != self._ffi.NULL)
  541. return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
  542. def _create_mem_bio_gc(self):
  543. """
  544. Creates an empty memory BIO.
  545. """
  546. bio_method = self._lib.BIO_s_mem()
  547. self.openssl_assert(bio_method != self._ffi.NULL)
  548. bio = self._lib.BIO_new(bio_method)
  549. self.openssl_assert(bio != self._ffi.NULL)
  550. bio = self._ffi.gc(bio, self._lib.BIO_free)
  551. return bio
  552. def _read_mem_bio(self, bio) -> bytes:
  553. """
  554. Reads a memory BIO. This only works on memory BIOs.
  555. """
  556. buf = self._ffi.new("char **")
  557. buf_len = self._lib.BIO_get_mem_data(bio, buf)
  558. self.openssl_assert(buf_len > 0)
  559. self.openssl_assert(buf[0] != self._ffi.NULL)
  560. bio_data = self._ffi.buffer(buf[0], buf_len)[:]
  561. return bio_data
  562. def _evp_pkey_to_private_key(self, evp_pkey) -> PRIVATE_KEY_TYPES:
  563. """
  564. Return the appropriate type of PrivateKey given an evp_pkey cdata
  565. pointer.
  566. """
  567. key_type = self._lib.EVP_PKEY_id(evp_pkey)
  568. if key_type == self._lib.EVP_PKEY_RSA:
  569. rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
  570. self.openssl_assert(rsa_cdata != self._ffi.NULL)
  571. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  572. return _RSAPrivateKey(
  573. self, rsa_cdata, evp_pkey, self._rsa_skip_check_key
  574. )
  575. elif (
  576. key_type == self._lib.EVP_PKEY_RSA_PSS
  577. and not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
  578. and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
  579. and not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111E
  580. ):
  581. # At the moment the way we handle RSA PSS keys is to strip the
  582. # PSS constraints from them and treat them as normal RSA keys
  583. # Unfortunately the RSA * itself tracks this data so we need to
  584. # extract, serialize, and reload it without the constraints.
  585. rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
  586. self.openssl_assert(rsa_cdata != self._ffi.NULL)
  587. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  588. bio = self._create_mem_bio_gc()
  589. res = self._lib.i2d_RSAPrivateKey_bio(bio, rsa_cdata)
  590. self.openssl_assert(res == 1)
  591. return self.load_der_private_key(
  592. self._read_mem_bio(bio), password=None
  593. )
  594. elif key_type == self._lib.EVP_PKEY_DSA:
  595. dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
  596. self.openssl_assert(dsa_cdata != self._ffi.NULL)
  597. dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
  598. return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
  599. elif key_type == self._lib.EVP_PKEY_EC:
  600. ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
  601. self.openssl_assert(ec_cdata != self._ffi.NULL)
  602. ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
  603. return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
  604. elif key_type in self._dh_types:
  605. dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
  606. self.openssl_assert(dh_cdata != self._ffi.NULL)
  607. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  608. return _DHPrivateKey(self, dh_cdata, evp_pkey)
  609. elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
  610. # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
  611. return _Ed25519PrivateKey(self, evp_pkey)
  612. elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
  613. # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
  614. return _X448PrivateKey(self, evp_pkey)
  615. elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
  616. # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
  617. return _X25519PrivateKey(self, evp_pkey)
  618. elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
  619. # EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1
  620. return _Ed448PrivateKey(self, evp_pkey)
  621. else:
  622. raise UnsupportedAlgorithm("Unsupported key type.")
  623. def _evp_pkey_to_public_key(self, evp_pkey) -> PUBLIC_KEY_TYPES:
  624. """
  625. Return the appropriate type of PublicKey given an evp_pkey cdata
  626. pointer.
  627. """
  628. key_type = self._lib.EVP_PKEY_id(evp_pkey)
  629. if key_type == self._lib.EVP_PKEY_RSA:
  630. rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
  631. self.openssl_assert(rsa_cdata != self._ffi.NULL)
  632. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  633. return _RSAPublicKey(self, rsa_cdata, evp_pkey)
  634. elif key_type == self._lib.EVP_PKEY_DSA:
  635. dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
  636. self.openssl_assert(dsa_cdata != self._ffi.NULL)
  637. dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
  638. return _DSAPublicKey(self, dsa_cdata, evp_pkey)
  639. elif key_type == self._lib.EVP_PKEY_EC:
  640. ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
  641. self.openssl_assert(ec_cdata != self._ffi.NULL)
  642. ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
  643. return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
  644. elif key_type in self._dh_types:
  645. dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
  646. self.openssl_assert(dh_cdata != self._ffi.NULL)
  647. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  648. return _DHPublicKey(self, dh_cdata, evp_pkey)
  649. elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
  650. # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
  651. return _Ed25519PublicKey(self, evp_pkey)
  652. elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
  653. # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
  654. return _X448PublicKey(self, evp_pkey)
  655. elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
  656. # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
  657. return _X25519PublicKey(self, evp_pkey)
  658. elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
  659. # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1
  660. return _Ed448PublicKey(self, evp_pkey)
  661. else:
  662. raise UnsupportedAlgorithm("Unsupported key type.")
  663. def _oaep_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
  664. return isinstance(
  665. algorithm,
  666. (
  667. hashes.SHA1,
  668. hashes.SHA224,
  669. hashes.SHA256,
  670. hashes.SHA384,
  671. hashes.SHA512,
  672. ),
  673. )
  674. def rsa_padding_supported(self, padding: AsymmetricPadding) -> bool:
  675. if isinstance(padding, PKCS1v15):
  676. return True
  677. elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
  678. # SHA1 is permissible in MGF1 in FIPS even when SHA1 is blocked
  679. # as signature algorithm.
  680. if self._fips_enabled and isinstance(
  681. padding._mgf._algorithm, hashes.SHA1
  682. ):
  683. return True
  684. else:
  685. return self.hash_supported(padding._mgf._algorithm)
  686. elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
  687. return self._oaep_hash_supported(
  688. padding._mgf._algorithm
  689. ) and self._oaep_hash_supported(padding._algorithm)
  690. else:
  691. return False
  692. def generate_dsa_parameters(self, key_size: int) -> dsa.DSAParameters:
  693. if key_size not in (1024, 2048, 3072, 4096):
  694. raise ValueError(
  695. "Key size must be 1024, 2048, 3072, or 4096 bits."
  696. )
  697. ctx = self._lib.DSA_new()
  698. self.openssl_assert(ctx != self._ffi.NULL)
  699. ctx = self._ffi.gc(ctx, self._lib.DSA_free)
  700. res = self._lib.DSA_generate_parameters_ex(
  701. ctx,
  702. key_size,
  703. self._ffi.NULL,
  704. 0,
  705. self._ffi.NULL,
  706. self._ffi.NULL,
  707. self._ffi.NULL,
  708. )
  709. self.openssl_assert(res == 1)
  710. return _DSAParameters(self, ctx)
  711. def generate_dsa_private_key(
  712. self, parameters: dsa.DSAParameters
  713. ) -> dsa.DSAPrivateKey:
  714. ctx = self._lib.DSAparams_dup(
  715. parameters._dsa_cdata # type: ignore[attr-defined]
  716. )
  717. self.openssl_assert(ctx != self._ffi.NULL)
  718. ctx = self._ffi.gc(ctx, self._lib.DSA_free)
  719. self._lib.DSA_generate_key(ctx)
  720. evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
  721. return _DSAPrivateKey(self, ctx, evp_pkey)
  722. def generate_dsa_private_key_and_parameters(
  723. self, key_size: int
  724. ) -> dsa.DSAPrivateKey:
  725. parameters = self.generate_dsa_parameters(key_size)
  726. return self.generate_dsa_private_key(parameters)
  727. def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key):
  728. res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
  729. self.openssl_assert(res == 1)
  730. res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
  731. self.openssl_assert(res == 1)
  732. def load_dsa_private_numbers(
  733. self, numbers: dsa.DSAPrivateNumbers
  734. ) -> dsa.DSAPrivateKey:
  735. dsa._check_dsa_private_numbers(numbers)
  736. parameter_numbers = numbers.public_numbers.parameter_numbers
  737. dsa_cdata = self._lib.DSA_new()
  738. self.openssl_assert(dsa_cdata != self._ffi.NULL)
  739. dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
  740. p = self._int_to_bn(parameter_numbers.p)
  741. q = self._int_to_bn(parameter_numbers.q)
  742. g = self._int_to_bn(parameter_numbers.g)
  743. pub_key = self._int_to_bn(numbers.public_numbers.y)
  744. priv_key = self._int_to_bn(numbers.x)
  745. self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
  746. evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
  747. return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
  748. def load_dsa_public_numbers(
  749. self, numbers: dsa.DSAPublicNumbers
  750. ) -> dsa.DSAPublicKey:
  751. dsa._check_dsa_parameters(numbers.parameter_numbers)
  752. dsa_cdata = self._lib.DSA_new()
  753. self.openssl_assert(dsa_cdata != self._ffi.NULL)
  754. dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
  755. p = self._int_to_bn(numbers.parameter_numbers.p)
  756. q = self._int_to_bn(numbers.parameter_numbers.q)
  757. g = self._int_to_bn(numbers.parameter_numbers.g)
  758. pub_key = self._int_to_bn(numbers.y)
  759. priv_key = self._ffi.NULL
  760. self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
  761. evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
  762. return _DSAPublicKey(self, dsa_cdata, evp_pkey)
  763. def load_dsa_parameter_numbers(
  764. self, numbers: dsa.DSAParameterNumbers
  765. ) -> dsa.DSAParameters:
  766. dsa._check_dsa_parameters(numbers)
  767. dsa_cdata = self._lib.DSA_new()
  768. self.openssl_assert(dsa_cdata != self._ffi.NULL)
  769. dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
  770. p = self._int_to_bn(numbers.p)
  771. q = self._int_to_bn(numbers.q)
  772. g = self._int_to_bn(numbers.g)
  773. res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
  774. self.openssl_assert(res == 1)
  775. return _DSAParameters(self, dsa_cdata)
  776. def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
  777. evp_pkey = self._create_evp_pkey_gc()
  778. res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
  779. self.openssl_assert(res == 1)
  780. return evp_pkey
  781. def dsa_supported(self) -> bool:
  782. return not self._fips_enabled
  783. def dsa_hash_supported(self, algorithm: hashes.HashAlgorithm) -> bool:
  784. if not self.dsa_supported():
  785. return False
  786. return self.signature_hash_supported(algorithm)
  787. def cmac_algorithm_supported(self, algorithm) -> bool:
  788. return self.cipher_supported(
  789. algorithm, CBC(b"\x00" * algorithm.block_size)
  790. )
  791. def create_cmac_ctx(self, algorithm: BlockCipherAlgorithm) -> _CMACContext:
  792. return _CMACContext(self, algorithm)
  793. def load_pem_private_key(
  794. self, data: bytes, password: typing.Optional[bytes]
  795. ) -> PRIVATE_KEY_TYPES:
  796. return self._load_key(
  797. self._lib.PEM_read_bio_PrivateKey,
  798. self._evp_pkey_to_private_key,
  799. data,
  800. password,
  801. )
  802. def load_pem_public_key(self, data: bytes) -> PUBLIC_KEY_TYPES:
  803. mem_bio = self._bytes_to_bio(data)
  804. # In OpenSSL 3.0.x the PEM_read_bio_PUBKEY function will invoke
  805. # the default password callback if you pass an encrypted private
  806. # key. This is very, very, very bad as the default callback can
  807. # trigger an interactive console prompt, which will hang the
  808. # Python process. We therefore provide our own callback to
  809. # catch this and error out properly.
  810. userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
  811. evp_pkey = self._lib.PEM_read_bio_PUBKEY(
  812. mem_bio.bio,
  813. self._ffi.NULL,
  814. self._ffi.addressof(
  815. self._lib._original_lib, "Cryptography_pem_password_cb"
  816. ),
  817. userdata,
  818. )
  819. if evp_pkey != self._ffi.NULL:
  820. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  821. return self._evp_pkey_to_public_key(evp_pkey)
  822. else:
  823. # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
  824. # need to check to see if it is a pure PKCS1 RSA public key (not
  825. # embedded in a subjectPublicKeyInfo)
  826. self._consume_errors()
  827. res = self._lib.BIO_reset(mem_bio.bio)
  828. self.openssl_assert(res == 1)
  829. rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
  830. mem_bio.bio,
  831. self._ffi.NULL,
  832. self._ffi.addressof(
  833. self._lib._original_lib, "Cryptography_pem_password_cb"
  834. ),
  835. userdata,
  836. )
  837. if rsa_cdata != self._ffi.NULL:
  838. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  839. evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
  840. return _RSAPublicKey(self, rsa_cdata, evp_pkey)
  841. else:
  842. self._handle_key_loading_error()
  843. def load_pem_parameters(self, data: bytes) -> dh.DHParameters:
  844. mem_bio = self._bytes_to_bio(data)
  845. # only DH is supported currently
  846. dh_cdata = self._lib.PEM_read_bio_DHparams(
  847. mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
  848. )
  849. if dh_cdata != self._ffi.NULL:
  850. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  851. return _DHParameters(self, dh_cdata)
  852. else:
  853. self._handle_key_loading_error()
  854. def load_der_private_key(
  855. self, data: bytes, password: typing.Optional[bytes]
  856. ) -> PRIVATE_KEY_TYPES:
  857. # OpenSSL has a function called d2i_AutoPrivateKey that in theory
  858. # handles this automatically, however it doesn't handle encrypted
  859. # private keys. Instead we try to load the key two different ways.
  860. # First we'll try to load it as a traditional key.
  861. bio_data = self._bytes_to_bio(data)
  862. key = self._evp_pkey_from_der_traditional_key(bio_data, password)
  863. if key:
  864. return self._evp_pkey_to_private_key(key)
  865. else:
  866. # Finally we try to load it with the method that handles encrypted
  867. # PKCS8 properly.
  868. return self._load_key(
  869. self._lib.d2i_PKCS8PrivateKey_bio,
  870. self._evp_pkey_to_private_key,
  871. data,
  872. password,
  873. )
  874. def _evp_pkey_from_der_traditional_key(self, bio_data, password):
  875. key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
  876. if key != self._ffi.NULL:
  877. # In OpenSSL 3.0.0-alpha15 there exist scenarios where the key will
  878. # successfully load but errors are still put on the stack. Tracked
  879. # as https://github.com/openssl/openssl/issues/14996
  880. self._consume_errors()
  881. key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
  882. if password is not None:
  883. raise TypeError(
  884. "Password was given but private key is not encrypted."
  885. )
  886. return key
  887. else:
  888. self._consume_errors()
  889. return None
  890. def load_der_public_key(self, data: bytes) -> PUBLIC_KEY_TYPES:
  891. mem_bio = self._bytes_to_bio(data)
  892. evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
  893. if evp_pkey != self._ffi.NULL:
  894. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  895. return self._evp_pkey_to_public_key(evp_pkey)
  896. else:
  897. # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
  898. # need to check to see if it is a pure PKCS1 RSA public key (not
  899. # embedded in a subjectPublicKeyInfo)
  900. self._consume_errors()
  901. res = self._lib.BIO_reset(mem_bio.bio)
  902. self.openssl_assert(res == 1)
  903. rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
  904. mem_bio.bio, self._ffi.NULL
  905. )
  906. if rsa_cdata != self._ffi.NULL:
  907. rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
  908. evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
  909. return _RSAPublicKey(self, rsa_cdata, evp_pkey)
  910. else:
  911. self._handle_key_loading_error()
  912. def load_der_parameters(self, data: bytes) -> dh.DHParameters:
  913. mem_bio = self._bytes_to_bio(data)
  914. dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL)
  915. if dh_cdata != self._ffi.NULL:
  916. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  917. return _DHParameters(self, dh_cdata)
  918. elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
  919. # We check to see if the is dhx.
  920. self._consume_errors()
  921. res = self._lib.BIO_reset(mem_bio.bio)
  922. self.openssl_assert(res == 1)
  923. dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
  924. mem_bio.bio, self._ffi.NULL
  925. )
  926. if dh_cdata != self._ffi.NULL:
  927. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  928. return _DHParameters(self, dh_cdata)
  929. self._handle_key_loading_error()
  930. def _cert2ossl(self, cert: x509.Certificate) -> typing.Any:
  931. data = cert.public_bytes(serialization.Encoding.DER)
  932. mem_bio = self._bytes_to_bio(data)
  933. x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
  934. self.openssl_assert(x509 != self._ffi.NULL)
  935. x509 = self._ffi.gc(x509, self._lib.X509_free)
  936. return x509
  937. def _ossl2cert(self, x509: typing.Any) -> x509.Certificate:
  938. bio = self._create_mem_bio_gc()
  939. res = self._lib.i2d_X509_bio(bio, x509)
  940. self.openssl_assert(res == 1)
  941. return rust_x509.load_der_x509_certificate(self._read_mem_bio(bio))
  942. def _csr2ossl(self, csr: x509.CertificateSigningRequest) -> typing.Any:
  943. data = csr.public_bytes(serialization.Encoding.DER)
  944. mem_bio = self._bytes_to_bio(data)
  945. x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL)
  946. self.openssl_assert(x509_req != self._ffi.NULL)
  947. x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
  948. return x509_req
  949. def _ossl2csr(
  950. self, x509_req: typing.Any
  951. ) -> x509.CertificateSigningRequest:
  952. bio = self._create_mem_bio_gc()
  953. res = self._lib.i2d_X509_REQ_bio(bio, x509_req)
  954. self.openssl_assert(res == 1)
  955. return rust_x509.load_der_x509_csr(self._read_mem_bio(bio))
  956. def _crl2ossl(self, crl: x509.CertificateRevocationList) -> typing.Any:
  957. data = crl.public_bytes(serialization.Encoding.DER)
  958. mem_bio = self._bytes_to_bio(data)
  959. x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
  960. self.openssl_assert(x509_crl != self._ffi.NULL)
  961. x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
  962. return x509_crl
  963. def _ossl2crl(
  964. self, x509_crl: typing.Any
  965. ) -> x509.CertificateRevocationList:
  966. bio = self._create_mem_bio_gc()
  967. res = self._lib.i2d_X509_CRL_bio(bio, x509_crl)
  968. self.openssl_assert(res == 1)
  969. return rust_x509.load_der_x509_crl(self._read_mem_bio(bio))
  970. def _crl_is_signature_valid(
  971. self,
  972. crl: x509.CertificateRevocationList,
  973. public_key: CERTIFICATE_ISSUER_PUBLIC_KEY_TYPES,
  974. ) -> bool:
  975. if not isinstance(
  976. public_key,
  977. (
  978. _DSAPublicKey,
  979. _RSAPublicKey,
  980. _EllipticCurvePublicKey,
  981. ),
  982. ):
  983. raise TypeError(
  984. "Expecting one of DSAPublicKey, RSAPublicKey,"
  985. " or EllipticCurvePublicKey."
  986. )
  987. x509_crl = self._crl2ossl(crl)
  988. res = self._lib.X509_CRL_verify(x509_crl, public_key._evp_pkey)
  989. if res != 1:
  990. self._consume_errors()
  991. return False
  992. return True
  993. def _csr_is_signature_valid(
  994. self, csr: x509.CertificateSigningRequest
  995. ) -> bool:
  996. x509_req = self._csr2ossl(csr)
  997. pkey = self._lib.X509_REQ_get_pubkey(x509_req)
  998. self.openssl_assert(pkey != self._ffi.NULL)
  999. pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
  1000. res = self._lib.X509_REQ_verify(x509_req, pkey)
  1001. if res != 1:
  1002. self._consume_errors()
  1003. return False
  1004. return True
  1005. def _check_keys_correspond(self, key1, key2):
  1006. if self._lib.EVP_PKEY_cmp(key1._evp_pkey, key2._evp_pkey) != 1:
  1007. raise ValueError("Keys do not correspond")
  1008. def _load_key(self, openssl_read_func, convert_func, data, password):
  1009. mem_bio = self._bytes_to_bio(data)
  1010. userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
  1011. if password is not None:
  1012. utils._check_byteslike("password", password)
  1013. password_ptr = self._ffi.from_buffer(password)
  1014. userdata.password = password_ptr
  1015. userdata.length = len(password)
  1016. evp_pkey = openssl_read_func(
  1017. mem_bio.bio,
  1018. self._ffi.NULL,
  1019. self._ffi.addressof(
  1020. self._lib._original_lib, "Cryptography_pem_password_cb"
  1021. ),
  1022. userdata,
  1023. )
  1024. if evp_pkey == self._ffi.NULL:
  1025. if userdata.error != 0:
  1026. self._consume_errors()
  1027. if userdata.error == -1:
  1028. raise TypeError(
  1029. "Password was not given but private key is encrypted"
  1030. )
  1031. else:
  1032. assert userdata.error == -2
  1033. raise ValueError(
  1034. "Passwords longer than {} bytes are not supported "
  1035. "by this backend.".format(userdata.maxsize - 1)
  1036. )
  1037. else:
  1038. self._handle_key_loading_error()
  1039. # In OpenSSL 3.0.0-alpha15 there exist scenarios where the key will
  1040. # successfully load but errors are still put on the stack. Tracked
  1041. # as https://github.com/openssl/openssl/issues/14996
  1042. self._consume_errors()
  1043. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1044. if password is not None and userdata.called == 0:
  1045. raise TypeError(
  1046. "Password was given but private key is not encrypted."
  1047. )
  1048. assert (
  1049. password is not None and userdata.called == 1
  1050. ) or password is None
  1051. return convert_func(evp_pkey)
  1052. def _handle_key_loading_error(self) -> typing.NoReturn:
  1053. errors = self._consume_errors()
  1054. if not errors:
  1055. raise ValueError(
  1056. "Could not deserialize key data. The data may be in an "
  1057. "incorrect format or it may be encrypted with an unsupported "
  1058. "algorithm."
  1059. )
  1060. elif (
  1061. errors[0]._lib_reason_match(
  1062. self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
  1063. )
  1064. or errors[0]._lib_reason_match(
  1065. self._lib.ERR_LIB_PKCS12,
  1066. self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
  1067. )
  1068. or (
  1069. self._lib.Cryptography_HAS_PROVIDERS
  1070. and errors[0]._lib_reason_match(
  1071. self._lib.ERR_LIB_PROV,
  1072. self._lib.PROV_R_BAD_DECRYPT,
  1073. )
  1074. )
  1075. ):
  1076. raise ValueError("Bad decrypt. Incorrect password?")
  1077. elif any(
  1078. error._lib_reason_match(
  1079. self._lib.ERR_LIB_EVP,
  1080. self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
  1081. )
  1082. for error in errors
  1083. ):
  1084. raise ValueError("Unsupported public key algorithm.")
  1085. else:
  1086. errors_with_text = binding._errors_with_text(errors)
  1087. raise ValueError(
  1088. "Could not deserialize key data. The data may be in an "
  1089. "incorrect format, it may be encrypted with an unsupported "
  1090. "algorithm, or it may be an unsupported key type (e.g. EC "
  1091. "curves with explicit parameters).",
  1092. errors_with_text,
  1093. )
  1094. def elliptic_curve_supported(self, curve: ec.EllipticCurve) -> bool:
  1095. try:
  1096. curve_nid = self._elliptic_curve_to_nid(curve)
  1097. except UnsupportedAlgorithm:
  1098. curve_nid = self._lib.NID_undef
  1099. group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
  1100. if group == self._ffi.NULL:
  1101. self._consume_errors()
  1102. return False
  1103. else:
  1104. self.openssl_assert(curve_nid != self._lib.NID_undef)
  1105. self._lib.EC_GROUP_free(group)
  1106. return True
  1107. def elliptic_curve_signature_algorithm_supported(
  1108. self,
  1109. signature_algorithm: ec.EllipticCurveSignatureAlgorithm,
  1110. curve: ec.EllipticCurve,
  1111. ) -> bool:
  1112. # We only support ECDSA right now.
  1113. if not isinstance(signature_algorithm, ec.ECDSA):
  1114. return False
  1115. return self.elliptic_curve_supported(curve)
  1116. def generate_elliptic_curve_private_key(
  1117. self, curve: ec.EllipticCurve
  1118. ) -> ec.EllipticCurvePrivateKey:
  1119. """
  1120. Generate a new private key on the named curve.
  1121. """
  1122. if self.elliptic_curve_supported(curve):
  1123. ec_cdata = self._ec_key_new_by_curve(curve)
  1124. res = self._lib.EC_KEY_generate_key(ec_cdata)
  1125. self.openssl_assert(res == 1)
  1126. evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
  1127. return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
  1128. else:
  1129. raise UnsupportedAlgorithm(
  1130. "Backend object does not support {}.".format(curve.name),
  1131. _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
  1132. )
  1133. def load_elliptic_curve_private_numbers(
  1134. self, numbers: ec.EllipticCurvePrivateNumbers
  1135. ) -> ec.EllipticCurvePrivateKey:
  1136. public = numbers.public_numbers
  1137. ec_cdata = self._ec_key_new_by_curve(public.curve)
  1138. private_value = self._ffi.gc(
  1139. self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
  1140. )
  1141. res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
  1142. if res != 1:
  1143. self._consume_errors()
  1144. raise ValueError("Invalid EC key.")
  1145. self._ec_key_set_public_key_affine_coordinates(
  1146. ec_cdata, public.x, public.y
  1147. )
  1148. evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
  1149. return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
  1150. def load_elliptic_curve_public_numbers(
  1151. self, numbers: ec.EllipticCurvePublicNumbers
  1152. ) -> ec.EllipticCurvePublicKey:
  1153. ec_cdata = self._ec_key_new_by_curve(numbers.curve)
  1154. self._ec_key_set_public_key_affine_coordinates(
  1155. ec_cdata, numbers.x, numbers.y
  1156. )
  1157. evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
  1158. return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
  1159. def load_elliptic_curve_public_bytes(
  1160. self, curve: ec.EllipticCurve, point_bytes: bytes
  1161. ) -> ec.EllipticCurvePublicKey:
  1162. ec_cdata = self._ec_key_new_by_curve(curve)
  1163. group = self._lib.EC_KEY_get0_group(ec_cdata)
  1164. self.openssl_assert(group != self._ffi.NULL)
  1165. point = self._lib.EC_POINT_new(group)
  1166. self.openssl_assert(point != self._ffi.NULL)
  1167. point = self._ffi.gc(point, self._lib.EC_POINT_free)
  1168. with self._tmp_bn_ctx() as bn_ctx:
  1169. res = self._lib.EC_POINT_oct2point(
  1170. group, point, point_bytes, len(point_bytes), bn_ctx
  1171. )
  1172. if res != 1:
  1173. self._consume_errors()
  1174. raise ValueError("Invalid public bytes for the given curve")
  1175. res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
  1176. self.openssl_assert(res == 1)
  1177. evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
  1178. return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
  1179. def derive_elliptic_curve_private_key(
  1180. self, private_value: int, curve: ec.EllipticCurve
  1181. ) -> ec.EllipticCurvePrivateKey:
  1182. ec_cdata = self._ec_key_new_by_curve(curve)
  1183. get_func, group = self._ec_key_determine_group_get_func(ec_cdata)
  1184. point = self._lib.EC_POINT_new(group)
  1185. self.openssl_assert(point != self._ffi.NULL)
  1186. point = self._ffi.gc(point, self._lib.EC_POINT_free)
  1187. value = self._int_to_bn(private_value)
  1188. value = self._ffi.gc(value, self._lib.BN_clear_free)
  1189. with self._tmp_bn_ctx() as bn_ctx:
  1190. res = self._lib.EC_POINT_mul(
  1191. group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
  1192. )
  1193. self.openssl_assert(res == 1)
  1194. bn_x = self._lib.BN_CTX_get(bn_ctx)
  1195. bn_y = self._lib.BN_CTX_get(bn_ctx)
  1196. res = get_func(group, point, bn_x, bn_y, bn_ctx)
  1197. if res != 1:
  1198. self._consume_errors()
  1199. raise ValueError("Unable to derive key from private_value")
  1200. res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
  1201. self.openssl_assert(res == 1)
  1202. private = self._int_to_bn(private_value)
  1203. private = self._ffi.gc(private, self._lib.BN_clear_free)
  1204. res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
  1205. self.openssl_assert(res == 1)
  1206. evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
  1207. return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
  1208. def _ec_key_new_by_curve(self, curve: ec.EllipticCurve):
  1209. curve_nid = self._elliptic_curve_to_nid(curve)
  1210. return self._ec_key_new_by_curve_nid(curve_nid)
  1211. def _ec_key_new_by_curve_nid(self, curve_nid: int):
  1212. ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
  1213. self.openssl_assert(ec_cdata != self._ffi.NULL)
  1214. return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
  1215. def elliptic_curve_exchange_algorithm_supported(
  1216. self, algorithm: ec.ECDH, curve: ec.EllipticCurve
  1217. ) -> bool:
  1218. if self._fips_enabled and not isinstance(
  1219. curve, self._fips_ecdh_curves
  1220. ):
  1221. return False
  1222. return self.elliptic_curve_supported(curve) and isinstance(
  1223. algorithm, ec.ECDH
  1224. )
  1225. def _ec_cdata_to_evp_pkey(self, ec_cdata):
  1226. evp_pkey = self._create_evp_pkey_gc()
  1227. res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
  1228. self.openssl_assert(res == 1)
  1229. return evp_pkey
  1230. def _elliptic_curve_to_nid(self, curve: ec.EllipticCurve) -> int:
  1231. """
  1232. Get the NID for a curve name.
  1233. """
  1234. curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
  1235. curve_name = curve_aliases.get(curve.name, curve.name)
  1236. curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
  1237. if curve_nid == self._lib.NID_undef:
  1238. raise UnsupportedAlgorithm(
  1239. "{} is not a supported elliptic curve".format(curve.name),
  1240. _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
  1241. )
  1242. return curve_nid
  1243. @contextmanager
  1244. def _tmp_bn_ctx(self):
  1245. bn_ctx = self._lib.BN_CTX_new()
  1246. self.openssl_assert(bn_ctx != self._ffi.NULL)
  1247. bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
  1248. self._lib.BN_CTX_start(bn_ctx)
  1249. try:
  1250. yield bn_ctx
  1251. finally:
  1252. self._lib.BN_CTX_end(bn_ctx)
  1253. def _ec_key_determine_group_get_func(self, ctx):
  1254. """
  1255. Given an EC_KEY determine the group and what function is required to
  1256. get point coordinates.
  1257. """
  1258. self.openssl_assert(ctx != self._ffi.NULL)
  1259. nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
  1260. self.openssl_assert(nid_two_field != self._lib.NID_undef)
  1261. group = self._lib.EC_KEY_get0_group(ctx)
  1262. self.openssl_assert(group != self._ffi.NULL)
  1263. method = self._lib.EC_GROUP_method_of(group)
  1264. self.openssl_assert(method != self._ffi.NULL)
  1265. nid = self._lib.EC_METHOD_get_field_type(method)
  1266. self.openssl_assert(nid != self._lib.NID_undef)
  1267. if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
  1268. get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
  1269. else:
  1270. get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
  1271. assert get_func
  1272. return get_func, group
  1273. def _ec_key_set_public_key_affine_coordinates(self, ctx, x: int, y: int):
  1274. """
  1275. Sets the public key point in the EC_KEY context to the affine x and y
  1276. values.
  1277. """
  1278. if x < 0 or y < 0:
  1279. raise ValueError(
  1280. "Invalid EC key. Both x and y must be non-negative."
  1281. )
  1282. x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
  1283. y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
  1284. res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y)
  1285. if res != 1:
  1286. self._consume_errors()
  1287. raise ValueError("Invalid EC key.")
  1288. def _private_key_bytes(
  1289. self,
  1290. encoding: serialization.Encoding,
  1291. format: serialization.PrivateFormat,
  1292. encryption_algorithm: serialization.KeySerializationEncryption,
  1293. key,
  1294. evp_pkey,
  1295. cdata,
  1296. ) -> bytes:
  1297. # validate argument types
  1298. if not isinstance(encoding, serialization.Encoding):
  1299. raise TypeError("encoding must be an item from the Encoding enum")
  1300. if not isinstance(format, serialization.PrivateFormat):
  1301. raise TypeError(
  1302. "format must be an item from the PrivateFormat enum"
  1303. )
  1304. if not isinstance(
  1305. encryption_algorithm, serialization.KeySerializationEncryption
  1306. ):
  1307. raise TypeError(
  1308. "Encryption algorithm must be a KeySerializationEncryption "
  1309. "instance"
  1310. )
  1311. # validate password
  1312. if isinstance(encryption_algorithm, serialization.NoEncryption):
  1313. password = b""
  1314. elif isinstance(
  1315. encryption_algorithm, serialization.BestAvailableEncryption
  1316. ):
  1317. password = encryption_algorithm.password
  1318. if len(password) > 1023:
  1319. raise ValueError(
  1320. "Passwords longer than 1023 bytes are not supported by "
  1321. "this backend"
  1322. )
  1323. else:
  1324. raise ValueError("Unsupported encryption type")
  1325. # PKCS8 + PEM/DER
  1326. if format is serialization.PrivateFormat.PKCS8:
  1327. if encoding is serialization.Encoding.PEM:
  1328. write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
  1329. elif encoding is serialization.Encoding.DER:
  1330. write_bio = self._lib.i2d_PKCS8PrivateKey_bio
  1331. else:
  1332. raise ValueError("Unsupported encoding for PKCS8")
  1333. return self._private_key_bytes_via_bio(
  1334. write_bio, evp_pkey, password
  1335. )
  1336. # TraditionalOpenSSL + PEM/DER
  1337. if format is serialization.PrivateFormat.TraditionalOpenSSL:
  1338. if self._fips_enabled and not isinstance(
  1339. encryption_algorithm, serialization.NoEncryption
  1340. ):
  1341. raise ValueError(
  1342. "Encrypted traditional OpenSSL format is not "
  1343. "supported in FIPS mode."
  1344. )
  1345. key_type = self._lib.EVP_PKEY_id(evp_pkey)
  1346. if encoding is serialization.Encoding.PEM:
  1347. if key_type == self._lib.EVP_PKEY_RSA:
  1348. write_bio = self._lib.PEM_write_bio_RSAPrivateKey
  1349. elif key_type == self._lib.EVP_PKEY_DSA:
  1350. write_bio = self._lib.PEM_write_bio_DSAPrivateKey
  1351. elif key_type == self._lib.EVP_PKEY_EC:
  1352. write_bio = self._lib.PEM_write_bio_ECPrivateKey
  1353. else:
  1354. raise ValueError(
  1355. "Unsupported key type for TraditionalOpenSSL"
  1356. )
  1357. return self._private_key_bytes_via_bio(
  1358. write_bio, cdata, password
  1359. )
  1360. if encoding is serialization.Encoding.DER:
  1361. if password:
  1362. raise ValueError(
  1363. "Encryption is not supported for DER encoded "
  1364. "traditional OpenSSL keys"
  1365. )
  1366. if key_type == self._lib.EVP_PKEY_RSA:
  1367. write_bio = self._lib.i2d_RSAPrivateKey_bio
  1368. elif key_type == self._lib.EVP_PKEY_EC:
  1369. write_bio = self._lib.i2d_ECPrivateKey_bio
  1370. elif key_type == self._lib.EVP_PKEY_DSA:
  1371. write_bio = self._lib.i2d_DSAPrivateKey_bio
  1372. else:
  1373. raise ValueError(
  1374. "Unsupported key type for TraditionalOpenSSL"
  1375. )
  1376. return self._bio_func_output(write_bio, cdata)
  1377. raise ValueError("Unsupported encoding for TraditionalOpenSSL")
  1378. # OpenSSH + PEM
  1379. if format is serialization.PrivateFormat.OpenSSH:
  1380. if encoding is serialization.Encoding.PEM:
  1381. return ssh.serialize_ssh_private_key(key, password)
  1382. raise ValueError(
  1383. "OpenSSH private key format can only be used"
  1384. " with PEM encoding"
  1385. )
  1386. # Anything that key-specific code was supposed to handle earlier,
  1387. # like Raw.
  1388. raise ValueError("format is invalid with this key")
  1389. def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password):
  1390. if not password:
  1391. evp_cipher = self._ffi.NULL
  1392. else:
  1393. # This is a curated value that we will update over time.
  1394. evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
  1395. return self._bio_func_output(
  1396. write_bio,
  1397. evp_pkey,
  1398. evp_cipher,
  1399. password,
  1400. len(password),
  1401. self._ffi.NULL,
  1402. self._ffi.NULL,
  1403. )
  1404. def _bio_func_output(self, write_bio, *args):
  1405. bio = self._create_mem_bio_gc()
  1406. res = write_bio(bio, *args)
  1407. self.openssl_assert(res == 1)
  1408. return self._read_mem_bio(bio)
  1409. def _public_key_bytes(
  1410. self,
  1411. encoding: serialization.Encoding,
  1412. format: serialization.PublicFormat,
  1413. key,
  1414. evp_pkey,
  1415. cdata,
  1416. ) -> bytes:
  1417. if not isinstance(encoding, serialization.Encoding):
  1418. raise TypeError("encoding must be an item from the Encoding enum")
  1419. if not isinstance(format, serialization.PublicFormat):
  1420. raise TypeError(
  1421. "format must be an item from the PublicFormat enum"
  1422. )
  1423. # SubjectPublicKeyInfo + PEM/DER
  1424. if format is serialization.PublicFormat.SubjectPublicKeyInfo:
  1425. if encoding is serialization.Encoding.PEM:
  1426. write_bio = self._lib.PEM_write_bio_PUBKEY
  1427. elif encoding is serialization.Encoding.DER:
  1428. write_bio = self._lib.i2d_PUBKEY_bio
  1429. else:
  1430. raise ValueError(
  1431. "SubjectPublicKeyInfo works only with PEM or DER encoding"
  1432. )
  1433. return self._bio_func_output(write_bio, evp_pkey)
  1434. # PKCS1 + PEM/DER
  1435. if format is serialization.PublicFormat.PKCS1:
  1436. # Only RSA is supported here.
  1437. key_type = self._lib.EVP_PKEY_id(evp_pkey)
  1438. if key_type != self._lib.EVP_PKEY_RSA:
  1439. raise ValueError("PKCS1 format is supported only for RSA keys")
  1440. if encoding is serialization.Encoding.PEM:
  1441. write_bio = self._lib.PEM_write_bio_RSAPublicKey
  1442. elif encoding is serialization.Encoding.DER:
  1443. write_bio = self._lib.i2d_RSAPublicKey_bio
  1444. else:
  1445. raise ValueError("PKCS1 works only with PEM or DER encoding")
  1446. return self._bio_func_output(write_bio, cdata)
  1447. # OpenSSH + OpenSSH
  1448. if format is serialization.PublicFormat.OpenSSH:
  1449. if encoding is serialization.Encoding.OpenSSH:
  1450. return ssh.serialize_ssh_public_key(key)
  1451. raise ValueError(
  1452. "OpenSSH format must be used with OpenSSH encoding"
  1453. )
  1454. # Anything that key-specific code was supposed to handle earlier,
  1455. # like Raw, CompressedPoint, UncompressedPoint
  1456. raise ValueError("format is invalid with this key")
  1457. def dh_supported(self) -> bool:
  1458. return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
  1459. def generate_dh_parameters(
  1460. self, generator: int, key_size: int
  1461. ) -> dh.DHParameters:
  1462. if key_size < dh._MIN_MODULUS_SIZE:
  1463. raise ValueError(
  1464. "DH key_size must be at least {} bits".format(
  1465. dh._MIN_MODULUS_SIZE
  1466. )
  1467. )
  1468. if generator not in (2, 5):
  1469. raise ValueError("DH generator must be 2 or 5")
  1470. dh_param_cdata = self._lib.DH_new()
  1471. self.openssl_assert(dh_param_cdata != self._ffi.NULL)
  1472. dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
  1473. res = self._lib.DH_generate_parameters_ex(
  1474. dh_param_cdata, key_size, generator, self._ffi.NULL
  1475. )
  1476. self.openssl_assert(res == 1)
  1477. return _DHParameters(self, dh_param_cdata)
  1478. def _dh_cdata_to_evp_pkey(self, dh_cdata):
  1479. evp_pkey = self._create_evp_pkey_gc()
  1480. res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
  1481. self.openssl_assert(res == 1)
  1482. return evp_pkey
  1483. def generate_dh_private_key(
  1484. self, parameters: dh.DHParameters
  1485. ) -> dh.DHPrivateKey:
  1486. dh_key_cdata = _dh_params_dup(
  1487. parameters._dh_cdata, self # type: ignore[attr-defined]
  1488. )
  1489. res = self._lib.DH_generate_key(dh_key_cdata)
  1490. self.openssl_assert(res == 1)
  1491. evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
  1492. return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
  1493. def generate_dh_private_key_and_parameters(
  1494. self, generator: int, key_size: int
  1495. ) -> dh.DHPrivateKey:
  1496. return self.generate_dh_private_key(
  1497. self.generate_dh_parameters(generator, key_size)
  1498. )
  1499. def load_dh_private_numbers(
  1500. self, numbers: dh.DHPrivateNumbers
  1501. ) -> dh.DHPrivateKey:
  1502. parameter_numbers = numbers.public_numbers.parameter_numbers
  1503. dh_cdata = self._lib.DH_new()
  1504. self.openssl_assert(dh_cdata != self._ffi.NULL)
  1505. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  1506. p = self._int_to_bn(parameter_numbers.p)
  1507. g = self._int_to_bn(parameter_numbers.g)
  1508. if parameter_numbers.q is not None:
  1509. q = self._int_to_bn(parameter_numbers.q)
  1510. else:
  1511. q = self._ffi.NULL
  1512. pub_key = self._int_to_bn(numbers.public_numbers.y)
  1513. priv_key = self._int_to_bn(numbers.x)
  1514. res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
  1515. self.openssl_assert(res == 1)
  1516. res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
  1517. self.openssl_assert(res == 1)
  1518. codes = self._ffi.new("int[]", 1)
  1519. res = self._lib.Cryptography_DH_check(dh_cdata, codes)
  1520. self.openssl_assert(res == 1)
  1521. # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
  1522. # equal 11 when the generator is 2 (a quadratic nonresidue).
  1523. # We want to ignore that error because p % 24 == 23 is also fine.
  1524. # Specifically, g is then a quadratic residue. Within the context of
  1525. # Diffie-Hellman this means it can only generate half the possible
  1526. # values. That sounds bad, but quadratic nonresidues leak a bit of
  1527. # the key to the attacker in exchange for having the full key space
  1528. # available. See: https://crypto.stackexchange.com/questions/12961
  1529. if codes[0] != 0 and not (
  1530. parameter_numbers.g == 2
  1531. and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
  1532. ):
  1533. raise ValueError("DH private numbers did not pass safety checks.")
  1534. evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
  1535. return _DHPrivateKey(self, dh_cdata, evp_pkey)
  1536. def load_dh_public_numbers(
  1537. self, numbers: dh.DHPublicNumbers
  1538. ) -> dh.DHPublicKey:
  1539. dh_cdata = self._lib.DH_new()
  1540. self.openssl_assert(dh_cdata != self._ffi.NULL)
  1541. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  1542. parameter_numbers = numbers.parameter_numbers
  1543. p = self._int_to_bn(parameter_numbers.p)
  1544. g = self._int_to_bn(parameter_numbers.g)
  1545. if parameter_numbers.q is not None:
  1546. q = self._int_to_bn(parameter_numbers.q)
  1547. else:
  1548. q = self._ffi.NULL
  1549. pub_key = self._int_to_bn(numbers.y)
  1550. res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
  1551. self.openssl_assert(res == 1)
  1552. res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
  1553. self.openssl_assert(res == 1)
  1554. evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
  1555. return _DHPublicKey(self, dh_cdata, evp_pkey)
  1556. def load_dh_parameter_numbers(
  1557. self, numbers: dh.DHParameterNumbers
  1558. ) -> dh.DHParameters:
  1559. dh_cdata = self._lib.DH_new()
  1560. self.openssl_assert(dh_cdata != self._ffi.NULL)
  1561. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  1562. p = self._int_to_bn(numbers.p)
  1563. g = self._int_to_bn(numbers.g)
  1564. if numbers.q is not None:
  1565. q = self._int_to_bn(numbers.q)
  1566. else:
  1567. q = self._ffi.NULL
  1568. res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
  1569. self.openssl_assert(res == 1)
  1570. return _DHParameters(self, dh_cdata)
  1571. def dh_parameters_supported(
  1572. self, p: int, g: int, q: typing.Optional[int] = None
  1573. ) -> bool:
  1574. dh_cdata = self._lib.DH_new()
  1575. self.openssl_assert(dh_cdata != self._ffi.NULL)
  1576. dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
  1577. p = self._int_to_bn(p)
  1578. g = self._int_to_bn(g)
  1579. if q is not None:
  1580. q = self._int_to_bn(q)
  1581. else:
  1582. q = self._ffi.NULL
  1583. res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
  1584. self.openssl_assert(res == 1)
  1585. codes = self._ffi.new("int[]", 1)
  1586. res = self._lib.Cryptography_DH_check(dh_cdata, codes)
  1587. self.openssl_assert(res == 1)
  1588. return codes[0] == 0
  1589. def dh_x942_serialization_supported(self) -> bool:
  1590. return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
  1591. def x25519_load_public_bytes(self, data: bytes) -> x25519.X25519PublicKey:
  1592. # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
  1593. # switch this to EVP_PKEY_new_raw_public_key
  1594. if len(data) != 32:
  1595. raise ValueError("An X25519 public key is 32 bytes long")
  1596. evp_pkey = self._create_evp_pkey_gc()
  1597. res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519)
  1598. self.openssl_assert(res == 1)
  1599. res = self._lib.EVP_PKEY_set1_tls_encodedpoint(
  1600. evp_pkey, data, len(data)
  1601. )
  1602. self.openssl_assert(res == 1)
  1603. return _X25519PublicKey(self, evp_pkey)
  1604. def x25519_load_private_bytes(
  1605. self, data: bytes
  1606. ) -> x25519.X25519PrivateKey:
  1607. # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
  1608. # switch this to EVP_PKEY_new_raw_private_key and drop the
  1609. # zeroed_bytearray garbage.
  1610. # OpenSSL only has facilities for loading PKCS8 formatted private
  1611. # keys using the algorithm identifiers specified in
  1612. # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
  1613. # This is the standard PKCS8 prefix for a 32 byte X25519 key.
  1614. # The form is:
  1615. # 0:d=0 hl=2 l= 46 cons: SEQUENCE
  1616. # 2:d=1 hl=2 l= 1 prim: INTEGER :00
  1617. # 5:d=1 hl=2 l= 5 cons: SEQUENCE
  1618. # 7:d=2 hl=2 l= 3 prim: OBJECT :1.3.101.110
  1619. # 12:d=1 hl=2 l= 34 prim: OCTET STRING (the key)
  1620. # Of course there's a bit more complexity. In reality OCTET STRING
  1621. # contains an OCTET STRING of length 32! So the last two bytes here
  1622. # are \x04\x20, which is an OCTET STRING of length 32.
  1623. if len(data) != 32:
  1624. raise ValueError("An X25519 private key is 32 bytes long")
  1625. pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
  1626. with self._zeroed_bytearray(48) as ba:
  1627. ba[0:16] = pkcs8_prefix
  1628. ba[16:] = data
  1629. bio = self._bytes_to_bio(ba)
  1630. evp_pkey = self._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL)
  1631. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1632. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1633. self.openssl_assert(
  1634. self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519
  1635. )
  1636. return _X25519PrivateKey(self, evp_pkey)
  1637. def _evp_pkey_keygen_gc(self, nid):
  1638. evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
  1639. self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
  1640. evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
  1641. res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
  1642. self.openssl_assert(res == 1)
  1643. evp_ppkey = self._ffi.new("EVP_PKEY **")
  1644. res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
  1645. self.openssl_assert(res == 1)
  1646. self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
  1647. evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
  1648. return evp_pkey
  1649. def x25519_generate_key(self) -> x25519.X25519PrivateKey:
  1650. evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
  1651. return _X25519PrivateKey(self, evp_pkey)
  1652. def x25519_supported(self) -> bool:
  1653. if self._fips_enabled:
  1654. return False
  1655. return not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
  1656. def x448_load_public_bytes(self, data: bytes) -> x448.X448PublicKey:
  1657. if len(data) != 56:
  1658. raise ValueError("An X448 public key is 56 bytes long")
  1659. evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
  1660. self._lib.NID_X448, self._ffi.NULL, data, len(data)
  1661. )
  1662. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1663. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1664. return _X448PublicKey(self, evp_pkey)
  1665. def x448_load_private_bytes(self, data: bytes) -> x448.X448PrivateKey:
  1666. if len(data) != 56:
  1667. raise ValueError("An X448 private key is 56 bytes long")
  1668. data_ptr = self._ffi.from_buffer(data)
  1669. evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
  1670. self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
  1671. )
  1672. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1673. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1674. return _X448PrivateKey(self, evp_pkey)
  1675. def x448_generate_key(self) -> x448.X448PrivateKey:
  1676. evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
  1677. return _X448PrivateKey(self, evp_pkey)
  1678. def x448_supported(self) -> bool:
  1679. if self._fips_enabled:
  1680. return False
  1681. return (
  1682. not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
  1683. and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
  1684. )
  1685. def ed25519_supported(self) -> bool:
  1686. if self._fips_enabled:
  1687. return False
  1688. return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
  1689. def ed25519_load_public_bytes(
  1690. self, data: bytes
  1691. ) -> ed25519.Ed25519PublicKey:
  1692. utils._check_bytes("data", data)
  1693. if len(data) != ed25519._ED25519_KEY_SIZE:
  1694. raise ValueError("An Ed25519 public key is 32 bytes long")
  1695. evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
  1696. self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
  1697. )
  1698. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1699. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1700. return _Ed25519PublicKey(self, evp_pkey)
  1701. def ed25519_load_private_bytes(
  1702. self, data: bytes
  1703. ) -> ed25519.Ed25519PrivateKey:
  1704. if len(data) != ed25519._ED25519_KEY_SIZE:
  1705. raise ValueError("An Ed25519 private key is 32 bytes long")
  1706. utils._check_byteslike("data", data)
  1707. data_ptr = self._ffi.from_buffer(data)
  1708. evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
  1709. self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
  1710. )
  1711. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1712. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1713. return _Ed25519PrivateKey(self, evp_pkey)
  1714. def ed25519_generate_key(self) -> ed25519.Ed25519PrivateKey:
  1715. evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
  1716. return _Ed25519PrivateKey(self, evp_pkey)
  1717. def ed448_supported(self) -> bool:
  1718. if self._fips_enabled:
  1719. return False
  1720. return (
  1721. not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
  1722. and not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
  1723. )
  1724. def ed448_load_public_bytes(self, data: bytes) -> ed448.Ed448PublicKey:
  1725. utils._check_bytes("data", data)
  1726. if len(data) != _ED448_KEY_SIZE:
  1727. raise ValueError("An Ed448 public key is 57 bytes long")
  1728. evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
  1729. self._lib.NID_ED448, self._ffi.NULL, data, len(data)
  1730. )
  1731. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1732. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1733. return _Ed448PublicKey(self, evp_pkey)
  1734. def ed448_load_private_bytes(self, data: bytes) -> ed448.Ed448PrivateKey:
  1735. utils._check_byteslike("data", data)
  1736. if len(data) != _ED448_KEY_SIZE:
  1737. raise ValueError("An Ed448 private key is 57 bytes long")
  1738. data_ptr = self._ffi.from_buffer(data)
  1739. evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
  1740. self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data)
  1741. )
  1742. self.openssl_assert(evp_pkey != self._ffi.NULL)
  1743. evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
  1744. return _Ed448PrivateKey(self, evp_pkey)
  1745. def ed448_generate_key(self) -> ed448.Ed448PrivateKey:
  1746. evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
  1747. return _Ed448PrivateKey(self, evp_pkey)
  1748. def derive_scrypt(
  1749. self,
  1750. key_material: bytes,
  1751. salt: bytes,
  1752. length: int,
  1753. n: int,
  1754. r: int,
  1755. p: int,
  1756. ) -> bytes:
  1757. buf = self._ffi.new("unsigned char[]", length)
  1758. key_material_ptr = self._ffi.from_buffer(key_material)
  1759. res = self._lib.EVP_PBE_scrypt(
  1760. key_material_ptr,
  1761. len(key_material),
  1762. salt,
  1763. len(salt),
  1764. n,
  1765. r,
  1766. p,
  1767. scrypt._MEM_LIMIT,
  1768. buf,
  1769. length,
  1770. )
  1771. if res != 1:
  1772. errors = self._consume_errors_with_text()
  1773. # memory required formula explained here:
  1774. # https://blog.filippo.io/the-scrypt-parameters/
  1775. min_memory = 128 * n * r // (1024**2)
  1776. raise MemoryError(
  1777. "Not enough memory to derive key. These parameters require"
  1778. " {} MB of memory.".format(min_memory),
  1779. errors,
  1780. )
  1781. return self._ffi.buffer(buf)[:]
  1782. def aead_cipher_supported(self, cipher) -> bool:
  1783. cipher_name = aead._aead_cipher_name(cipher)
  1784. if self._fips_enabled and cipher_name not in self._fips_aead:
  1785. return False
  1786. # SIV isn't loaded through get_cipherbyname but instead a new fetch API
  1787. # only available in 3.0+. But if we know we're on 3.0+ then we know
  1788. # it's supported.
  1789. if cipher_name.endswith(b"-siv"):
  1790. return self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1
  1791. else:
  1792. return (
  1793. self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
  1794. )
  1795. @contextlib.contextmanager
  1796. def _zeroed_bytearray(self, length: int) -> typing.Iterator[bytearray]:
  1797. """
  1798. This method creates a bytearray, which we copy data into (hopefully
  1799. also from a mutable buffer that can be dynamically erased!), and then
  1800. zero when we're done.
  1801. """
  1802. ba = bytearray(length)
  1803. try:
  1804. yield ba
  1805. finally:
  1806. self._zero_data(ba, length)
  1807. def _zero_data(self, data, length: int) -> None:
  1808. # We clear things this way because at the moment we're not
  1809. # sure of a better way that can guarantee it overwrites the
  1810. # memory of a bytearray and doesn't just replace the underlying char *.
  1811. for i in range(length):
  1812. data[i] = 0
  1813. @contextlib.contextmanager
  1814. def _zeroed_null_terminated_buf(self, data):
  1815. """
  1816. This method takes bytes, which can be a bytestring or a mutable
  1817. buffer like a bytearray, and yields a null-terminated version of that
  1818. data. This is required because PKCS12_parse doesn't take a length with
  1819. its password char * and ffi.from_buffer doesn't provide null
  1820. termination. So, to support zeroing the data via bytearray we
  1821. need to build this ridiculous construct that copies the memory, but
  1822. zeroes it after use.
  1823. """
  1824. if data is None:
  1825. yield self._ffi.NULL
  1826. else:
  1827. data_len = len(data)
  1828. buf = self._ffi.new("char[]", data_len + 1)
  1829. self._ffi.memmove(buf, data, data_len)
  1830. try:
  1831. yield buf
  1832. finally:
  1833. # Cast to a uint8_t * so we can assign by integer
  1834. self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
  1835. def load_key_and_certificates_from_pkcs12(
  1836. self, data: bytes, password: typing.Optional[bytes]
  1837. ) -> typing.Tuple[
  1838. typing.Optional[PRIVATE_KEY_TYPES],
  1839. typing.Optional[x509.Certificate],
  1840. typing.List[x509.Certificate],
  1841. ]:
  1842. pkcs12 = self.load_pkcs12(data, password)
  1843. return (
  1844. pkcs12.key,
  1845. pkcs12.cert.certificate if pkcs12.cert else None,
  1846. [cert.certificate for cert in pkcs12.additional_certs],
  1847. )
  1848. def load_pkcs12(
  1849. self, data: bytes, password: typing.Optional[bytes]
  1850. ) -> PKCS12KeyAndCertificates:
  1851. if password is not None:
  1852. utils._check_byteslike("password", password)
  1853. bio = self._bytes_to_bio(data)
  1854. p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
  1855. if p12 == self._ffi.NULL:
  1856. self._consume_errors()
  1857. raise ValueError("Could not deserialize PKCS12 data")
  1858. p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
  1859. evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
  1860. x509_ptr = self._ffi.new("X509 **")
  1861. sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
  1862. with self._zeroed_null_terminated_buf(password) as password_buf:
  1863. res = self._lib.PKCS12_parse(
  1864. p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
  1865. )
  1866. # Workaround for
  1867. # https://github.com/libressl-portable/portable/issues/659
  1868. if self._lib.CRYPTOGRAPHY_LIBRESSL_LESS_THAN_340:
  1869. self._consume_errors()
  1870. if res == 0:
  1871. self._consume_errors()
  1872. raise ValueError("Invalid password or PKCS12 data")
  1873. cert = None
  1874. key = None
  1875. additional_certificates = []
  1876. if evp_pkey_ptr[0] != self._ffi.NULL:
  1877. evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
  1878. key = self._evp_pkey_to_private_key(evp_pkey)
  1879. if x509_ptr[0] != self._ffi.NULL:
  1880. x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
  1881. cert_obj = self._ossl2cert(x509)
  1882. name = None
  1883. maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
  1884. if maybe_name != self._ffi.NULL:
  1885. name = self._ffi.string(maybe_name)
  1886. cert = PKCS12Certificate(cert_obj, name)
  1887. if sk_x509_ptr[0] != self._ffi.NULL:
  1888. sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
  1889. num = self._lib.sk_X509_num(sk_x509_ptr[0])
  1890. # In OpenSSL < 3.0.0 PKCS12 parsing reverses the order of the
  1891. # certificates.
  1892. indices: typing.Iterable[int]
  1893. if (
  1894. self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER
  1895. or self._lib.CRYPTOGRAPHY_IS_BORINGSSL
  1896. ):
  1897. indices = range(num)
  1898. else:
  1899. indices = reversed(range(num))
  1900. for i in indices:
  1901. x509 = self._lib.sk_X509_value(sk_x509, i)
  1902. self.openssl_assert(x509 != self._ffi.NULL)
  1903. x509 = self._ffi.gc(x509, self._lib.X509_free)
  1904. addl_cert = self._ossl2cert(x509)
  1905. addl_name = None
  1906. maybe_name = self._lib.X509_alias_get0(x509, self._ffi.NULL)
  1907. if maybe_name != self._ffi.NULL:
  1908. addl_name = self._ffi.string(maybe_name)
  1909. additional_certificates.append(
  1910. PKCS12Certificate(addl_cert, addl_name)
  1911. )
  1912. return PKCS12KeyAndCertificates(key, cert, additional_certificates)
  1913. def serialize_key_and_certificates_to_pkcs12(
  1914. self,
  1915. name: typing.Optional[bytes],
  1916. key: typing.Optional[_ALLOWED_PKCS12_TYPES],
  1917. cert: typing.Optional[x509.Certificate],
  1918. cas: typing.Optional[typing.List[_PKCS12_CAS_TYPES]],
  1919. encryption_algorithm: serialization.KeySerializationEncryption,
  1920. ) -> bytes:
  1921. password = None
  1922. if name is not None:
  1923. utils._check_bytes("name", name)
  1924. if isinstance(encryption_algorithm, serialization.NoEncryption):
  1925. nid_cert = -1
  1926. nid_key = -1
  1927. pkcs12_iter = 0
  1928. mac_iter = 0
  1929. elif isinstance(
  1930. encryption_algorithm, serialization.BestAvailableEncryption
  1931. ):
  1932. # PKCS12 encryption is hopeless trash and can never be fixed.
  1933. # This is the least terrible option.
  1934. nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
  1935. nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
  1936. # At least we can set this higher than OpenSSL's default
  1937. pkcs12_iter = 20000
  1938. # mac_iter chosen for compatibility reasons, see:
  1939. # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
  1940. # Did we mention how lousy PKCS12 encryption is?
  1941. mac_iter = 1
  1942. password = encryption_algorithm.password
  1943. else:
  1944. raise ValueError("Unsupported key encryption type")
  1945. if cas is None or len(cas) == 0:
  1946. sk_x509 = self._ffi.NULL
  1947. else:
  1948. sk_x509 = self._lib.sk_X509_new_null()
  1949. sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
  1950. # This list is to keep the x509 values alive until end of function
  1951. ossl_cas = []
  1952. for ca in cas:
  1953. if isinstance(ca, PKCS12Certificate):
  1954. ca_alias = ca.friendly_name
  1955. ossl_ca = self._cert2ossl(ca.certificate)
  1956. with self._zeroed_null_terminated_buf(
  1957. ca_alias
  1958. ) as ca_name_buf:
  1959. res = self._lib.X509_alias_set1(
  1960. ossl_ca, ca_name_buf, -1
  1961. )
  1962. self.openssl_assert(res == 1)
  1963. else:
  1964. ossl_ca = self._cert2ossl(ca)
  1965. ossl_cas.append(ossl_ca)
  1966. res = self._lib.sk_X509_push(sk_x509, ossl_ca)
  1967. backend.openssl_assert(res >= 1)
  1968. with self._zeroed_null_terminated_buf(password) as password_buf:
  1969. with self._zeroed_null_terminated_buf(name) as name_buf:
  1970. ossl_cert = self._cert2ossl(cert) if cert else self._ffi.NULL
  1971. if key is not None:
  1972. evp_pkey = key._evp_pkey # type: ignore[union-attr]
  1973. else:
  1974. evp_pkey = self._ffi.NULL
  1975. p12 = self._lib.PKCS12_create(
  1976. password_buf,
  1977. name_buf,
  1978. evp_pkey,
  1979. ossl_cert,
  1980. sk_x509,
  1981. nid_key,
  1982. nid_cert,
  1983. pkcs12_iter,
  1984. mac_iter,
  1985. 0,
  1986. )
  1987. self.openssl_assert(p12 != self._ffi.NULL)
  1988. p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
  1989. bio = self._create_mem_bio_gc()
  1990. res = self._lib.i2d_PKCS12_bio(bio, p12)
  1991. self.openssl_assert(res > 0)
  1992. return self._read_mem_bio(bio)
  1993. def poly1305_supported(self) -> bool:
  1994. if self._fips_enabled:
  1995. return False
  1996. return self._lib.Cryptography_HAS_POLY1305 == 1
  1997. def create_poly1305_ctx(self, key: bytes) -> _Poly1305Context:
  1998. utils._check_byteslike("key", key)
  1999. if len(key) != _POLY1305_KEY_SIZE:
  2000. raise ValueError("A poly1305 key is 32 bytes long")
  2001. return _Poly1305Context(self, key)
  2002. def pkcs7_supported(self) -> bool:
  2003. return not self._lib.CRYPTOGRAPHY_IS_BORINGSSL
  2004. def load_pem_pkcs7_certificates(
  2005. self, data: bytes
  2006. ) -> typing.List[x509.Certificate]:
  2007. utils._check_bytes("data", data)
  2008. bio = self._bytes_to_bio(data)
  2009. p7 = self._lib.PEM_read_bio_PKCS7(
  2010. bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
  2011. )
  2012. if p7 == self._ffi.NULL:
  2013. self._consume_errors()
  2014. raise ValueError("Unable to parse PKCS7 data")
  2015. p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
  2016. return self._load_pkcs7_certificates(p7)
  2017. def load_der_pkcs7_certificates(
  2018. self, data: bytes
  2019. ) -> typing.List[x509.Certificate]:
  2020. utils._check_bytes("data", data)
  2021. bio = self._bytes_to_bio(data)
  2022. p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
  2023. if p7 == self._ffi.NULL:
  2024. self._consume_errors()
  2025. raise ValueError("Unable to parse PKCS7 data")
  2026. p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
  2027. return self._load_pkcs7_certificates(p7)
  2028. def _load_pkcs7_certificates(self, p7):
  2029. nid = self._lib.OBJ_obj2nid(p7.type)
  2030. self.openssl_assert(nid != self._lib.NID_undef)
  2031. if nid != self._lib.NID_pkcs7_signed:
  2032. raise UnsupportedAlgorithm(
  2033. "Only basic signed structures are currently supported. NID"
  2034. " for this data was {}".format(nid),
  2035. _Reasons.UNSUPPORTED_SERIALIZATION,
  2036. )
  2037. sk_x509 = p7.d.sign.cert
  2038. num = self._lib.sk_X509_num(sk_x509)
  2039. certs = []
  2040. for i in range(num):
  2041. x509 = self._lib.sk_X509_value(sk_x509, i)
  2042. self.openssl_assert(x509 != self._ffi.NULL)
  2043. res = self._lib.X509_up_ref(x509)
  2044. # When OpenSSL is less than 1.1.0 up_ref returns the current
  2045. # refcount. On 1.1.0+ it returns 1 for success.
  2046. self.openssl_assert(res >= 1)
  2047. x509 = self._ffi.gc(x509, self._lib.X509_free)
  2048. cert = self._ossl2cert(x509)
  2049. certs.append(cert)
  2050. return certs
  2051. def pkcs7_serialize_certificates(
  2052. self,
  2053. certs: typing.List[x509.Certificate],
  2054. encoding: serialization.Encoding,
  2055. ):
  2056. certs = list(certs)
  2057. if not certs or not all(
  2058. isinstance(cert, x509.Certificate) for cert in certs
  2059. ):
  2060. raise TypeError("certs must be a list of certs with length >= 1")
  2061. if encoding not in (
  2062. serialization.Encoding.PEM,
  2063. serialization.Encoding.DER,
  2064. ):
  2065. raise TypeError("encoding must DER or PEM from the Encoding enum")
  2066. certs_sk = self._lib.sk_X509_new_null()
  2067. certs_sk = self._ffi.gc(certs_sk, self._lib.sk_X509_free)
  2068. # This list is to keep the x509 values alive until end of function
  2069. ossl_certs = []
  2070. for cert in certs:
  2071. ossl_cert = self._cert2ossl(cert)
  2072. ossl_certs.append(ossl_cert)
  2073. res = self._lib.sk_X509_push(certs_sk, ossl_cert)
  2074. self.openssl_assert(res >= 1)
  2075. # We use PKCS7_sign here because it creates the PKCS7 and PKCS7_SIGNED
  2076. # structures for us rather than requiring manual assignment.
  2077. p7 = self._lib.PKCS7_sign(
  2078. self._ffi.NULL,
  2079. self._ffi.NULL,
  2080. certs_sk,
  2081. self._ffi.NULL,
  2082. self._lib.PKCS7_PARTIAL,
  2083. )
  2084. bio_out = self._create_mem_bio_gc()
  2085. if encoding is serialization.Encoding.PEM:
  2086. res = self._lib.PEM_write_bio_PKCS7_stream(
  2087. bio_out, p7, self._ffi.NULL, 0
  2088. )
  2089. else:
  2090. assert encoding is serialization.Encoding.DER
  2091. res = self._lib.i2d_PKCS7_bio(bio_out, p7)
  2092. self.openssl_assert(res == 1)
  2093. return self._read_mem_bio(bio_out)
  2094. def pkcs7_sign(
  2095. self,
  2096. builder: pkcs7.PKCS7SignatureBuilder,
  2097. encoding: serialization.Encoding,
  2098. options: typing.List[pkcs7.PKCS7Options],
  2099. ) -> bytes:
  2100. assert builder._data is not None
  2101. bio = self._bytes_to_bio(builder._data)
  2102. init_flags = self._lib.PKCS7_PARTIAL
  2103. final_flags = 0
  2104. if len(builder._additional_certs) == 0:
  2105. certs = self._ffi.NULL
  2106. else:
  2107. certs = self._lib.sk_X509_new_null()
  2108. certs = self._ffi.gc(certs, self._lib.sk_X509_free)
  2109. # This list is to keep the x509 values alive until end of function
  2110. ossl_certs = []
  2111. for cert in builder._additional_certs:
  2112. ossl_cert = self._cert2ossl(cert)
  2113. ossl_certs.append(ossl_cert)
  2114. res = self._lib.sk_X509_push(certs, ossl_cert)
  2115. self.openssl_assert(res >= 1)
  2116. if pkcs7.PKCS7Options.DetachedSignature in options:
  2117. # Don't embed the data in the PKCS7 structure
  2118. init_flags |= self._lib.PKCS7_DETACHED
  2119. final_flags |= self._lib.PKCS7_DETACHED
  2120. # This just inits a structure for us. However, there
  2121. # are flags we need to set, joy.
  2122. p7 = self._lib.PKCS7_sign(
  2123. self._ffi.NULL,
  2124. self._ffi.NULL,
  2125. certs,
  2126. self._ffi.NULL,
  2127. init_flags,
  2128. )
  2129. self.openssl_assert(p7 != self._ffi.NULL)
  2130. p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
  2131. signer_flags = 0
  2132. # These flags are configurable on a per-signature basis
  2133. # but we've deliberately chosen to make the API only allow
  2134. # setting it across all signatures for now.
  2135. if pkcs7.PKCS7Options.NoCapabilities in options:
  2136. signer_flags |= self._lib.PKCS7_NOSMIMECAP
  2137. elif pkcs7.PKCS7Options.NoAttributes in options:
  2138. signer_flags |= self._lib.PKCS7_NOATTR
  2139. if pkcs7.PKCS7Options.NoCerts in options:
  2140. signer_flags |= self._lib.PKCS7_NOCERTS
  2141. for certificate, private_key, hash_algorithm in builder._signers:
  2142. ossl_cert = self._cert2ossl(certificate)
  2143. md = self._evp_md_non_null_from_algorithm(hash_algorithm)
  2144. p7signerinfo = self._lib.PKCS7_sign_add_signer(
  2145. p7,
  2146. ossl_cert,
  2147. private_key._evp_pkey, # type: ignore[union-attr]
  2148. md,
  2149. signer_flags,
  2150. )
  2151. self.openssl_assert(p7signerinfo != self._ffi.NULL)
  2152. for option in options:
  2153. # DetachedSignature, NoCapabilities, and NoAttributes are already
  2154. # handled so we just need to check these last two options.
  2155. if option is pkcs7.PKCS7Options.Text:
  2156. final_flags |= self._lib.PKCS7_TEXT
  2157. elif option is pkcs7.PKCS7Options.Binary:
  2158. final_flags |= self._lib.PKCS7_BINARY
  2159. bio_out = self._create_mem_bio_gc()
  2160. if encoding is serialization.Encoding.SMIME:
  2161. # This finalizes the structure
  2162. res = self._lib.SMIME_write_PKCS7(
  2163. bio_out, p7, bio.bio, final_flags
  2164. )
  2165. elif encoding is serialization.Encoding.PEM:
  2166. res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
  2167. self.openssl_assert(res == 1)
  2168. res = self._lib.PEM_write_bio_PKCS7_stream(
  2169. bio_out, p7, bio.bio, final_flags
  2170. )
  2171. else:
  2172. assert encoding is serialization.Encoding.DER
  2173. # We need to call finalize here becauase i2d_PKCS7_bio does not
  2174. # finalize.
  2175. res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
  2176. self.openssl_assert(res == 1)
  2177. # OpenSSL 3.0 leaves a random bio error on the stack:
  2178. # https://github.com/openssl/openssl/issues/16681
  2179. if self._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER:
  2180. self._consume_errors()
  2181. res = self._lib.i2d_PKCS7_bio(bio_out, p7)
  2182. self.openssl_assert(res == 1)
  2183. return self._read_mem_bio(bio_out)
  2184. class GetCipherByName:
  2185. def __init__(self, fmt: str):
  2186. self._fmt = fmt
  2187. def __call__(self, backend: Backend, cipher: CipherAlgorithm, mode: Mode):
  2188. cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
  2189. return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
  2190. def _get_xts_cipher(backend: Backend, cipher: AES, mode):
  2191. cipher_name = "aes-{}-xts".format(cipher.key_size // 2)
  2192. return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
  2193. backend = Backend()