import asyncio
import datetime
import json
import logging
import math
import os
import sys

import aiohttp
import requests

from certstream.certlib import parse_ctl_entry
class TransparencyWatcher(object):
    """Polls every known Certificate Transparency log for new entries and
    pushes each parsed certificate onto ``self.stream`` for consumers."""

    # These are a list of servers that we shouldn't even try to connect to.
    # In testing they either had bad DNS records, resolved to un-routable IP
    # addresses, or didn't have valid SSL certificates.
    BAD_CT_SERVERS = [
        "alpha.ctlogs.org",
        "clicky.ct.letsencrypt.org",
        "ct.akamai.com",
        "ct.filippo.io/behindthesofa",
        "ct.gdca.com.cn",
        "ct.izenpe.com",
        "ct.izenpe.eus",
        "ct.sheca.com",
        "ct.startssl.com",
        "ct.wosign.com",
        "ctlog.api.venafi.com",
        "ctlog.gdca.com.cn",
        "ctlog.sheca.com",
        "ctlog.wosign.com",
        "ctlog2.wosign.com",
        "flimsy.ct.nordu.net:8080",
        "log.certly.io",
        "nessie2021.ct.digicert.com/log",
        "plausible.ct.nordu.net",
        "www.certificatetransparency.cn/ct",
        "ctserver.cnnic.cn",
    ]

    # Entries requested per get-entries call. CT ranges are inclusive, so one
    # block covers exactly MAX_BLOCK_SIZE indices (start .. start + 63).
    MAX_BLOCK_SIZE = 64

    def __init__(self, _loop):
        self.loop = _loop
        self.stopped = False
        self.logger = logging.getLogger('certstream.watcher')
        # Bounded queue so a slow consumer applies back-pressure to the
        # per-log watcher tasks instead of letting the backlog grow unbounded.
        self.stream = asyncio.Queue(maxsize=3000)
        # ISO-format timestamp of the most recently streamed certificate.
        self.lastseen = None
        self.logger.info("Initializing the CTL watcher")

    def _initialize_ts_logs(self):
        """Download the master list of CT logs; exits the process on failure."""
        try:
            self.transparency_logs = requests.get(
                'https://www.gstatic.com/ct/log_list/all_logs_list.json'
            ).json()
        except Exception:
            self.logger.fatal("Invalid response from certificate directory! Exiting :(")
            sys.exit(1)

        self.logger.info("Retrieved transparency log with {} entries to watch.".format(
            len(self.transparency_logs['logs'])))
        for entry in self.transparency_logs['logs']:
            # Normalize away trailing slashes so the URL templates below
            # don't produce double slashes.
            if entry['url'].endswith('/'):
                entry['url'] = entry['url'][:-1]
            self.logger.info(" + {}".format(entry['description']))

    async def _print_memory_usage(self):
        """Debug helper: periodically dump queue depth and object growth.

        Only scheduled when DEBUG_MEMORY is set; requires the third-party
        ``objgraph`` package, hence the local import.
        """
        import objgraph
        import gc

        while True:
            print("Stream backlog : {}".format(self.stream.qsize()))
            gc.collect()
            objgraph.show_growth()
            await asyncio.sleep(60)

    def get_tasks(self):
        """Return the coroutines to schedule: one watcher per usable log."""
        self._initialize_ts_logs()

        coroutines = []

        if os.getenv("DEBUG_MEMORY", False):
            coroutines.append(self._print_memory_usage())

        for log in self.transparency_logs['logs']:
            if log['url'] not in self.BAD_CT_SERVERS:
                coroutines.append(self.watch_for_updates_task(log))
        return coroutines

    def stop(self):
        """Flag the watch loops to exit and cancel every outstanding task."""
        self.logger.info('Got stop order, exiting...')
        self.stopped = True
        # asyncio.Task.all_tasks() was deprecated in 3.7 and removed in 3.9;
        # the module-level asyncio.all_tasks() is the supported spelling.
        for task in asyncio.all_tasks(self.loop):
            task.cancel()

    async def watch_for_updates_task(self, operator_information):
        """Poll a single CT log forever, streaming any new entries.

        Sleeps 30s between polls and backs off for 10 minutes after a
        connection error. Returns (ending the watcher for this log) on any
        unexpected exception.
        """
        try:
            latest_size = 0
            name = operator_information['description']
            while not self.stopped:
                try:
                    async with aiohttp.ClientSession(loop=self.loop) as session:
                        async with session.get(
                                "https://{}/ct/v1/get-sth".format(operator_information['url'])) as response:
                            info = await response.json()
                except aiohttp.ClientError as e:
                    self.logger.info('[{}] Exception -> {}'.format(name, e))
                    await asyncio.sleep(600)
                    continue

                tree_size = info.get('tree_size')

                # TODO: Add in persistence and id tracking per log
                if latest_size == 0:
                    latest_size = tree_size

                if latest_size < tree_size:
                    self.logger.info('[{}] [{} -> {}] New certs found, updating!'.format(
                        name, latest_size, tree_size))

                    try:
                        async for result_chunk in self.get_new_results(operator_information, latest_size, tree_size):
                            for entry in result_chunk:
                                cert_data = parse_ctl_entry(entry, operator_information)
                                self.lastseen = datetime.datetime.now().isoformat()
                                await self.stream.put(cert_data)
                    except aiohttp.ClientError as e:
                        self.logger.info('[{}] Exception -> {}'.format(name, e))
                        await asyncio.sleep(600)
                        continue
                    except Exception as e:
                        # Route through the logger (not bare print) so the
                        # failure shows up alongside the rest of the log.
                        self.logger.error(
                            "Encountered an exception while getting new results! -> {}".format(e))
                        return

                    latest_size = tree_size
                else:
                    self.logger.debug('[{}][{}|{}] No update needed, continuing...'.format(
                        name, latest_size, tree_size))

                await asyncio.sleep(30)
        except Exception as e:
            self.logger.error("Encountered an exception while getting new results! -> {}".format(e))
            return

    async def get_new_results(self, operator_information, latest_size, tree_size):
        """Yield lists of raw entries covering indices latest_size..tree_size-1.

        Fetches in MAX_BLOCK_SIZE chunks; each entry dict gets its absolute
        log index attached under the 'index' key.
        """
        # The top of the tree isn't actually a cert yet, so the total_size is
        # what we're aiming for.
        total_size = tree_size - latest_size
        start = latest_size
        # get-entries ranges are INCLUSIVE: a full block runs from start to
        # start + MAX_BLOCK_SIZE - 1. (The original code used +MAX_BLOCK_SIZE
        # and later +MAX_BLOCK_SIZE + 1, over-fetching and overlapping blocks.)
        end = start + self.MAX_BLOCK_SIZE - 1

        chunks = math.ceil(total_size / self.MAX_BLOCK_SIZE)

        self.logger.info("Retrieving {} certificates ({} -> {}) for {}".format(
            tree_size - latest_size, latest_size, tree_size, operator_information['description']))

        async with aiohttp.ClientSession(loop=self.loop) as session:
            for _ in range(chunks):
                # Cap the end to the last record in the DB
                if end >= tree_size:
                    end = tree_size - 1

                assert end >= start, "End {} is less than start {}!".format(end, start)
                assert end < tree_size, "End {} is less than tree_size {}".format(end, tree_size)

                url = "https://{}/ct/v1/get-entries?start={}&end={}".format(
                    operator_information['url'], start, end)

                async with session.get(url) as response:
                    certificates = await response.json()
                    if 'error_message' in certificates:
                        # Surface the server-reported problem instead of a
                        # bare print("error!").
                        self.logger.error("get-entries error from {}: {}".format(
                            operator_information['url'], certificates['error_message']))

                    for index, cert in zip(range(start, end + 1), certificates['entries']):
                        cert['index'] = index

                    yield certificates['entries']

                # Advance to the next inclusive block.
                start = end + 1
                end = start + self.MAX_BLOCK_SIZE - 1
async def mux_ctl_stream(watcher):
    """Consume parsed certificates from *watcher*'s stream and report any
    whose CN or subjectAltName matches a watched domain suffix.

    Configuration comes from the environment:
      WATCH_SUFFIX       comma-separated list of domain suffixes to match;
                         if unset, this coroutine returns immediately.
      SLACK_WEBHOOK_URL  optional Slack incoming-webhook URL for alerts;
                         without it, matches are logged instead.
    """
    logger = logging.getLogger('certstream.watcher')
    watch_suffix = os.getenv('WATCH_SUFFIX', None)
    slack_webhook = os.getenv('SLACK_WEBHOOK_URL', None)
    if not watch_suffix:
        # Nothing to watch for; leave the stream to other consumers.
        return

    suffixes = [suffix.strip() for suffix in watch_suffix.split(',')]
    logger.info('Watching for: %s', suffixes)

    while True:
        cert_data = await watcher.stream.get()
        cn = cert_data['leaf_cert']['subject']['CN']
        alt = cert_data['leaf_cert']['extensions'].get('subjectAltName', '')
        source = cert_data['source']['url']

        found = False
        if cn and cn.endswith(tuple(suffixes)):
            found = True

        # Partition the SAN entries into matching and non-matching names.
        alts = []
        found_alts = []
        for dnsname in alt.split(','):
            dnsname = dnsname.strip().replace('DNS:', '')
            if dnsname.endswith(tuple(suffixes)):
                found = True
                found_alts.append(dnsname)
            else:
                alts.append(dnsname)

        if not found:
            continue

        if slack_webhook:
            # Summarize SANs as "match1, match2 and N others".
            alt_msg = ', '.join(found_alts)
            if found_alts and alts:
                alt_msg += ' and '
            if alts:
                alt_msg += '%s others' % len(alts)
            slack_data = {
                'attachments': [
                    {
                        'fallback': '%s (%s): %s' % (cn, source, alt),
                        'color': '#36a64f',
                        'fields': [
                            {
                                'title': 'Common Name',
                                'value': cn,
                                'short': False
                            },
                            {
                                'title': 'Alt Name',
                                'value': alt_msg,
                                'short': False
                            }
                        ],
                        'footer': source
                    }
                ]
            }
            # NOTE(review): requests.post is a blocking call inside a
            # coroutine and stalls the event loop while the webhook is
            # delivered; consider aiohttp here.
            response = requests.post(
                slack_webhook, data=json.dumps(slack_data),
                headers={'Content-Type': 'application/json'}
            )
            if response.status_code != 200:
                # logger.exception() is only meaningful inside an except
                # block (it attaches the active traceback), and the original
                # ValueError's %-args were never interpolated. Report the
                # failed delivery as a plain, properly formatted error.
                logger.error(
                    'Slack returned an error %s, the response is: %s',
                    response.status_code, response.text
                )
        else:
            logger.info('%s: %s, %s', source, cn, alt)
if __name__ == "__main__":
    # asyncio.get_event_loop() is deprecated for creating a loop from
    # synchronous code (Python 3.10+); create one explicitly and register it
    # so ensure_future()/gather() below find it as the current loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    watcher = TransparencyWatcher(loop)
    # Schedule every per-log watcher plus the suffix-matching consumer,
    # then run until interrupted.
    asyncio.ensure_future(asyncio.gather(*watcher.get_tasks()))
    asyncio.ensure_future(mux_ctl_stream(watcher))
    loop.run_forever()
|