123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import asyncio
- import logging
- import math
- import sys
- import os
- import datetime
- import aiohttp
- import requests
- from certstream.certlib import parse_ctl_entry
class TransparencyWatcher(object):
    """Poll Certificate Transparency logs and queue newly published entries.

    One polling coroutine is created per log (see ``get_tasks``). Each parsed
    entry is pushed onto ``self.stream`` (an ``asyncio.Queue``) for a consumer
    to drain, and ``self.lastseen`` records the ISO timestamp of the most
    recently queued entry.
    """

    # These are a list of servers that we shouldn't even try to connect to. In testing they either had bad
    # DNS records, resolved to un-routable IP addresses, or didn't have valid SSL certificates.
    BAD_CT_SERVERS = [
        "alpha.ctlogs.org",
        "clicky.ct.letsencrypt.org",
        "ct.akamai.com",
        "ct.filippo.io/behindthesofa",
        "ct.gdca.com.cn",
        "ct.izenpe.com",
        "ct.izenpe.eus",
        "ct.sheca.com",
        "ct.startssl.com",
        "ct.wosign.com",
        "ctlog.api.venafi.com",
        "ctlog.gdca.com.cn",
        "ctlog.sheca.com",
        "ctlog.wosign.com",
        "ctlog2.wosign.com",
        "flimsy.ct.nordu.net:8080",
        "log.certly.io",
        "nessie2021.ct.digicert.com/log",
        "plausible.ct.nordu.net",
        "www.certificatetransparency.cn/ct",
        "ctserver.cnnic.cn",
    ]

    # Maximum number of entries requested from a log in one get-entries call.
    MAX_BLOCK_SIZE = 64

    def __init__(self, _loop):
        """Set up the watcher state.

        :param _loop: event loop handed to the aiohttp sessions and used by
            ``stop`` to find the tasks to cancel.
        """
        self.loop = _loop
        self.stopped = False
        self.logger = logging.getLogger('certstream.watcher')
        self.stream = asyncio.Queue(maxsize=3000)
        self.lastseen = None

        self.logger.info("Initializing the CTL watcher")

    def _initialize_ts_logs(self):
        """Download the list of known logs into ``self.transparency_logs``.

        Trailing slashes are stripped from every log URL. The process exits
        with status 1 when the list cannot be fetched or decoded.
        """
        try:
            self.transparency_logs = requests.get('https://www.gstatic.com/ct/log_list/all_logs_list.json').json()
        except Exception:
            # Keep the traceback so the cause of the failed download is visible.
            self.logger.fatal("Invalid response from certificate directory! Exiting :(", exc_info=True)
            sys.exit(1)

        self.logger.info("Retrieved transparency log with %s entries to watch.", len(self.transparency_logs['logs']))
        for entry in self.transparency_logs['logs']:
            if entry['url'].endswith('/'):
                entry['url'] = entry['url'][:-1]
            self.logger.info(" + %s", entry['description'])

    async def _print_memory_usage(self):
        """Print the queue backlog and object growth once a minute (debug aid)."""
        # Imported lazily: objgraph is only needed when memory debugging is on.
        import objgraph
        import gc

        while True:
            print("Stream backlog : {}".format(self.stream.qsize()))
            gc.collect()
            objgraph.show_growth()
            await asyncio.sleep(60)

    def get_tasks(self):
        """Return the coroutines to schedule: one poller per usable log.

        When the ``DEBUG_MEMORY`` environment variable is set, the memory
        reporting coroutine is included as the first element.
        """
        self._initialize_ts_logs()

        coroutines = []

        if os.getenv("DEBUG_MEMORY", False):
            coroutines.append(self._print_memory_usage())

        coroutines.extend(
            self.watch_for_updates_task(log)
            for log in self.transparency_logs['logs']
            if log['url'] not in self.BAD_CT_SERVERS
        )
        return coroutines

    def stop(self):
        """Flag the pollers to stop and cancel every pending task on the loop."""
        self.logger.info('Got stop order, exiting...')
        self.stopped = True
        # asyncio.Task.all_tasks() was removed in Python 3.9; the module-level
        # asyncio.all_tasks() is its replacement.
        for task in asyncio.all_tasks(self.loop):
            task.cancel()

    async def watch_for_updates_task(self, operator_information):
        """Poll one log's signed tree head and queue any newly added entries.

        :param operator_information: log description dict; the ``url`` and
            ``description`` keys are read here.

        The first successful poll only records the current tree size, so
        entries that already exist at startup are not fetched. Network errors
        and timeouts pause this log for 10 minutes; any other exception is
        logged and ends the task.
        """
        try:
            latest_size = 0
            name = operator_information['description']
            while not self.stopped:
                try:
                    async with aiohttp.ClientSession(loop=self.loop) as session:
                        async with session.get("https://{}/ct/v1/get-sth".format(operator_information['url'])) as response:
                            info = await response.json()
                except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                    self.logger.info('[%s] Exception -> %s', name, e)
                    await asyncio.sleep(600)
                    continue

                tree_size = info.get('tree_size')
                if tree_size is None:
                    # Without a tree size there is nothing to compare against;
                    # treat it like a failed poll instead of crashing the task.
                    self.logger.info('[%s] Exception -> get-sth response has no tree_size', name)
                    await asyncio.sleep(600)
                    continue

                # TODO: Add in persistence and id tracking per log
                if latest_size == 0:
                    latest_size = tree_size

                if latest_size < tree_size:
                    self.logger.info('[%s] [%s -> %s] New certs found, updating!', name, latest_size, tree_size)

                    try:
                        async for result_chunk in self.get_new_results(operator_information, latest_size, tree_size):
                            for entry in result_chunk:
                                cert_data = parse_ctl_entry(entry, operator_information)
                                self.lastseen = datetime.datetime.now().isoformat()
                                await self.stream.put(cert_data)
                            # Record progress per chunk so that a retry after a
                            # failure resumes here instead of re-emitting entries.
                            latest_size = result_chunk[-1]['index'] + 1
                    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                        self.logger.info('[%s] Exception -> %s', name, e)
                        await asyncio.sleep(600)
                        continue
                else:
                    self.logger.debug('[%s][%s|%s] No update needed, continuing...', name, latest_size, tree_size)

                await asyncio.sleep(30)
        except Exception as e:
            self.logger.exception("Encountered an exception while getting new results! -> %s", e)
            return

    def _block_end(self, start, tree_size):
        """Return the inclusive index of the last entry of the block starting at ``start``.

        The block holds at most ``MAX_BLOCK_SIZE`` entries and never reaches
        past the last entry of the tree (index ``tree_size - 1``).
        """
        return min(start + self.MAX_BLOCK_SIZE, tree_size) - 1

    async def get_new_results(self, operator_information, latest_size, tree_size):
        """Yield the entries with indices ``latest_size`` .. ``tree_size - 1`` in blocks.

        :param operator_information: log description dict (``url`` and
            ``description`` are read).
        :param latest_size: index of the first entry to fetch.
        :param tree_size: current size of the log; entries below it are fetched.

        Each yielded value is the non-empty list of entry dicts from one
        get-entries response, with an ``index`` key added to every entry.
        Iteration stops early, after logging, when the log reports an error
        or returns no entries.
        """
        self.logger.info(
            "Retrieving %s certificates (%s -> %s) for %s",
            tree_size - latest_size, latest_size, tree_size, operator_information['description'],
        )
        async with aiohttp.ClientSession(loop=self.loop) as session:
            start = latest_size
            while start < tree_size:
                # The get-entries ``end`` parameter is inclusive (RFC 6962).
                end = self._block_end(start, tree_size)
                url = "https://{}/ct/v1/get-entries?start={}&end={}".format(operator_information['url'], start, end)

                async with session.get(url) as response:
                    certificates = await response.json()

                if 'error_message' in certificates:
                    self.logger.error(
                        "get-entries %s..%s failed for %s: %s",
                        start, end, operator_information['description'], certificates['error_message'],
                    )
                    return

                entries = certificates.get('entries') or []
                if not entries:
                    # Nothing came back; stop rather than loop forever on this block.
                    self.logger.error(
                        "get-entries %s..%s returned no entries for %s",
                        start, end, operator_information['description'],
                    )
                    return

                for index, cert in enumerate(entries, start):
                    cert['index'] = index

                yield entries

                # A log may return fewer entries than requested, so advance by
                # what was actually received.
                start += len(entries)
async def mux_ctl_stream(watcher):
    """Drain ``watcher.stream`` forever, printing one summary line per entry.

    Each line holds the source log URL, the leaf certificate's subject CN and
    its ``subjectAltName`` extension (an empty string when it is absent).
    """
    while True:
        record = await watcher.stream.get()
        source_url = record['source']['url']
        leaf = record['leaf_cert']
        common_name = leaf['subject']['CN']
        alt_names = leaf['extensions'].get('subjectAltName', '')
        print(source_url, common_name, alt_names)
if __name__ == "__main__":
    # asyncio.get_event_loop() no longer creates a loop implicitly on recent
    # Python versions, so build one explicitly and install it as current.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    watcher = TransparencyWatcher(loop)

    # Schedule every poller, plus the consumer that prints queued entries,
    # directly on the loop so nothing relies on implicit loop lookup.
    for coroutine in watcher.get_tasks():
        loop.create_task(coroutine)
    loop.create_task(mux_ctl_stream(watcher))

    loop.run_forever()
|