cert.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import logging
  2. from datetime import datetime, timedelta
  3. # Make a call to strptime before starting threads to
  4. # prevent thread safety issues.
  5. datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S")
  6. try:
  7. from pyasn1.codec.der import decoder, encoder
  8. from pyasn1.type.univ import Any, ObjectIdentifier, OctetString
  9. from pyasn1.type.char import BMPString, IA5String, UTF8String
  10. from pyasn1.type.useful import GeneralizedTime
  11. from pyasn1_modules.rfc2459 import (Certificate, DirectoryString,
  12. SubjectAltName, GeneralNames,
  13. GeneralName)
  14. from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME
  15. from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME
  16. XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5')
  17. SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7')
  18. HAVE_PYASN1 = True
  19. except ImportError:
  20. HAVE_PYASN1 = False
  21. log = logging.getLogger(__name__)
  22. class CertificateError(Exception):
  23. pass
  24. def decode_str(data):
  25. encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8'
  26. return bytes(data).decode(encoding)
  27. def extract_names(raw_cert):
  28. results = {'CN': set(),
  29. 'DNS': set(),
  30. 'SRV': set(),
  31. 'URI': set(),
  32. 'XMPPAddr': set()}
  33. cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0]
  34. tbs = cert.getComponentByName('tbsCertificate')
  35. subject = tbs.getComponentByName('subject')
  36. extensions = tbs.getComponentByName('extensions') or []
  37. # Extract the CommonName(s) from the cert.
  38. for rdnss in subject:
  39. for rdns in rdnss:
  40. for name in rdns:
  41. oid = name.getComponentByName('type')
  42. value = name.getComponentByName('value')
  43. if oid != COMMON_NAME:
  44. continue
  45. value = decoder.decode(value, asn1Spec=DirectoryString())[0]
  46. value = decode_str(value.getComponent())
  47. results['CN'].add(value)
  48. # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr)
  49. for extension in extensions:
  50. oid = extension.getComponentByName('extnID')
  51. if oid != SUBJECT_ALT_NAME:
  52. continue
  53. value = decoder.decode(extension.getComponentByName('extnValue'),
  54. asn1Spec=OctetString())[0]
  55. sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0]
  56. for name in sa_names:
  57. name_type = name.getName()
  58. if name_type == 'dNSName':
  59. results['DNS'].add(decode_str(name.getComponent()))
  60. if name_type == 'uniformResourceIdentifier':
  61. value = decode_str(name.getComponent())
  62. if value.startswith('xmpp:'):
  63. results['URI'].add(value[5:])
  64. elif name_type == 'otherName':
  65. name = name.getComponent()
  66. oid = name.getComponentByName('type-id')
  67. value = name.getComponentByName('value')
  68. if oid == XMPP_ADDR:
  69. value = decoder.decode(value, asn1Spec=UTF8String())[0]
  70. results['XMPPAddr'].add(decode_str(value))
  71. elif oid == SRV_NAME:
  72. value = decoder.decode(value, asn1Spec=IA5String())[0]
  73. results['SRV'].add(decode_str(value))
  74. return results
  75. def extract_dates(raw_cert):
  76. if not HAVE_PYASN1:
  77. log.warning("Could not find pyasn1 and pyasn1_modules. " + \
  78. "SSL certificate expiration COULD NOT BE VERIFIED.")
  79. return None, None
  80. cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0]
  81. tbs = cert.getComponentByName('tbsCertificate')
  82. validity = tbs.getComponentByName('validity')
  83. not_before = validity.getComponentByName('notBefore')
  84. not_before = str(not_before.getComponent())
  85. not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ')
  86. not_after = validity.getComponentByName('notAfter')
  87. not_after = str(not_after.getComponent())
  88. not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
  89. return not_before, not_after
  90. def get_ttl(raw_cert):
  91. not_before, not_after = extract_dates(raw_cert)
  92. if not_after is None:
  93. return None
  94. return not_after - datetime.utcnow()
  95. def verify(expected, raw_cert):
  96. if not HAVE_PYASN1:
  97. log.warning("Could not find pyasn1 and pyasn1_modules. " + \
  98. "SSL certificate COULD NOT BE VERIFIED.")
  99. return
  100. not_before, not_after = extract_dates(raw_cert)
  101. cert_names = extract_names(raw_cert)
  102. now = datetime.utcnow()
  103. if not_before > now:
  104. raise CertificateError(
  105. 'Certificate has not entered its valid date range.')
  106. if not_after <= now:
  107. raise CertificateError(
  108. 'Certificate has expired.')
  109. if '.' in expected:
  110. expected_wild = expected[expected.index('.'):]
  111. else:
  112. expected_wild = expected
  113. expected_srv = '_xmpp-client.%s' % expected
  114. for name in cert_names['XMPPAddr']:
  115. if name == expected:
  116. return True
  117. for name in cert_names['SRV']:
  118. if name == expected_srv or name == expected:
  119. return True
  120. for name in cert_names['DNS']:
  121. if name == expected:
  122. return True
  123. if name.startswith('*'):
  124. if '.' in name:
  125. name_wild = name[name.index('.'):]
  126. else:
  127. name_wild = name
  128. if expected_wild == name_wild:
  129. return True
  130. for name in cert_names['URI']:
  131. if name == expected:
  132. return True
  133. for name in cert_names['CN']:
  134. if name == expected:
  135. return True
  136. raise CertificateError(
  137. 'Could not match certificate against hostname: %s' % expected)