watcher.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import asyncio
  2. import logging
  3. import math
  4. import sys
  5. import os
  6. import datetime
  7. import aiohttp
  8. import requests
  9. from certstream.certlib import parse_ctl_entry
  10. class TransparencyWatcher(object):
  11. # These are a list of servers that we shouldn't even try to connect to. In testing they either had bad
  12. # DNS records, resolved to un-routable IP addresses, or didn't have valid SSL certificates.
  13. BAD_CT_SERVERS = [
  14. "alpha.ctlogs.org",
  15. "clicky.ct.letsencrypt.org",
  16. "ct.akamai.com",
  17. "ct.filippo.io/behindthesofa",
  18. "ct.gdca.com.cn",
  19. "ct.izenpe.com",
  20. "ct.izenpe.eus",
  21. "ct.sheca.com",
  22. "ct.startssl.com",
  23. "ct.wosign.com",
  24. "ctlog.api.venafi.com",
  25. "ctlog.gdca.com.cn",
  26. "ctlog.sheca.com",
  27. "ctlog.wosign.com",
  28. "ctlog2.wosign.com",
  29. "flimsy.ct.nordu.net:8080",
  30. "log.certly.io",
  31. "nessie2021.ct.digicert.com/log",
  32. "plausible.ct.nordu.net",
  33. "www.certificatetransparency.cn/ct",
  34. "ctserver.cnnic.cn",
  35. ]
  36. MAX_BLOCK_SIZE = 64
  37. def __init__(self, _loop):
  38. self.loop = _loop
  39. self.stopped = False
  40. self.logger = logging.getLogger('certstream.watcher')
  41. self.stream = asyncio.Queue(maxsize=3000)
  42. self.lastseen = None
  43. self.logger.info("Initializing the CTL watcher")
  44. def _initialize_ts_logs(self):
  45. try:
  46. self.transparency_logs = requests.get('https://www.gstatic.com/ct/log_list/all_logs_list.json').json()
  47. except Exception as e:
  48. self.logger.fatal("Invalid response from certificate directory! Exiting :(")
  49. sys.exit(1)
  50. self.logger.info("Retrieved transparency log with {} entries to watch.".format(len(self.transparency_logs['logs'])))
  51. for entry in self.transparency_logs['logs']:
  52. if entry['url'].endswith('/'):
  53. entry['url'] = entry['url'][:-1]
  54. self.logger.info(" + {}".format(entry['description']))
  55. async def _print_memory_usage(self):
  56. import objgraph
  57. import gc
  58. while True:
  59. print("Stream backlog : {}".format(self.stream.qsize()))
  60. gc.collect()
  61. objgraph.show_growth()
  62. await asyncio.sleep(60)
  63. def get_tasks(self):
  64. self._initialize_ts_logs()
  65. coroutines = []
  66. if os.getenv("DEBUG_MEMORY", False):
  67. coroutines.append(self._print_memory_usage())
  68. for log in self.transparency_logs['logs']:
  69. if log['url'] not in self.BAD_CT_SERVERS:
  70. coroutines.append(self.watch_for_updates_task(log))
  71. return coroutines
  72. def stop(self):
  73. self.logger.info('Got stop order, exiting...')
  74. self.stopped = True
  75. for task in asyncio.Task.all_tasks():
  76. task.cancel()
  77. async def watch_for_updates_task(self, operator_information):
  78. try:
  79. latest_size = 0
  80. name = operator_information['description']
  81. while not self.stopped:
  82. try:
  83. async with aiohttp.ClientSession(loop=self.loop) as session:
  84. async with session.get("https://{}/ct/v1/get-sth".format(operator_information['url'])) as response:
  85. info = await response.json()
  86. except aiohttp.ClientError as e:
  87. self.logger.info('[{}] Exception -> {}'.format(name, e))
  88. await asyncio.sleep(600)
  89. continue
  90. tree_size = info.get('tree_size')
  91. # TODO: Add in persistence and id tracking per log
  92. if latest_size == 0:
  93. latest_size = tree_size
  94. if latest_size < tree_size:
  95. self.logger.info('[{}] [{} -> {}] New certs found, updating!'.format(name, latest_size, tree_size))
  96. try:
  97. async for result_chunk in self.get_new_results(operator_information, latest_size, tree_size):
  98. for entry in result_chunk:
  99. cert_data = parse_ctl_entry(entry, operator_information)
  100. # if cert_data['update_type'] == 'X509LogEntry':
  101. # print(cert_data['source']['url'], cert_data['leaf_cert']['subject']['CN'], cert_data['leaf_cert']['extensions']['subjectAltName'])
  102. self.lastseen = datetime.datetime.now().isoformat()
  103. await self.stream.put(cert_data)
  104. except aiohttp.ClientError as e:
  105. self.logger.info('[{}] Exception -> {}'.format(name, e))
  106. await asyncio.sleep(600)
  107. continue
  108. except Exception as e:
  109. print("Encountered an exception while getting new results! -> {}".format(e))
  110. return
  111. latest_size = tree_size
  112. else:
  113. self.logger.debug('[{}][{}|{}] No update needed, continuing...'.format(name, latest_size, tree_size))
  114. await asyncio.sleep(30)
  115. except Exception as e:
  116. print("Encountered an exception while getting new results! -> {}".format(e))
  117. return
  118. async def get_new_results(self, operator_information, latest_size, tree_size):
  119. # The top of the tree isn't actually a cert yet, so the total_size is what we're aiming for
  120. total_size = tree_size - latest_size
  121. start = latest_size
  122. end = start + self.MAX_BLOCK_SIZE
  123. chunks = math.ceil(total_size / self.MAX_BLOCK_SIZE)
  124. self.logger.info("Retrieving {} certificates ({} -> {}) for {}".format(tree_size-latest_size, latest_size, tree_size, operator_information['description']))
  125. async with aiohttp.ClientSession(loop=self.loop) as session:
  126. for _ in range(chunks):
  127. # Cap the end to the last record in the DB
  128. if end >= tree_size:
  129. end = tree_size - 1
  130. assert end >= start, "End {} is less than start {}!".format(end, start)
  131. assert end < tree_size, "End {} is less than tree_size {}".format(end, tree_size)
  132. url = "https://{}/ct/v1/get-entries?start={}&end={}".format(operator_information['url'], start, end)
  133. async with session.get(url) as response:
  134. certificates = await response.json()
  135. if 'error_message' in certificates:
  136. print("error!")
  137. for index, cert in zip(range(start, end+1), certificates['entries']):
  138. cert['index'] = index
  139. yield certificates['entries']
  140. start += self.MAX_BLOCK_SIZE
  141. end = start + self.MAX_BLOCK_SIZE + 1
  142. async def mux_ctl_stream(watcher):
  143. while True:
  144. cert_data = await watcher.stream.get()
  145. print(cert_data['source']['url'], cert_data['leaf_cert']['subject']['CN'], cert_data['leaf_cert']['extensions'].get('subjectAltName', ''))
  146. if __name__ == "__main__":
  147. loop = asyncio.get_event_loop()
  148. watcher = TransparencyWatcher(loop)
  149. asyncio.ensure_future(asyncio.gather(*watcher.get_tasks()))
  150. asyncio.ensure_future(mux_ctl_stream(watcher))
  151. loop.run_forever()