certlib.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import base64
  2. import datetime
  3. import logging
  4. import time
  5. from collections import OrderedDict
  6. from OpenSSL import crypto
  7. from construct import Struct, Byte, Int16ub, Int64ub, Enum, Bytes, \
  8. Int24ub, this, GreedyBytes, GreedyRange, Terminated, Embedded
# Binary layout applied to the base64-decoded ``leaf_input`` of a log entry
# (see parse_ctl_entry): two single-byte fields, a 64-bit big-endian
# timestamp, a 16-bit big-endian entry-type tag, then the rest of the input.
MerkleTreeHeader = Struct(
    "Version" / Byte,
    "MerkleLeafType" / Byte,
    "Timestamp" / Int64ub,
    # Parsed into a symbolic name; parse_ctl_entry compares it against
    # "X509LogEntryType" to choose how the entry is decoded.
    "LogEntryType" / Enum(Int16ub, X509LogEntryType=0, PrecertLogEntryType=1),
    # All remaining bytes; parse_ctl_entry parses them as a Certificate
    # for X509 entries.
    "Entry" / GreedyBytes
)
# One length-prefixed certificate: a 24-bit big-endian byte count followed by
# exactly that many bytes. parse_ctl_entry hands CertData to pyOpenSSL as
# FILETYPE_ASN1 (DER) data.
Certificate = Struct(
    "Length" / Int24ub,
    "CertData" / Bytes(this.Length)
)
# A certificate chain: a 24-bit big-endian length field followed by as many
# Certificate records as can be parsed from the remaining input.
# NOTE(review): ChainLength is parsed but never used to bound the
# GreedyRange; presumably it is the chain's total byte length -- confirm.
CertificateChain = Struct(
    "ChainLength" / Int24ub,
    "Chain" / GreedyRange(Certificate),
)
# Layout applied to the base64-decoded ``extra_data`` of a non-X509 entry
# (see parse_ctl_entry): the leaf certificate, then the CertificateChain
# fields merged into this struct so that ChainLength and Chain sit next to
# LeafCert in the parsed result.
# NOTE(review): `Embedded` appears to have been removed from construct in
# the 2.10 series -- confirm against the pinned construct version before
# upgrading.
PreCertEntry = Struct(
    "LeafCert" / Certificate,
    Embedded(CertificateChain),
    # Parsing fails if any bytes are left over after the chain.
    Terminated
)
  29. def dump_extensions(certificate):
  30. extensions = {}
  31. for x in range(certificate.get_extension_count()):
  32. extension_name = ""
  33. try:
  34. extension_name = certificate.get_extension(x).get_short_name()
  35. if extension_name == b'UNDEF':
  36. continue
  37. extensions[extension_name.decode('latin-1')] = certificate.get_extension(x).__str__()
  38. except:
  39. try:
  40. extensions[extension_name.decode('latin-1')] = "NULL"
  41. except Exception as e:
  42. logging.debug("Extension parsing error -> {}".format(e))
  43. return extensions
  44. def serialize_certificate(certificate):
  45. subject = certificate.get_subject()
  46. not_before_datetime = datetime.datetime.strptime(certificate.get_notBefore().decode('ascii'), "%Y%m%d%H%M%SZ")
  47. not_after_datetime = datetime.datetime.strptime(certificate.get_notAfter().decode('ascii'), "%Y%m%d%H%M%SZ")
  48. return {
  49. "subject": {
  50. "aggregated": repr(certificate.get_subject())[18:-2],
  51. "C": subject.C,
  52. "ST": subject.ST,
  53. "L": subject.L,
  54. "O": subject.O,
  55. "OU": subject.OU,
  56. "CN": subject.CN
  57. },
  58. "extensions": dump_extensions(certificate),
  59. "not_before": not_before_datetime.timestamp(),
  60. "not_after": not_after_datetime.timestamp(),
  61. "serial_number": '{0:x}'.format(int(certificate.get_serial_number())),
  62. "fingerprint": str(certificate.digest("sha1"), 'utf-8'),
  63. "as_der": base64.b64encode(
  64. crypto.dump_certificate(
  65. crypto.FILETYPE_ASN1, certificate
  66. )
  67. ).decode('utf-8')
  68. }
  69. def add_all_domains(cert_data):
  70. all_domains = []
  71. # Apparently we have certificates with null CNs....what?
  72. if cert_data['leaf_cert']['subject']['CN']:
  73. all_domains.append(cert_data['leaf_cert']['subject']['CN'])
  74. subject_alternative_name = cert_data['leaf_cert']['extensions'].get('subjectAltName')
  75. if subject_alternative_name:
  76. for entry in subject_alternative_name.split(', '):
  77. if entry.startswith('DNS:'):
  78. all_domains.append(entry.replace('DNS:', ''))
  79. cert_data['leaf_cert']['all_domains'] = list(OrderedDict.fromkeys(all_domains))
  80. return cert_data
  81. def parse_ctl_entry(entry, operator_information):
  82. mtl = MerkleTreeHeader.parse(base64.b64decode(entry['leaf_input']))
  83. cert_data = {}
  84. if mtl.LogEntryType == "X509LogEntryType":
  85. cert_data['update_type'] = "X509LogEntry"
  86. chain = [crypto.load_certificate(crypto.FILETYPE_ASN1, Certificate.parse(mtl.Entry).CertData)]
  87. extra_data = CertificateChain.parse(base64.b64decode(entry['extra_data']))
  88. for cert in extra_data.Chain:
  89. chain.append(crypto.load_certificate(crypto.FILETYPE_ASN1, cert.CertData))
  90. else:
  91. cert_data['update_type'] = "PreCertEntry"
  92. extra_data = PreCertEntry.parse(base64.b64decode(entry['extra_data']))
  93. chain = [crypto.load_certificate(crypto.FILETYPE_ASN1, extra_data.LeafCert.CertData)]
  94. for cert in extra_data.Chain:
  95. chain.append(
  96. crypto.load_certificate(crypto.FILETYPE_ASN1, cert.CertData)
  97. )
  98. cert_data.update({
  99. "leaf_cert": serialize_certificate(chain[0]),
  100. "chain": [serialize_certificate(x) for x in chain[1:]],
  101. "cert_index": entry['index'],
  102. "seen": time.time()
  103. })
  104. add_all_domains(cert_data)
  105. cert_data['source'] = {
  106. "url": operator_information['url'],
  107. "name": operator_information['description']
  108. }
  109. return cert_data