ocsp.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import abc
  5. import datetime
  6. import typing
  7. from cryptography import utils
  8. from cryptography import x509
  9. from cryptography.hazmat.bindings._rust import ocsp
  10. from cryptography.hazmat.primitives import hashes, serialization
  11. from cryptography.hazmat.primitives.asymmetric.types import (
  12. CERTIFICATE_PRIVATE_KEY_TYPES,
  13. )
  14. from cryptography.x509.base import (
  15. _EARLIEST_UTC_TIME,
  16. _convert_to_naive_utc_time,
  17. _reject_duplicate_extension,
  18. )
  19. class OCSPResponderEncoding(utils.Enum):
  20. HASH = "By Hash"
  21. NAME = "By Name"
  22. class OCSPResponseStatus(utils.Enum):
  23. SUCCESSFUL = 0
  24. MALFORMED_REQUEST = 1
  25. INTERNAL_ERROR = 2
  26. TRY_LATER = 3
  27. SIG_REQUIRED = 5
  28. UNAUTHORIZED = 6
  29. _ALLOWED_HASHES = (
  30. hashes.SHA1,
  31. hashes.SHA224,
  32. hashes.SHA256,
  33. hashes.SHA384,
  34. hashes.SHA512,
  35. )
  36. def _verify_algorithm(algorithm: hashes.HashAlgorithm) -> None:
  37. if not isinstance(algorithm, _ALLOWED_HASHES):
  38. raise ValueError(
  39. "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
  40. )
  41. class OCSPCertStatus(utils.Enum):
  42. GOOD = 0
  43. REVOKED = 1
  44. UNKNOWN = 2
  45. class _SingleResponse:
  46. def __init__(
  47. self,
  48. cert: x509.Certificate,
  49. issuer: x509.Certificate,
  50. algorithm: hashes.HashAlgorithm,
  51. cert_status: OCSPCertStatus,
  52. this_update: datetime.datetime,
  53. next_update: typing.Optional[datetime.datetime],
  54. revocation_time: typing.Optional[datetime.datetime],
  55. revocation_reason: typing.Optional[x509.ReasonFlags],
  56. ):
  57. if not isinstance(cert, x509.Certificate) or not isinstance(
  58. issuer, x509.Certificate
  59. ):
  60. raise TypeError("cert and issuer must be a Certificate")
  61. _verify_algorithm(algorithm)
  62. if not isinstance(this_update, datetime.datetime):
  63. raise TypeError("this_update must be a datetime object")
  64. if next_update is not None and not isinstance(
  65. next_update, datetime.datetime
  66. ):
  67. raise TypeError("next_update must be a datetime object or None")
  68. self._cert = cert
  69. self._issuer = issuer
  70. self._algorithm = algorithm
  71. self._this_update = this_update
  72. self._next_update = next_update
  73. if not isinstance(cert_status, OCSPCertStatus):
  74. raise TypeError(
  75. "cert_status must be an item from the OCSPCertStatus enum"
  76. )
  77. if cert_status is not OCSPCertStatus.REVOKED:
  78. if revocation_time is not None:
  79. raise ValueError(
  80. "revocation_time can only be provided if the certificate "
  81. "is revoked"
  82. )
  83. if revocation_reason is not None:
  84. raise ValueError(
  85. "revocation_reason can only be provided if the certificate"
  86. " is revoked"
  87. )
  88. else:
  89. if not isinstance(revocation_time, datetime.datetime):
  90. raise TypeError("revocation_time must be a datetime object")
  91. revocation_time = _convert_to_naive_utc_time(revocation_time)
  92. if revocation_time < _EARLIEST_UTC_TIME:
  93. raise ValueError(
  94. "The revocation_time must be on or after"
  95. " 1950 January 1."
  96. )
  97. if revocation_reason is not None and not isinstance(
  98. revocation_reason, x509.ReasonFlags
  99. ):
  100. raise TypeError(
  101. "revocation_reason must be an item from the ReasonFlags "
  102. "enum or None"
  103. )
  104. self._cert_status = cert_status
  105. self._revocation_time = revocation_time
  106. self._revocation_reason = revocation_reason
  107. class OCSPRequest(metaclass=abc.ABCMeta):
  108. @abc.abstractproperty
  109. def issuer_key_hash(self) -> bytes:
  110. """
  111. The hash of the issuer public key
  112. """
  113. @abc.abstractproperty
  114. def issuer_name_hash(self) -> bytes:
  115. """
  116. The hash of the issuer name
  117. """
  118. @abc.abstractproperty
  119. def hash_algorithm(self) -> hashes.HashAlgorithm:
  120. """
  121. The hash algorithm used in the issuer name and key hashes
  122. """
  123. @abc.abstractproperty
  124. def serial_number(self) -> int:
  125. """
  126. The serial number of the cert whose status is being checked
  127. """
  128. @abc.abstractmethod
  129. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  130. """
  131. Serializes the request to DER
  132. """
  133. @abc.abstractproperty
  134. def extensions(self) -> x509.Extensions:
  135. """
  136. The list of request extensions. Not single request extensions.
  137. """
  138. class OCSPSingleResponse(metaclass=abc.ABCMeta):
  139. @abc.abstractproperty
  140. def certificate_status(self) -> OCSPCertStatus:
  141. """
  142. The status of the certificate (an element from the OCSPCertStatus enum)
  143. """
  144. @abc.abstractproperty
  145. def revocation_time(self) -> typing.Optional[datetime.datetime]:
  146. """
  147. The date of when the certificate was revoked or None if not
  148. revoked.
  149. """
  150. @abc.abstractproperty
  151. def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
  152. """
  153. The reason the certificate was revoked or None if not specified or
  154. not revoked.
  155. """
  156. @abc.abstractproperty
  157. def this_update(self) -> datetime.datetime:
  158. """
  159. The most recent time at which the status being indicated is known by
  160. the responder to have been correct
  161. """
  162. @abc.abstractproperty
  163. def next_update(self) -> typing.Optional[datetime.datetime]:
  164. """
  165. The time when newer information will be available
  166. """
  167. @abc.abstractproperty
  168. def issuer_key_hash(self) -> bytes:
  169. """
  170. The hash of the issuer public key
  171. """
  172. @abc.abstractproperty
  173. def issuer_name_hash(self) -> bytes:
  174. """
  175. The hash of the issuer name
  176. """
  177. @abc.abstractproperty
  178. def hash_algorithm(self) -> hashes.HashAlgorithm:
  179. """
  180. The hash algorithm used in the issuer name and key hashes
  181. """
  182. @abc.abstractproperty
  183. def serial_number(self) -> int:
  184. """
  185. The serial number of the cert whose status is being checked
  186. """
  187. class OCSPResponse(metaclass=abc.ABCMeta):
  188. @abc.abstractproperty
  189. def responses(self) -> typing.Iterator[OCSPSingleResponse]:
  190. """
  191. An iterator over the individual SINGLERESP structures in the
  192. response
  193. """
  194. @abc.abstractproperty
  195. def response_status(self) -> OCSPResponseStatus:
  196. """
  197. The status of the response. This is a value from the OCSPResponseStatus
  198. enumeration
  199. """
  200. @abc.abstractproperty
  201. def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
  202. """
  203. The ObjectIdentifier of the signature algorithm
  204. """
  205. @abc.abstractproperty
  206. def signature_hash_algorithm(
  207. self,
  208. ) -> typing.Optional[hashes.HashAlgorithm]:
  209. """
  210. Returns a HashAlgorithm corresponding to the type of the digest signed
  211. """
  212. @abc.abstractproperty
  213. def signature(self) -> bytes:
  214. """
  215. The signature bytes
  216. """
  217. @abc.abstractproperty
  218. def tbs_response_bytes(self) -> bytes:
  219. """
  220. The tbsResponseData bytes
  221. """
  222. @abc.abstractproperty
  223. def certificates(self) -> typing.List[x509.Certificate]:
  224. """
  225. A list of certificates used to help build a chain to verify the OCSP
  226. response. This situation occurs when the OCSP responder uses a delegate
  227. certificate.
  228. """
  229. @abc.abstractproperty
  230. def responder_key_hash(self) -> typing.Optional[bytes]:
  231. """
  232. The responder's key hash or None
  233. """
  234. @abc.abstractproperty
  235. def responder_name(self) -> typing.Optional[x509.Name]:
  236. """
  237. The responder's Name or None
  238. """
  239. @abc.abstractproperty
  240. def produced_at(self) -> datetime.datetime:
  241. """
  242. The time the response was produced
  243. """
  244. @abc.abstractproperty
  245. def certificate_status(self) -> OCSPCertStatus:
  246. """
  247. The status of the certificate (an element from the OCSPCertStatus enum)
  248. """
  249. @abc.abstractproperty
  250. def revocation_time(self) -> typing.Optional[datetime.datetime]:
  251. """
  252. The date of when the certificate was revoked or None if not
  253. revoked.
  254. """
  255. @abc.abstractproperty
  256. def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
  257. """
  258. The reason the certificate was revoked or None if not specified or
  259. not revoked.
  260. """
  261. @abc.abstractproperty
  262. def this_update(self) -> datetime.datetime:
  263. """
  264. The most recent time at which the status being indicated is known by
  265. the responder to have been correct
  266. """
  267. @abc.abstractproperty
  268. def next_update(self) -> typing.Optional[datetime.datetime]:
  269. """
  270. The time when newer information will be available
  271. """
  272. @abc.abstractproperty
  273. def issuer_key_hash(self) -> bytes:
  274. """
  275. The hash of the issuer public key
  276. """
  277. @abc.abstractproperty
  278. def issuer_name_hash(self) -> bytes:
  279. """
  280. The hash of the issuer name
  281. """
  282. @abc.abstractproperty
  283. def hash_algorithm(self) -> hashes.HashAlgorithm:
  284. """
  285. The hash algorithm used in the issuer name and key hashes
  286. """
  287. @abc.abstractproperty
  288. def serial_number(self) -> int:
  289. """
  290. The serial number of the cert whose status is being checked
  291. """
  292. @abc.abstractproperty
  293. def extensions(self) -> x509.Extensions:
  294. """
  295. The list of response extensions. Not single response extensions.
  296. """
  297. @abc.abstractproperty
  298. def single_extensions(self) -> x509.Extensions:
  299. """
  300. The list of single response extensions. Not response extensions.
  301. """
  302. @abc.abstractmethod
  303. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  304. """
  305. Serializes the response to DER
  306. """
  307. class OCSPRequestBuilder:
  308. def __init__(
  309. self,
  310. request: typing.Optional[
  311. typing.Tuple[
  312. x509.Certificate, x509.Certificate, hashes.HashAlgorithm
  313. ]
  314. ] = None,
  315. extensions: typing.List[x509.Extension[x509.ExtensionType]] = [],
  316. ) -> None:
  317. self._request = request
  318. self._extensions = extensions
  319. def add_certificate(
  320. self,
  321. cert: x509.Certificate,
  322. issuer: x509.Certificate,
  323. algorithm: hashes.HashAlgorithm,
  324. ) -> "OCSPRequestBuilder":
  325. if self._request is not None:
  326. raise ValueError("Only one certificate can be added to a request")
  327. _verify_algorithm(algorithm)
  328. if not isinstance(cert, x509.Certificate) or not isinstance(
  329. issuer, x509.Certificate
  330. ):
  331. raise TypeError("cert and issuer must be a Certificate")
  332. return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)
  333. def add_extension(
  334. self, extval: x509.ExtensionType, critical: bool
  335. ) -> "OCSPRequestBuilder":
  336. if not isinstance(extval, x509.ExtensionType):
  337. raise TypeError("extension must be an ExtensionType")
  338. extension = x509.Extension(extval.oid, critical, extval)
  339. _reject_duplicate_extension(extension, self._extensions)
  340. return OCSPRequestBuilder(
  341. self._request, self._extensions + [extension]
  342. )
  343. def build(self) -> OCSPRequest:
  344. if self._request is None:
  345. raise ValueError("You must add a certificate before building")
  346. return ocsp.create_ocsp_request(self)
  347. class OCSPResponseBuilder:
  348. def __init__(
  349. self,
  350. response: typing.Optional[_SingleResponse] = None,
  351. responder_id: typing.Optional[
  352. typing.Tuple[x509.Certificate, OCSPResponderEncoding]
  353. ] = None,
  354. certs: typing.Optional[typing.List[x509.Certificate]] = None,
  355. extensions: typing.List[x509.Extension[x509.ExtensionType]] = [],
  356. ):
  357. self._response = response
  358. self._responder_id = responder_id
  359. self._certs = certs
  360. self._extensions = extensions
  361. def add_response(
  362. self,
  363. cert: x509.Certificate,
  364. issuer: x509.Certificate,
  365. algorithm: hashes.HashAlgorithm,
  366. cert_status: OCSPCertStatus,
  367. this_update: datetime.datetime,
  368. next_update: typing.Optional[datetime.datetime],
  369. revocation_time: typing.Optional[datetime.datetime],
  370. revocation_reason: typing.Optional[x509.ReasonFlags],
  371. ) -> "OCSPResponseBuilder":
  372. if self._response is not None:
  373. raise ValueError("Only one response per OCSPResponse.")
  374. singleresp = _SingleResponse(
  375. cert,
  376. issuer,
  377. algorithm,
  378. cert_status,
  379. this_update,
  380. next_update,
  381. revocation_time,
  382. revocation_reason,
  383. )
  384. return OCSPResponseBuilder(
  385. singleresp,
  386. self._responder_id,
  387. self._certs,
  388. self._extensions,
  389. )
  390. def responder_id(
  391. self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate
  392. ) -> "OCSPResponseBuilder":
  393. if self._responder_id is not None:
  394. raise ValueError("responder_id can only be set once")
  395. if not isinstance(responder_cert, x509.Certificate):
  396. raise TypeError("responder_cert must be a Certificate")
  397. if not isinstance(encoding, OCSPResponderEncoding):
  398. raise TypeError(
  399. "encoding must be an element from OCSPResponderEncoding"
  400. )
  401. return OCSPResponseBuilder(
  402. self._response,
  403. (responder_cert, encoding),
  404. self._certs,
  405. self._extensions,
  406. )
  407. def certificates(
  408. self, certs: typing.Iterable[x509.Certificate]
  409. ) -> "OCSPResponseBuilder":
  410. if self._certs is not None:
  411. raise ValueError("certificates may only be set once")
  412. certs = list(certs)
  413. if len(certs) == 0:
  414. raise ValueError("certs must not be an empty list")
  415. if not all(isinstance(x, x509.Certificate) for x in certs):
  416. raise TypeError("certs must be a list of Certificates")
  417. return OCSPResponseBuilder(
  418. self._response,
  419. self._responder_id,
  420. certs,
  421. self._extensions,
  422. )
  423. def add_extension(
  424. self, extval: x509.ExtensionType, critical: bool
  425. ) -> "OCSPResponseBuilder":
  426. if not isinstance(extval, x509.ExtensionType):
  427. raise TypeError("extension must be an ExtensionType")
  428. extension = x509.Extension(extval.oid, critical, extval)
  429. _reject_duplicate_extension(extension, self._extensions)
  430. return OCSPResponseBuilder(
  431. self._response,
  432. self._responder_id,
  433. self._certs,
  434. self._extensions + [extension],
  435. )
  436. def sign(
  437. self,
  438. private_key: CERTIFICATE_PRIVATE_KEY_TYPES,
  439. algorithm: typing.Optional[hashes.HashAlgorithm],
  440. ) -> OCSPResponse:
  441. if self._response is None:
  442. raise ValueError("You must add a response before signing")
  443. if self._responder_id is None:
  444. raise ValueError("You must add a responder_id before signing")
  445. return ocsp.create_ocsp_response(
  446. OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
  447. )
  448. @classmethod
  449. def build_unsuccessful(
  450. cls, response_status: OCSPResponseStatus
  451. ) -> OCSPResponse:
  452. if not isinstance(response_status, OCSPResponseStatus):
  453. raise TypeError(
  454. "response_status must be an item from OCSPResponseStatus"
  455. )
  456. if response_status is OCSPResponseStatus.SUCCESSFUL:
  457. raise ValueError("response_status cannot be SUCCESSFUL")
  458. return ocsp.create_ocsp_response(response_status, None, None, None)
  459. def load_der_ocsp_request(data: bytes) -> OCSPRequest:
  460. return ocsp.load_der_ocsp_request(data)
  461. def load_der_ocsp_response(data: bytes) -> OCSPResponse:
  462. return ocsp.load_der_ocsp_response(data)