import asyncio
import logging
import math
import sys
import os
import datetime

import aiohttp
import requests

from certstream.certlib import parse_ctl_entry


class TransparencyWatcher(object):
    """Poll Certificate Transparency logs and queue newly logged certificates.

    One polling coroutine is created per log listed in the log directory (see
    ``get_tasks``). Parsed entries are pushed onto ``self.stream``, an
    ``asyncio.Queue`` that consumers read from.
    """

    # These are a list of servers that we shouldn't even try to connect to. In testing they either had bad
    # DNS records, resolved to un-routable IP addresses, or didn't have valid SSL certificates.
    # NOTE(review): every hostname in this list is an empty string in the
    # source under review (they appear to have been stripped); restore the
    # real hostnames before relying on this filter.
    BAD_CT_SERVERS = [
        "", "", "", "", "", "", "",
        "", "", "", "", "", "", "",
        "", "", "", "", "", "", "",
    ]

    # Maximum number of entries requested from a log in one get-entries call.
    MAX_BLOCK_SIZE = 64

    # URL of the JSON directory listing the transparency logs to watch.
    # NOTE(review): this URL is an empty string in the source under review;
    # it must be set to a real log-list URL or start-up exits with status 1.
    LOG_LIST_URL = ''

    def __init__(self, _loop):
        """Create a watcher bound to the event loop ``_loop``."""
        self.loop = _loop
        self.stopped = False
        self.logger = logging.getLogger('certstream.watcher')

        # Bounded queue of parsed certificate updates awaiting a consumer.
        # NOTE(review): the attribute name was missing from the source and is
        # reconstructed as ``stream`` - confirm against callers.
        self.stream = asyncio.Queue(maxsize=3000)
        # ISO-8601 timestamp of the most recently queued entry, or None.
        self.lastseen = None

        self.logger.info("Initializing the CTL watcher")

    def _initialize_ts_logs(self):
        """Download the log directory and normalise each log's URL.

        Stores the decoded JSON on ``self.transparency_logs`` and strips a
        single trailing ``/`` from every entry's ``url``. Exits the process
        with status 1 if the directory cannot be fetched or decoded.
        """
        try:
            self.transparency_logs = requests.get(self.LOG_LIST_URL).json()
        except Exception:
            self.logger.fatal("Invalid response from certificate directory! Exiting :(")
            sys.exit(1)

        logs = self.transparency_logs['logs']
        self.logger.info("Retrieved transparency log with {} entries to watch.".format(len(logs)))
        for entry in logs:
            if entry['url'].endswith('/'):
                entry['url'] = entry['url'][:-1]
            self.logger.info(" + {}".format(entry['description']))

    async def _print_memory_usage(self):
        """Debug helper: once a minute print the queue backlog and object growth."""
        # Imported lazily so objgraph is only required when memory debugging
        # is switched on.
        import objgraph
        import gc

        while True:
            print("Stream backlog : {}".format(self.stream.qsize()))
            gc.collect()
            objgraph.show_growth()
            await asyncio.sleep(60)

    def get_tasks(self):
        """Return the coroutines to schedule: one poller per usable log.

        Fetches the log directory first. When the ``DEBUG_MEMORY`` environment
        variable is set to a non-empty value, the memory-usage reporter is
        included as well.
        """
        self._initialize_ts_logs()

        coroutines = []

        if os.getenv("DEBUG_MEMORY", False):
            coroutines.append(self._print_memory_usage())

        for log in self.transparency_logs['logs']:
            if log['url'] not in self.BAD_CT_SERVERS:
                coroutines.append(self.watch_for_updates_task(log))
        return coroutines

    def stop(self):
        """Flag the watcher as stopped and cancel every task on its loop."""
        self.logger.info('Got stop order, exiting...')

        self.stopped = True
        # asyncio.Task.all_tasks() was removed in Python 3.9; prefer the
        # module-level function and fall back only on older interpreters.
        all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
        for task in all_tasks(self.loop):
            task.cancel()

    async def watch_for_updates_task(self, operator_information):
        """Poll one log's signed tree head and queue any new entries.

        ``operator_information`` is one entry of the log directory; its
        ``url`` and ``description`` keys are read here. The first successful
        poll only records the current tree size; later polls fetch the entries
        added since. HTTP client errors back off for ten minutes; any other
        exception is logged and ends this task.
        """
        try:
            latest_size = 0
            name = operator_information['description']
            sth_url = "https://{}/ct/v1/get-sth".format(operator_information['url'])

            while not self.stopped:
                try:
                    async with aiohttp.ClientSession(loop=self.loop) as session:
                        async with session.get(sth_url) as response:
                            info = await response.json()
                except aiohttp.ClientError as e:
                    self.logger.info('[{}] Exception -> {}'.format(name, e))
                    await asyncio.sleep(600)
                    continue

                tree_size = info.get('tree_size')
                if tree_size is None:
                    # Without a tree size the comparisons below would raise
                    # TypeError and kill the task; retry on the next poll.
                    self.logger.warning('[{}] get-sth response had no tree_size, retrying'.format(name))
                    await asyncio.sleep(30)
                    continue

                # TODO: Add in persistence and id tracking per log
                if latest_size == 0:
                    latest_size = tree_size

                if latest_size < tree_size:
                    self.logger.info('[{}] [{} -> {}] New certs found, updating!'.format(name, latest_size, tree_size))

                    try:
                        async for result_chunk in self.get_new_results(operator_information, latest_size, tree_size):
                            for entry in result_chunk:
                                cert_data = parse_ctl_entry(entry, operator_information)
                                # NOTE(review): the next two statements were
                                # truncated in the source and are
                                # reconstructed - confirm.
                                self.lastseen = datetime.datetime.now().isoformat()
                                await self.stream.put(cert_data)
                    except aiohttp.ClientError as e:
                        self.logger.info('[{}] Exception -> {}'.format(name, e))
                        await asyncio.sleep(600)
                        continue
                    except Exception as e:
                        self.logger.exception("Encountered an exception while getting new results! -> {}".format(e))
                        return

                    latest_size = tree_size
                else:
                    self.logger.debug('[{}][{}|{}] No update needed, continuing...'.format(name, latest_size, tree_size))

                await asyncio.sleep(30)
        except Exception as e:
            self.logger.exception("Encountered an exception while getting new results! -> {}".format(e))
            return

    async def get_new_results(self, operator_information, latest_size, tree_size):
        """Yield lists of raw log entries covering ``latest_size`` up to ``tree_size - 1``.

        Entries are requested in windows of at most ``MAX_BLOCK_SIZE``. Each
        yielded entry dict gets an ``index`` key holding its position in the
        log. The generator stops early, after logging, if the log answers with
        an ``error_message`` or with no entries; the remaining range is then
        not yielded.
        """
        # The top of the tree isn't actually a cert yet, so the total_size is what we're aiming for
        total_size = tree_size - latest_size
        last_index = tree_size - 1
        start = latest_size

        self.logger.info("Retrieving {} certificates ({} -> {}) for {}".format(
            total_size, latest_size, tree_size, operator_information['description']))

        async with aiohttp.ClientSession(loop=self.loop) as session:
            while start <= last_index:
                # get-entries treats ``end`` as inclusive, so a window of
                # MAX_BLOCK_SIZE entries ends at start + MAX_BLOCK_SIZE - 1.
                end = min(start + self.MAX_BLOCK_SIZE - 1, last_index)

                url = "https://{}/ct/v1/get-entries?start={}&end={}".format(operator_information['url'], start, end)

                async with session.get(url) as response:
                    certificates = await response.json()

                if 'error_message' in certificates:
                    self.logger.error("get-entries failed for {} ({} -> {}): {}".format(
                        operator_information['description'], start, end, certificates['error_message']))
                    return

                # Drop anything beyond the requested window so indices stay
                # within [start, end].
                entries = certificates.get('entries', [])[:end - start + 1]
                if not entries:
                    # Nothing came back; stop rather than loop forever on the
                    # same window.
                    self.logger.warning("get-entries returned no entries for {} ({} -> {})".format(
                        operator_information['description'], start, end))
                    return

                for index, cert in enumerate(entries, start):
                    cert['index'] = index

                yield entries

                # A log may return fewer entries than requested, so advance by
                # what was actually received instead of by the window size.
                start += len(entries)


async def mux_ctl_stream(watcher):
    """Consume the watcher's queue forever, printing one line per certificate."""
    while True:
        # NOTE(review): the awaited expression was missing from the source and
        # is reconstructed as a read from the watcher's queue - confirm.
        cert_data = await watcher.stream.get()
        print(cert_data['source']['url'], cert_data['leaf_cert']['subject']['CN'], cert_data['leaf_cert']['extensions'].get('subjectAltName', ''))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    watcher = TransparencyWatcher(loop)
    asyncio.ensure_future(asyncio.gather(*watcher.get_tasks()))
    asyncio.ensure_future(mux_ctl_stream(watcher))
    loop.run_forever()