Sfoglia il codice sorgente

Implemented parallel joining and uploading

Nikola Kotur 3 anni fa
parent
commit
7dd723eff5
1 ha cambiato i file con 59 aggiunte e 61 eliminazioni
  1. 59 61
      sync.py

+ 59 - 61
sync.py

@@ -1,10 +1,9 @@
 #!/usr/bin/env python3
 """
-Module Docstring
 """
 
 __author__ = "Nikola Kotur"
-__version__ = "0.1"
+__version__ = "0.9"
 __license__ = "Apache License 2.0"
 
 import os
@@ -13,26 +12,34 @@ import argparse
 import threading
 import tempfile
 import subprocess
+import time
+from multiprocessing import Queue
 from pprint import pprint as pp
 
 import pyudev
 from logzero import logger
 
 
+class FakeDev():
+    device_node = '/dev/fake'
+    device_type = 'dummy'
+
+
 class USBDetector():
-    ''' Monitor udev for detection of usb '''
- 
-    def __init__(self, handler, source_dir, dest_dir, ftp_args):
+    def __init__(self, handler, config):
         self.usb_plugged_in = handler
-        self.source_dir = source_dir
-        self.dest_dir = dest_dir
-        self.ftp_args = ftp_args
+        self.config = config
+
+    def run(self):
         thread = threading.Thread(target=self._work)
         thread.daemon = True
         thread.start()
         thread.join()
 
     def _work(self):
+        if self.config.dry_run:
+            self.usb_plugged_in(FakeDev(), self.config)
+            return
         self.context = pyudev.Context()
         self.monitor = pyudev.Monitor.from_netlink(self.context)
         self.monitor.filter_by(subsystem='block')
@@ -41,7 +48,7 @@ class USBDetector():
         for device in iter(self.monitor.poll, None):
             logger.debug("Got USB event: %s", device.action)
             if device.action == 'add' and device.device_type == 'partition':
-                self.usb_plugged_in(device, self.source_dir, self.dest_dir, self.ftp_args)
+                self.usb_plugged_in(device, self.config)
 
 
 def execute(shell_command):
@@ -56,7 +63,7 @@ def execute(shell_command):
         return None
 
 
-def parse_video_files(usb_dir, base_dir, extension):
+def parse_video_files(usb_dir, base_dir, extension, fake=False):
     files = {}
     for file in os.listdir(os.path.join(usb_dir, base_dir)):
         if file.endswith(extension):
@@ -80,76 +87,67 @@ def join_video_files(dest, sources):
         output = execute(ffmpeg_line)
     if output != None:
         logger.info('Removing source files')
-        for s_file in sources:
-            os.remove(s_file)
+        # for s_file in sources:
+        #     os.remove(s_file)
         return True
     return False
 
 
-def plugged_in(dev, source_dir, dest_dir, ftp):
+def encoder(q, config, file_groups):
+    for file_group, files in file_groups:
+        logger.info(f"Joining {file_group}")
+        outfile_name = '%s.mp4' % file_group
+        outfile_path = os.path.join(config.destination, outfile_name)
+        if join_video_files(outfile_path, files):
+            logger.info(f"Joined {file_group}")
+            q.put(outfile_path)
+    q.put(None)
+
+
+def uploader(q, config):
+    while True:
+        file_group = q.get()
+        if file_group is None:
+            break
+        logger.info(f"Uploading {file_group}")
+        output = execute('ncftpput -u %s -p %s %s %s %s' % (config.ftp_user, config.ftp_password, config.ftp_host, config.ftp_path, file_group))
+        if output != None:
+            logger.info('Removing %s', file_group)
+            os.remove(up_file)
+        logger.info(f"Uploaded {file_group}")
+
+
+def plugged_in(dev, config):
     logger.info('Detected %s (%s)', dev.device_node, dev.device_type)
     with tempfile.TemporaryDirectory() as tmpdirname:
         logger.info('Mounting %s on %s', dev.device_node, tmpdirname)
-        output = execute('mount %s %s' % (dev.device_node, tmpdirname))
+        if not config.dry_run:
+            output = execute('mount %s %s' % (dev.device_node, tmpdirname))
 
-        # Join files
-        joined_files = []
-        for file_group, files in parse_video_files(tmpdirname, source_dir, 'MP4').items():
-            logger.info('Joining video group %s', file_group)
-            outfile_name = '%s.mp4' % file_group
-            outfile_path = os.path.join(dest_dir, outfile_name)
-            if join_video_files(outfile_path, files):
-                joined_files.append(outfile_path)
+        q = Queue()
+        t1 = threading.Thread(target=lambda: encoder(q, config, parse_video_files(tmpdirname, config.source, 'MP4', fake=config.dry_run).items()))
+        t2 = threading.Thread(target=lambda: uploader(q, config))
+        t1.start()
+        t2.start()
+        t1.join()
 
         logger.info('Umounting %s from %s', dev.device_node, tmpdirname)
-        output = execute('umount %s' % (dev.device_node))
-
-        # Beam files to FTP
-        for up_file in joined_files:
-            logger.info('Uploading %s to %s', up_file, ftp['host'])
-            output = execute('ncftpput -u %s -p %s %s %s %s' % (ftp['user'], ftp['password'], ftp['host'], ftp['path'], up_file))
-            if output != None:
-                logger.info('Removing %s', up_file)
-                os.remove(up_file)
+        if not config.dry_run:
+            output = execute('umount %s' % (dev.device_node))
+        t2.join()
 
     logger.info('Done')
 
 
-def main(args):
-    """ Main entry point of the app """
-    ftp_args = {
-        'host': args.ftp_host,
-        'path': args.ftp_path,
-        'user': args.ftp_user,
-        'password': args.ftp_password,
-    }
-    usb_detect = USBDetector(plugged_in, args.source, args.destination, ftp_args)
-
-
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    # Directory options
+    parser.add_argument("--dry-run", help="fake everything, really do nothing", action="store_true", default=False)
     parser.add_argument("-s", "--source", help="source directory with dashcam videos", action="store", dest="source", default="DCIM/Movie")
     parser.add_argument("-d", "--destination", help="destination directory for joined dashcam videos", action="store", dest="destination", default="/var/dashcam")
-    # FTP options
     parser.add_argument("--ftp-host", help="ftp host (server)", action="store", default="192.168.88.242")
     parser.add_argument("--ftp-path", help="ftp path", action="store", default="/dashcam")
     parser.add_argument("--ftp-user", help="ftp user", action="store", default="dashcam")
     parser.add_argument("--ftp-password", help="ftp user", action="store", default="")
-
-    # Optional verbosity counter (eg. -v, -vv, -vvv, etc.)
-    parser.add_argument(
-        "-v",
-        "--verbose",
-        action="count",
-        default=0,
-        help="Verbosity (-v, -vv, etc)")
-
-    # Specify output of "--version"
-    parser.add_argument(
-        "--version",
-        action="version",
-        version="%(prog)s (version {version})".format(version=__version__))
-
-    args = parser.parse_args()
-    main(args)
+    parser.add_argument("--version", action="version", version="%(prog)s (version {version})".format(version=__version__))
+    monitor = USBDetector(plugged_in, parser.parse_args())
+    monitor.run()