# watcher.py
  1. import asyncio
  2. import logging
  3. import math
  4. import sys
  5. import os
  6. import datetime
  7. import json
  8. import requests
  9. import aiohttp
  10. import requests
  11. from certstream.certlib import parse_ctl_entry
  12. class TransparencyWatcher(object):
  13. # These are a list of servers that we shouldn't even try to connect to. In testing they either had bad
  14. # DNS records, resolved to un-routable IP addresses, or didn't have valid SSL certificates.
  15. BAD_CT_SERVERS = [
  16. "alpha.ctlogs.org",
  17. "clicky.ct.letsencrypt.org",
  18. "ct.akamai.com",
  19. "ct.filippo.io/behindthesofa",
  20. "ct.gdca.com.cn",
  21. "ct.izenpe.com",
  22. "ct.izenpe.eus",
  23. "ct.sheca.com",
  24. "ct.startssl.com",
  25. "ct.wosign.com",
  26. "ctlog.api.venafi.com",
  27. "ctlog.gdca.com.cn",
  28. "ctlog.sheca.com",
  29. "ctlog.wosign.com",
  30. "ctlog2.wosign.com",
  31. "flimsy.ct.nordu.net:8080",
  32. "log.certly.io",
  33. "nessie2021.ct.digicert.com/log",
  34. "plausible.ct.nordu.net",
  35. "www.certificatetransparency.cn/ct",
  36. "ctserver.cnnic.cn",
  37. ]
  38. MAX_BLOCK_SIZE = 64
  39. def __init__(self, _loop):
  40. self.loop = _loop
  41. self.stopped = False
  42. self.logger = logging.getLogger('certstream.watcher')
  43. self.stream = asyncio.Queue(maxsize=3000)
  44. self.lastseen = None
  45. self.logger.info("Initializing the CTL watcher")
  46. def _initialize_ts_logs(self):
  47. try:
  48. self.transparency_logs = requests.get('https://www.gstatic.com/ct/log_list/all_logs_list.json').json()
  49. except Exception as e:
  50. self.logger.fatal("Invalid response from certificate directory! Exiting :(")
  51. sys.exit(1)
  52. self.logger.info("Retrieved transparency log with {} entries to watch.".format(len(self.transparency_logs['logs'])))
  53. for entry in self.transparency_logs['logs']:
  54. if entry['url'].endswith('/'):
  55. entry['url'] = entry['url'][:-1]
  56. self.logger.info(" + {}".format(entry['description']))
  57. async def _print_memory_usage(self):
  58. import objgraph
  59. import gc
  60. while True:
  61. print("Stream backlog : {}".format(self.stream.qsize()))
  62. gc.collect()
  63. objgraph.show_growth()
  64. await asyncio.sleep(60)
  65. def get_tasks(self):
  66. self._initialize_ts_logs()
  67. coroutines = []
  68. if os.getenv("DEBUG_MEMORY", False):
  69. coroutines.append(self._print_memory_usage())
  70. for log in self.transparency_logs['logs']:
  71. if log['url'] not in self.BAD_CT_SERVERS:
  72. coroutines.append(self.watch_for_updates_task(log))
  73. return coroutines
  74. def stop(self):
  75. self.logger.info('Got stop order, exiting...')
  76. self.stopped = True
  77. for task in asyncio.Task.all_tasks():
  78. task.cancel()
  79. async def watch_for_updates_task(self, operator_information):
  80. try:
  81. latest_size = 0
  82. name = operator_information['description']
  83. while not self.stopped:
  84. try:
  85. async with aiohttp.ClientSession(loop=self.loop) as session:
  86. async with session.get("https://{}/ct/v1/get-sth".format(operator_information['url'])) as response:
  87. info = await response.json()
  88. except aiohttp.ClientError as e:
  89. self.logger.info('[{}] Exception -> {}'.format(name, e))
  90. await asyncio.sleep(600)
  91. continue
  92. tree_size = info.get('tree_size')
  93. # TODO: Add in persistence and id tracking per log
  94. if latest_size == 0:
  95. latest_size = tree_size
  96. if latest_size < tree_size:
  97. self.logger.info('[{}] [{} -> {}] New certs found, updating!'.format(name, latest_size, tree_size))
  98. try:
  99. async for result_chunk in self.get_new_results(operator_information, latest_size, tree_size):
  100. for entry in result_chunk:
  101. cert_data = parse_ctl_entry(entry, operator_information)
  102. # if cert_data['update_type'] == 'X509LogEntry':
  103. # print(cert_data['source']['url'], cert_data['leaf_cert']['subject']['CN'], cert_data['leaf_cert']['extensions']['subjectAltName'])
  104. self.lastseen = datetime.datetime.now().isoformat()
  105. await self.stream.put(cert_data)
  106. except aiohttp.ClientError as e:
  107. self.logger.info('[{}] Exception -> {}'.format(name, e))
  108. await asyncio.sleep(600)
  109. continue
  110. except Exception as e:
  111. print("Encountered an exception while getting new results! -> {}".format(e))
  112. return
  113. latest_size = tree_size
  114. else:
  115. self.logger.debug('[{}][{}|{}] No update needed, continuing...'.format(name, latest_size, tree_size))
  116. await asyncio.sleep(30)
  117. except Exception as e:
  118. print("Encountered an exception while getting new results! -> {}".format(e))
  119. return
  120. async def get_new_results(self, operator_information, latest_size, tree_size):
  121. # The top of the tree isn't actually a cert yet, so the total_size is what we're aiming for
  122. total_size = tree_size - latest_size
  123. start = latest_size
  124. end = start + self.MAX_BLOCK_SIZE
  125. chunks = math.ceil(total_size / self.MAX_BLOCK_SIZE)
  126. self.logger.info("Retrieving {} certificates ({} -> {}) for {}".format(tree_size-latest_size, latest_size, tree_size, operator_information['description']))
  127. async with aiohttp.ClientSession(loop=self.loop) as session:
  128. for _ in range(chunks):
  129. # Cap the end to the last record in the DB
  130. if end >= tree_size:
  131. end = tree_size - 1
  132. assert end >= start, "End {} is less than start {}!".format(end, start)
  133. assert end < tree_size, "End {} is less than tree_size {}".format(end, tree_size)
  134. url = "https://{}/ct/v1/get-entries?start={}&end={}".format(operator_information['url'], start, end)
  135. async with session.get(url) as response:
  136. certificates = await response.json()
  137. if 'error_message' in certificates:
  138. print("error!")
  139. for index, cert in zip(range(start, end+1), certificates['entries']):
  140. cert['index'] = index
  141. yield certificates['entries']
  142. start += self.MAX_BLOCK_SIZE
  143. end = start + self.MAX_BLOCK_SIZE + 1
  144. async def mux_ctl_stream(watcher):
  145. logger = logging.getLogger('certstream.watcher')
  146. watch_suffix = os.getenv('WATCH_SUFFIX', None)
  147. slack_webhook = os.getenv('SLACK_WEBHOOK_URL', None)
  148. if not watch_suffix:
  149. return
  150. suffixes = []
  151. for suffix in watch_suffix.split(','):
  152. suffix = suffix.strip()
  153. suffixes.append(suffix)
  154. logger.info('Watching for: %s', suffixes)
  155. while True:
  156. cert_data = await watcher.stream.get()
  157. cn = cert_data['leaf_cert']['subject']['CN']
  158. alt = cert_data['leaf_cert']['extensions'].get('subjectAltName', '')
  159. source = cert_data['source']['url']
  160. found = False
  161. if cn and cn.endswith(tuple(suffixes)):
  162. found = True
  163. alts = []
  164. found_alts = []
  165. for dnsname in alt.split(','):
  166. dnsname = dnsname.strip().replace('DNS:', '')
  167. if dnsname.endswith(tuple(suffixes)):
  168. found = True
  169. found_alts.append(dnsname)
  170. else:
  171. alts.append(dnsname)
  172. if not found:
  173. continue
  174. if slack_webhook:
  175. alt_msg = ', '.join(found_alts)
  176. if found_alts and alts:
  177. alt_msg += ' and '
  178. if alts:
  179. alt_msg += '%s others' % len(alts)
  180. slack_data = {
  181. 'attachments': [
  182. {
  183. 'fallback': '%s (%s): %s' % (cn, source, alt),
  184. 'color': '#36a64f',
  185. 'fields': [
  186. {
  187. 'title': 'Common Name',
  188. 'value': cn,
  189. 'short': False
  190. },
  191. {
  192. 'title': 'Alt Name',
  193. 'value': alt_msg,
  194. 'short': False
  195. }
  196. ],
  197. 'footer': source
  198. }
  199. ]
  200. }
  201. response = requests.post(
  202. slack_webhook, data=json.dumps(slack_data),
  203. headers={'Content-Type': 'application/json'}
  204. )
  205. if response.status_code != 200:
  206. logger.exception(ValueError(
  207. 'Slack returned an error %s, the response is: %s',
  208. response.status_code, response.text
  209. ))
  210. else:
  211. logger.info('%s: %s, %s', cert_data['source']['url'], cert_data['leaf_cert']['subject']['CN'], cert_data['leaf_cert']['extensions'].get('subjectAltName', ''))
  212. if __name__ == "__main__":
  213. loop = asyncio.get_event_loop()
  214. watcher = TransparencyWatcher(loop)
  215. asyncio.ensure_future(asyncio.gather(*watcher.get_tasks()))
  216. asyncio.ensure_future(mux_ctl_stream(watcher))
  217. loop.run_forever()