import asyncio
import datetime
import json
import logging
import math
import os
import sys

import aiohttp
import requests

from certstream.certlib import parse_ctl_entry


class TransparencyWatcher(object):
    # A list of CT log servers that we shouldn't even try to connect to. In testing they either had
    # bad DNS records, resolved to un-routable IP addresses, or didn't have valid SSL certificates.
    BAD_CT_SERVERS = [
        "alpha.ctlogs.org",
        "clicky.ct.letsencrypt.org",
        "ct.akamai.com",
        "ct.filippo.io/behindthesofa",
        "ct.gdca.com.cn",
        "ct.izenpe.com",
        "ct.izenpe.eus",
        "ct.sheca.com",
        "ct.startssl.com",
        "ct.wosign.com",
        "ctlog.api.venafi.com",
        "ctlog.gdca.com.cn",
        "ctlog.sheca.com",
        "ctlog.wosign.com",
        "ctlog2.wosign.com",
        "flimsy.ct.nordu.net:8080",
        "log.certly.io",
        "nessie2021.ct.digicert.com/log",
        "plausible.ct.nordu.net",
        "www.certificatetransparency.cn/ct",
        "ctserver.cnnic.cn",
    ]

    MAX_BLOCK_SIZE = 64

    def __init__(self, _loop):
        self.loop = _loop
        self.stopped = False
        self.logger = logging.getLogger('certstream.watcher')
        self.stream = asyncio.Queue(maxsize=3000)
        self.lastseen = None
        self.logger.info("Initializing the CTL watcher")

    def _initialize_ts_logs(self):
        try:
            self.transparency_logs = requests.get(
                'https://www.gstatic.com/ct/log_list/all_logs_list.json').json()
        except Exception:
            self.logger.critical("Invalid response from certificate directory! Exiting :(")
            sys.exit(1)

        self.logger.info("Retrieved transparency log with {} entries to watch.".format(
            len(self.transparency_logs['logs'])))
        for entry in self.transparency_logs['logs']:
            # Strip trailing slashes so endpoint URLs can be built with a single format string
            if entry['url'].endswith('/'):
                entry['url'] = entry['url'][:-1]
            self.logger.info("  + {}".format(entry['description']))

    async def _print_memory_usage(self):
        import objgraph
        import gc

        while True:
            print("Stream backlog : {}".format(self.stream.qsize()))
            gc.collect()
            objgraph.show_growth()
            await asyncio.sleep(60)

    def get_tasks(self):
        self._initialize_ts_logs()

        coroutines = []

        if os.getenv("DEBUG_MEMORY", False):
            coroutines.append(self._print_memory_usage())

        for log in self.transparency_logs['logs']:
            if log['url'] not in self.BAD_CT_SERVERS:
                coroutines.append(self.watch_for_updates_task(log))
        return coroutines

    def stop(self):
        self.logger.info('Got stop order, exiting...')
        self.stopped = True
        for task in asyncio.all_tasks(self.loop):
            task.cancel()

    async def watch_for_updates_task(self, operator_information):
        try:
            latest_size = 0
            name = operator_information['description']
            while not self.stopped:
                try:
                    async with aiohttp.ClientSession() as session:
                        async with session.get(
                                "https://{}/ct/v1/get-sth".format(operator_information['url'])) as response:
                            info = await response.json()
                except aiohttp.ClientError as e:
                    self.logger.info('[{}] Exception -> {}'.format(name, e))
                    await asyncio.sleep(600)
                    continue

                tree_size = info.get('tree_size')

                # TODO: Add in persistence and id tracking per log
                if latest_size == 0:
                    latest_size = tree_size

                if latest_size < tree_size:
                    self.logger.info('[{}] [{} -> {}] New certs found, updating!'.format(
                        name, latest_size, tree_size))

                    try:
                        async for result_chunk in self.get_new_results(operator_information, latest_size, tree_size):
                            for entry in result_chunk:
                                cert_data = parse_ctl_entry(entry, operator_information)
                                # if cert_data['update_type'] == 'X509LogEntry':
                                #     print(cert_data['source']['url'],
                                #           cert_data['leaf_cert']['subject']['CN'],
                                #           cert_data['leaf_cert']['extensions']['subjectAltName'])
                                self.lastseen = datetime.datetime.now().isoformat()
                                await self.stream.put(cert_data)
                    except aiohttp.ClientError as e:
                        self.logger.info('[{}] Exception -> {}'.format(name, e))
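                        # Transient fetch failure: back off for ten minutes before re-polling this log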
                        await asyncio.sleep(600)
                        continue
                    except Exception as e:
                        self.logger.error(
                            '[{}] Encountered an exception while getting new results! -> {}'.format(name, e))
                        return

                    latest_size = tree_size
                else:
                    self.logger.debug('[{}][{}|{}] No update needed, continuing...'.format(
                        name, latest_size, tree_size))

                await asyncio.sleep(30)
        except Exception as e:
            self.logger.error('Encountered an exception in the watcher task! -> {}'.format(e))
            return

    async def get_new_results(self, operator_information, latest_size, tree_size):
        # Entries are zero-indexed, so the new records live at [latest_size, tree_size - 1]
        # and total_size is how many of them we're aiming for.
        total_size = tree_size - latest_size
        start = latest_size
        # get-entries ranges are inclusive on both ends, so a full block of
        # MAX_BLOCK_SIZE entries spans [start, start + MAX_BLOCK_SIZE - 1]
        end = start + self.MAX_BLOCK_SIZE - 1

        chunks = math.ceil(total_size / self.MAX_BLOCK_SIZE)

        self.logger.info("Retrieving {} certificates ({} -> {}) for {}".format(
            total_size, latest_size, tree_size, operator_information['description']))
        async with aiohttp.ClientSession() as session:
            for _ in range(chunks):
                # Cap the end to the last record in the log
                if end >= tree_size:
                    end = tree_size - 1

                assert end >= start, "End {} is less than start {}!".format(end, start)
                assert end < tree_size, "End {} is not less than tree_size {}!".format(end, tree_size)

                url = "https://{}/ct/v1/get-entries?start={}&end={}".format(
                    operator_information['url'], start, end)

                async with session.get(url) as response:
                    certificates = await response.json()

                if 'error_message' in certificates:
                    self.logger.error("[{}] get-entries error -> {}".format(
                        operator_information['description'], certificates['error_message']))
                    return

                # Stamp each entry with its absolute index in the log
                for index, cert in zip(range(start, end + 1), certificates['entries']):
                    cert['index'] = index

                yield certificates['entries']

                # Advance to the next non-overlapping inclusive block
                start = end + 1
                end = start + self.MAX_BLOCK_SIZE - 1


async def mux_ctl_stream(watcher):
    logger = logging.getLogger('certstream.watcher')
    watch_suffix = os.getenv('WATCH_SUFFIX', None)
    slack_webhook = os.getenv('SLACK_WEBHOOK_URL', None)

    if not watch_suffix:
        return

    suffixes = [suffix.strip() for suffix in watch_suffix.split(',')]
    logger.info('Watching for: %s', suffixes)

    while True:
        cert_data = await watcher.stream.get()
        cn = cert_data['leaf_cert']['subject']['CN']
        alt = cert_data['leaf_cert']['extensions'].get('subjectAltName', '')
        source = cert_data['source']['url']

        found = False
        if cn and cn.endswith(tuple(suffixes)):
            found = True

        # Split the SAN extension ("DNS:a.example.com, DNS:b.example.com, ...") into
        # names that match a watched suffix and names that don't
        alts = []
        found_alts = []
        for dnsname in alt.split(','):
            dnsname = dnsname.strip().replace('DNS:', '')
            if not dnsname:
                continue
            if dnsname.endswith(tuple(suffixes)):
                found = True
                found_alts.append(dnsname)
            else:
                alts.append(dnsname)

        if not found:
            continue

        if slack_webhook:
            alt_msg = ', '.join(found_alts)
            if found_alts and alts:
                alt_msg += ' and '
            if alts:
                alt_msg += '%s others' % len(alts)

            slack_data = {
                'attachments': [
                    {
                        'fallback': '%s (%s): %s' % (cn, source, alt),
                        'color': '#36a64f',
                        'fields': [
                            {
                                'title': 'Common Name',
                                'value': cn,
                                'short': False
                            },
                            {
                                'title': 'Alt Name',
                                'value': alt_msg,
                                'short': False
                            }
                        ],
                        'footer': source
                    }
                ]
            }

            # Note: requests is synchronous, so this call briefly blocks the event loop
            response = requests.post(
                slack_webhook,
                data=json.dumps(slack_data),
                headers={'Content-Type': 'application/json'}
            )
            if response.status_code != 200:
                logger.error(
                    'Slack returned an error %s, the response is: %s',
                    response.status_code, response.text
                )
        else:
            logger.info('%s: %s, %s', source, cn, alt)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    watcher = TransparencyWatcher(loop)
    asyncio.ensure_future(asyncio.gather(*watcher.get_tasks()))
    asyncio.ensure_future(mux_ctl_stream(watcher))
    loop.run_forever()
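

# A minimal sketch of an alternative consumer for watcher.stream, assuming all you
# want is every certificate printed rather than the suffix-filtered Slack alerts
# above. The name print_cns is hypothetical; to use it, move this def above the
# __main__ block and schedule it in place of mux_ctl_stream (only one consumer
# should drain the queue, since each cert_data dict is handed to a single
# stream.get() caller).
async def print_cns(watcher):
    while True:
        cert_data = await watcher.stream.get()
        print(cert_data['source']['url'], cert_data['leaf_cert']['subject']['CN'])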