#!/usr/bin/env python3
"""Join and upload dashcam videos when a USB storage partition is plugged in.

The script listens for udev "add" events on block-device partitions.  For
every such partition it:

1. mounts the partition on a temporary directory,
2. groups the ``MP4`` files found in the source directory by the date
   prefix of their file name,
3. concatenates every group into one ``<date>.mp4`` file in the destination
   directory with ``ffmpeg`` and removes the source clips,
4. uploads each joined file with ``ncftpput`` and removes the local copy.

Joining and uploading run in two threads connected by a queue, so the
partition is unmounted as soon as joining is finished.
"""

__author__ = "Nikola Kotur"
__version__ = "1.0"
__license__ = "Apache License 2.0"

import os
import sys
import argparse
import threading
import tempfile
import subprocess
import time

from multiprocessing import Queue
from pprint import pprint as pp

import pyudev
from logzero import logger


class FakeDev():
    """Stand-in for a udev device, handed to the handler in dry-run mode."""

    # Only the two attributes read by plugged_in() are provided.
    device_node = '/dev/fake'
    device_type = 'dummy'


class USBDetector():
    """Call a handler for every block-device partition that gets added."""

    def __init__(self, handler, config):
        """Store the callback and the parsed command line options.

        handler -- callable invoked as ``handler(device, config)``.
        config  -- options object; ``config.dry_run`` is read here.
        """
        self.usb_plugged_in = handler
        self.config = config

    def run(self):
        """Run the monitoring loop in a daemon thread and wait for it."""
        thread = threading.Thread(target=self._work, daemon=True)
        thread.start()
        thread.join()

    def _work(self):
        """Dispatch udev events to the handler (or fake one in dry-run)."""
        if self.config.dry_run:
            # No udev involved: call the handler once with a fake device.
            self.usb_plugged_in(FakeDev(), self.config)
            return

        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.monitor.filter_by(subsystem='block')
        logger.info("Starting to monitor for usb")
        self.monitor.start()
        for device in iter(self.monitor.poll, None):
            logger.debug("Got USB event: %s", device.action)
            if device.action == 'add' and device.device_type == 'partition':
                self.usb_plugged_in(device, self.config)


def execute(shell_command):
    """Run an external command and return its standard output.

    shell_command -- either an argument list (preferred: every item is
                     passed to the program verbatim, so arguments may
                     contain spaces) or, for backward compatibility, a
                     string that is split on single spaces.

    Returns the decoded standard output on success, or None when the
    command exits with a non-zero status or cannot be started at all.
    No shell is involved in either form.
    """
    if isinstance(shell_command, str):
        argv = shell_command.split(' ')
    else:
        argv = [str(part) for part in shell_command]

    if not argv:
        logger.error('no command given')
        return None

    try:
        res = subprocess.check_output(argv, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        encoding = sys.getfilesystemencoding()
        logger.error('exit code: %s', e.returncode)
        logger.error('stdout: %s', e.output.decode(encoding, errors='replace'))
        logger.error('stderr: %s', e.stderr.decode(encoding, errors='replace'))
        return None
    except OSError as e:
        # E.g. the program is not installed.  Only the program name is
        # logged because the arguments may contain the FTP password.
        logger.error('could not run %s: %s', argv[0], e)
        return None

    return res.decode('utf-8', errors='replace')


def group_name(full_path):
    """Return the file name of *full_path* up to its first dot."""
    return os.path.basename(full_path).split('.')[0]


def parse_video_files(usb_dir, base_dir, extension):
    """Group the video files of a directory by the date in their name.

    usb_dir   -- directory the storage is mounted on.
    base_dir  -- directory with the videos, relative to *usb_dir*.
    extension -- file name suffix to select (compared case-sensitively).

    Returns a dict mapping a group key to the list of full paths in that
    group.  The key is the part of the file name before the first
    underscore; when that part is 14 characters long (newer VIOFO naming)
    only its first 8 characters, the date, are used.

    Raises OSError when the directory cannot be listed.
    """
    video_dir = os.path.join(usb_dir, base_dir)
    files = {}
    # os.listdir() returns names in arbitrary order; sort them so the clips
    # of a group are joined in file name order (presumably chronological,
    # as the names start with a timestamp).
    for file_name in sorted(os.listdir(video_dir)):
        if not file_name.endswith(extension):
            continue
        part_date = file_name.split('_')[0]
        # Detect newer VIOFO
        if len(part_date) == 14:
            # Newer VIOFO, just take the date part
            part_date = part_date[:8]
        files.setdefault(part_date, []).append(
            os.path.join(video_dir, file_name))
    return files


def join_video_files(dest, sources):
    """Concatenate *sources* into *dest* with ffmpeg's concat demuxer.

    dest    -- path of the joined output file.
    sources -- paths of the clips, in the order they are to be joined.

    The source files are removed once ffmpeg has succeeded.  Returns True
    on success and False when ffmpeg failed (sources are kept then).
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        list_file = os.path.join(tmpdirname, 'list')
        with open(list_file, 'w') as lfh:
            for source_line in sources:
                # Inside a single-quoted string of the concat list file a
                # quote is written by closing the string, adding an escaped
                # quote and re-opening it.
                escaped = source_line.replace("'", "'\\''")
                lfh.write("file '%s'\n" % escaped)
        # An argument list keeps paths containing spaces intact.
        output = execute(['ffmpeg', '-safe', '0', '-f', 'concat',
                          '-i', list_file, '-c', 'copy', dest])

    if output is None:
        return False

    logger.info(f'{group_name(dest)}: removing source files...')
    for s_file in sources:
        os.remove(s_file)
    logger.info(f'{group_name(dest)}: removed source files')
    return True


def encoder(q, config, file_groups):
    """Join every group of clips and queue the results for upload.

    q           -- queue that receives the path of each joined file,
                   followed by None as end-of-stream marker.
    config      -- options object; ``config.destination`` is read.
    file_groups -- iterable of ``(group key, list of paths)`` pairs.
    """
    try:
        for file_group, files in file_groups:
            logger.info(f"{file_group}: joining...")
            outfile_name = '%s.mp4' % file_group
            outfile_path = os.path.join(config.destination, outfile_name)
            if join_video_files(outfile_path, files):
                logger.info(f"{file_group}: joined")
                q.put(outfile_path)
    finally:
        # Always send the marker: the uploader blocks on the queue until it
        # sees it, even when joining aborted with an exception.
        q.put(None)


def uploader(q, config):
    """Upload queued files with ncftpput until the None marker arrives.

    q      -- queue of file paths, terminated by None.
    config -- options object; the ``ftp_user``, ``ftp_password``,
              ``ftp_host`` and ``ftp_path`` attributes are read.

    A file is removed locally only after its upload succeeded.
    """
    while True:
        file_group = q.get()
        if file_group is None:
            break
        logger.info(f"{group_name(file_group)}: uploading...")
        # NOTE(review): the password is passed as a command line argument
        # and may therefore be visible to other local users.
        output = execute(['ncftpput',
                          '-u', config.ftp_user,
                          '-p', config.ftp_password,
                          config.ftp_host, config.ftp_path, file_group])
        if output is not None:
            logger.info(f"{group_name(file_group)}: uploaded")
            logger.info(f"{group_name(file_group)}: removing...")
            os.remove(file_group)
            logger.info(f"{group_name(file_group)}: removed")


def _encode_mounted_videos(q, config, mount_point):
    """Group the videos below *mount_point* and hand them to encoder()."""
    try:
        file_groups = parse_video_files(mount_point, config.source, 'MP4')
    except OSError as e:
        logger.error('Could not list videos in %s: %s',
                     os.path.join(mount_point, config.source), e)
        # Nothing to join; still release the uploader.
        q.put(None)
        return
    encoder(q, config, file_groups.items())


def plugged_in(dev, config):
    """Handle a newly added partition: mount, join, unmount and upload.

    dev    -- device object; ``device_node`` and ``device_type`` are read.
    config -- options object; ``dry_run`` and ``source`` are read here and
              the object is passed on to encoder() and uploader().

    In dry-run mode nothing is mounted or unmounted.
    """
    logger.info('Detected %s (%s)', dev.device_node, dev.device_type)
    # The mount point is removed with os.rmdir(), which refuses to delete a
    # non-empty directory.  A recursive cleanup could erase the contents of
    # the partition if unmounting failed.
    tmpdirname = tempfile.mkdtemp()
    try:
        logger.info('Mounting %s on %s', dev.device_node, tmpdirname)
        if not config.dry_run:
            if execute(['mount', dev.device_node, tmpdirname]) is None:
                logger.error('Could not mount %s', dev.device_node)
                return

        q = Queue()
        t1 = threading.Thread(target=_encode_mounted_videos,
                              args=(q, config, tmpdirname))
        t2 = threading.Thread(target=uploader, args=(q, config))
        t1.start()
        t2.start()
        try:
            t1.join()
        finally:
            # Joining is done with the partition; uploading works on the
            # destination directory and may continue after the unmount.
            logger.info('Umounting %s from %s', dev.device_node, tmpdirname)
            if not config.dry_run:
                execute(['umount', dev.device_node])
        t2.join()
    finally:
        try:
            os.rmdir(tmpdirname)
        except OSError as e:
            logger.warning('Could not remove %s: %s', tmpdirname, e)
    logger.info('Done')


def main():
    """Parse the command line and start monitoring (or test grouping)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run",
                        help="fake everything, really do nothing",
                        action="store_true", default=False)
    parser.add_argument("-s", "--source",
                        help="source directory with dashcam videos",
                        action="store", dest="source", default="DCIM/Movie")
    parser.add_argument("-d", "--destination",
                        help="destination directory for joined dashcam videos",
                        action="store", dest="destination",
                        default="/var/dashcam")
    parser.add_argument("--ftp-host", help="ftp host (server)",
                        action="store", default="192.168.88.242")
    parser.add_argument("--ftp-path", help="ftp path",
                        action="store", default="/dashcam")
    parser.add_argument("--ftp-user", help="ftp user",
                        action="store", default="dashcam")
    parser.add_argument("--ftp-password", help="ftp password",
                        action="store", default="")
    parser.add_argument("--version", action="version",
                        version="%(prog)s (version {version})".format(
                            version=__version__))
    parser.add_argument("--test-grouping",
                        help="just test grouping of videos",
                        action="store_true", default=False)
    options = parser.parse_args()

    if options.test_grouping:
        # Here the source option is used as the directory itself.
        pp(parse_video_files(options.source, '', 'MP4'))
        sys.exit(0)

    monitor = USBDetector(plugged_in, options)
    monitor.run()


if __name__ == "__main__":
    main()