123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- #!/usr/bin/env python3
- """
- """
- __author__ = "Nikola Kotur"
- __version__ = "0.9"
- __license__ = "Apache License 2.0"
- import os
- import sys
- import argparse
- import threading
- import tempfile
- import subprocess
- import time
- from multiprocessing import Queue
- from pprint import pprint as pp
- import pyudev
- from logzero import logger
class FakeDev:
    """Placeholder device object passed to the handler in dry-run mode.

    It carries the same two attributes that are read from a real device
    when a plug-in event is handled.
    """

    device_type = "dummy"
    device_node = "/dev/fake"
class USBDetector:
    """Report newly added block-device partitions to a callback.

    The callback is invoked as ``handler(device, config)``. When
    ``config.dry_run`` is set, udev is not touched at all and the callback is
    invoked exactly once with a ``FakeDev`` instance.
    """

    def __init__(self, handler, config):
        self.config = config
        self.usb_plugged_in = handler

    def run(self):
        """Start the worker as a daemon thread and block until it finishes."""
        worker = threading.Thread(target=self._work, daemon=True)
        worker.start()
        worker.join()

    def _work(self):
        """Worker body: fire the callback for every added partition."""
        if self.config.dry_run:
            # Dry run: no udev monitoring, just one simulated device.
            self.usb_plugged_in(FakeDev(), self.config)
            return

        self.context = pyudev.Context()
        self.monitor = pyudev.Monitor.from_netlink(self.context)
        self.monitor.filter_by(subsystem='block')
        logger.info("Starting to monitor for usb")
        self.monitor.start()

        # poll() is called repeatedly; the loop ends if it ever returns None.
        for device in iter(self.monitor.poll, None):
            logger.debug("Got USB event: %s", device.action)
            if device.action != 'add' or device.device_type != 'partition':
                continue
            self.usb_plugged_in(device, self.config)
def execute(shell_command):
    """Run an external command and return its decoded standard output.

    Args:
        shell_command: Either a command string or an already-split sequence
            of arguments. A string is split on single spaces; no shell
            quoting is interpreted, so an individual argument cannot contain
            a space. A sequence is passed to the process unchanged.

    Returns:
        The command's stdout decoded as UTF-8 (undecodable bytes are
        replaced), or ``None`` if the command could not be started or exited
        with a non-zero status. Failure details are printed to stdout.
    """
    if isinstance(shell_command, str):
        # Deliberately split(' ') rather than split(): two consecutive spaces
        # produce an empty argument, so an empty value interpolated into the
        # command string still occupies its argument position.
        argv = shell_command.split(' ')
    else:
        argv = list(shell_command)

    try:
        res = subprocess.check_output(argv, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        encoding = sys.getfilesystemencoding()
        print('exit code: {}'.format(e.returncode))
        print('stdout: {}'.format(e.output.decode(encoding, errors='replace')))
        print('stderr: {}'.format(e.stderr.decode(encoding, errors='replace')))
        return None
    except OSError as e:
        # Raised when the executable is missing or cannot be run; report it
        # the same way as a failed command instead of crashing the caller.
        print('could not run {!r}: {}'.format(argv, e))
        return None
    return res.decode('utf-8', errors='replace')
def group_name(full_path):
    """Return the file name of *full_path* up to, not including, its first dot."""
    file_name = os.path.basename(full_path)
    stem, _, _ = file_name.partition('.')
    return stem
def parse_video_files(usb_dir, base_dir, extension, fake=False):
    """Group the video files in ``usb_dir/base_dir`` by file-name prefix.

    The prefix is the part of the file name before the first underscore.

    Args:
        usb_dir: Root directory (mount point) to look under.
        base_dir: Directory, relative to *usb_dir*, that holds the videos.
        extension: Only file names ending with this string are included
            (case-sensitive).
        fake: Accepted for interface compatibility; currently unused.

    Returns:
        A dict mapping each prefix to the list of full paths sharing it. The
        paths in each list are in sorted file-name order. An empty dict is
        returned when the directory does not exist.
    """
    video_dir = os.path.join(usb_dir, base_dir)
    try:
        # os.listdir() returns entries in arbitrary order; sort them so the
        # files of a group are always listed (and later joined) in a stable,
        # name-based order.
        names = sorted(os.listdir(video_dir))
    except (FileNotFoundError, NotADirectoryError):
        # Nothing to process, e.g. a device without the expected directory.
        return {}

    files = {}
    for name in names:
        if not name.endswith(extension):
            continue
        prefix = name.split('_')[0]
        files.setdefault(prefix, []).append(os.path.join(video_dir, name))
    return files
def join_video_files(dest, sources):
    """Concatenate *sources* into *dest* with ffmpeg, then delete the sources.

    A temporary list file naming every source is written and handed to
    ffmpeg's concat demuxer. The source files are removed only after the
    ffmpeg command has succeeded.

    Args:
        dest: Path of the output file.
        sources: Iterable of input file paths, in the order to concatenate.

    Returns:
        True if ffmpeg succeeded and the sources were removed, False if the
        ffmpeg command failed (the sources are then left untouched).
    """
    # NOTE(review): execute() splits the command string on spaces, so a
    # `dest` containing a space would still be passed as several arguments.
    with tempfile.TemporaryDirectory() as tmpdirname:
        list_file = os.path.join(tmpdirname, 'list')
        with open(list_file, 'w') as lfh:
            for source_path in sources:
                # Inside the single-quoted path a literal quote must be
                # written as '\'' (close quote, escaped quote, reopen).
                escaped = source_path.replace("'", "'\\''")
                lfh.write("file '%s'\n" % escaped)
        ffmpeg_line = 'ffmpeg -safe 0 -f concat -i %s -c copy %s' % (list_file, dest)
        output = execute(ffmpeg_line)

    if output is None:
        return False

    logger.info(f'{group_name(dest)}: removing source files...')
    for s_file in sources:
        os.remove(s_file)
    logger.info(f'{group_name(dest)}: removed source files')
    return True
def encoder(q, config, file_groups):
    """Join each group of video files and queue the results for upload.

    Args:
        q: Queue receiving the path of every successfully joined file,
            followed by a single ``None`` marking the end of the stream.
        config: Settings object; ``config.destination`` is the directory the
            joined files are written to.
        file_groups: Iterable of ``(group_name, [source paths])`` pairs.
    """
    try:
        for file_group, files in file_groups:
            logger.info(f"{file_group}: joining...")
            outfile_name = '%s.mp4' % file_group
            outfile_path = os.path.join(config.destination, outfile_name)
            if join_video_files(outfile_path, files):
                logger.info(f"{file_group}: joined")
                q.put(outfile_path)
    finally:
        # Always send the end marker, even if a group failed with an
        # exception; otherwise the consumer would wait on the queue forever.
        q.put(None)
def uploader(q, config):
    """Upload each file path taken from *q* until a ``None`` marker arrives.

    Every path is sent with ``ncftpput`` using the ``ftp_user``,
    ``ftp_password``, ``ftp_host`` and ``ftp_path`` settings from *config*.
    A file is deleted locally only after its upload command succeeded.
    """
    # Keep pulling from the queue; the None marker ends the loop.
    for file_group in iter(q.get, None):
        label = group_name(file_group)
        logger.info(f"{label}: uploading...")
        ftp_args = (
            config.ftp_user,
            config.ftp_password,
            config.ftp_host,
            config.ftp_path,
            file_group,
        )
        output = execute('ncftpput -u %s -p %s %s %s %s' % ftp_args)
        if output is None:
            # Upload failed: keep the local file and move on.
            continue
        logger.info(f"{label}: uploaded")
        logger.info(f"{label}: removing...")
        os.remove(file_group)
        logger.info(f"{label}: removed")
def plugged_in(dev, config):
    """Process a detected device: mount it, join and upload videos, unmount.

    Joining and uploading run in two threads connected by a queue. The device
    is unmounted as soon as joining has finished, while uploads may still be
    in progress. In dry-run mode the mount and unmount commands are skipped.

    Args:
        dev: Device object providing ``device_node`` and ``device_type``.
        config: Settings object; reads ``dry_run`` and ``source`` here and is
            passed on to the encoder and uploader.
    """
    logger.info('Detected %s (%s)', dev.device_node, dev.device_type)
    # mkdtemp()/rmdir() instead of TemporaryDirectory(): the latter deletes
    # the directory tree recursively on exit, which would wipe the device's
    # files if the unmount below failed. rmdir() only removes an empty
    # directory.
    mount_point = tempfile.mkdtemp()
    try:
        logger.info('Mounting %s on %s', dev.device_node, mount_point)
        if not config.dry_run:
            if execute('mount %s %s' % (dev.device_node, mount_point)) is None:
                logger.error('Mounting %s failed, skipping it', dev.device_node)
                return

        q = Queue()
        upload_thread = threading.Thread(target=uploader, args=(q, config))
        upload_thread.start()
        try:
            try:
                file_groups = parse_video_files(
                    mount_point, config.source, 'MP4', fake=config.dry_run
                ).items()
            except OSError as err:
                logger.error('Cannot list videos under %s: %s', mount_point, err)
            else:
                encode_thread = threading.Thread(
                    target=encoder, args=(q, config, file_groups)
                )
                encode_thread.start()
                encode_thread.join()
        finally:
            # encoder() normally sends the end marker itself. Sending one
            # more is harmless (uploader stops at the first) and guarantees
            # the uploader terminates even if the encoder never ran or died.
            q.put(None)
            logger.info('Umounting %s from %s', dev.device_node, mount_point)
            if not config.dry_run:
                execute('umount %s' % (dev.device_node))

        upload_thread.join()
        logger.info('Done')
    finally:
        try:
            os.rmdir(mount_point)
        except OSError as err:
            # E.g. the device is still mounted there; leave it in place.
            logger.warning('Could not remove mount point %s: %s', mount_point, err)
if __name__ == "__main__":
    # Command-line entry point: parse the options, then watch for USB
    # devices and hand each detected one to plugged_in().
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", help="fake everything, really do nothing", action="store_true", default=False)
    parser.add_argument("-s", "--source", help="source directory with dashcam videos", action="store", dest="source", default="DCIM/Movie")
    parser.add_argument("-d", "--destination", help="destination directory for joined dashcam videos", action="store", dest="destination", default="/var/dashcam")
    parser.add_argument("--ftp-host", help="ftp host (server)", action="store", default="192.168.88.242")
    parser.add_argument("--ftp-path", help="ftp path", action="store", default="/dashcam")
    parser.add_argument("--ftp-user", help="ftp user", action="store", default="dashcam")
    # The help text previously said "ftp user" for this option as well.
    parser.add_argument("--ftp-password", help="ftp password", action="store", default="")
    parser.add_argument("--version", action="version", version="%(prog)s (version {version})".format(version=__version__))

    monitor = USBDetector(plugged_in, parser.parse_args())
    monitor.run()
|