sync.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. #!/usr/bin/env python3
  2. """
  3. """
  4. __author__ = "Nikola Kotur"
  5. __version__ = "0.9"
  6. __license__ = "Apache License 2.0"
  7. import os
  8. import sys
  9. import argparse
  10. import threading
  11. import tempfile
  12. import subprocess
  13. import time
  14. from multiprocessing import Queue
  15. from pprint import pprint as pp
  16. import pyudev
  17. from logzero import logger
  18. class FakeDev():
  19. device_node = '/dev/fake'
  20. device_type = 'dummy'
  21. class USBDetector():
  22. def __init__(self, handler, config):
  23. self.usb_plugged_in = handler
  24. self.config = config
  25. def run(self):
  26. thread = threading.Thread(target=self._work)
  27. thread.daemon = True
  28. thread.start()
  29. thread.join()
  30. def _work(self):
  31. if self.config.dry_run:
  32. self.usb_plugged_in(FakeDev(), self.config)
  33. return
  34. self.context = pyudev.Context()
  35. self.monitor = pyudev.Monitor.from_netlink(self.context)
  36. self.monitor.filter_by(subsystem='block')
  37. logger.info("Starting to monitor for usb")
  38. self.monitor.start()
  39. for device in iter(self.monitor.poll, None):
  40. logger.debug("Got USB event: %s", device.action)
  41. if device.action == 'add' and device.device_type == 'partition':
  42. self.usb_plugged_in(device, self.config)
  43. def execute(shell_command):
  44. shell_command = shell_command.split(' ')
  45. try:
  46. res = subprocess.check_output(shell_command, stderr=subprocess.PIPE)
  47. return res.decode('utf-8')
  48. except subprocess.CalledProcessError as e:
  49. print('exit code: {}'.format(e.returncode))
  50. print('stdout: {}'.format(e.output.decode(sys.getfilesystemencoding())))
  51. print('stderr: {}'.format(e.stderr.decode(sys.getfilesystemencoding())))
  52. return None
  53. def group_name(full_path):
  54. return os.path.basename(full_path).split('.')[0]
  55. def parse_video_files(usb_dir, base_dir, extension, fake=False):
  56. files = {}
  57. for file in os.listdir(os.path.join(usb_dir, base_dir)):
  58. if file.endswith(extension):
  59. full_path = (os.path.join(usb_dir, base_dir, file))
  60. file_parts = file.split('_')
  61. part_date = file_parts[0]
  62. if part_date not in files:
  63. files[part_date] = []
  64. files[part_date].append(full_path)
  65. return files
  66. def join_video_files(dest, sources):
  67. with tempfile.TemporaryDirectory() as tmpdirname:
  68. list_file = os.path.join(tmpdirname, 'list')
  69. with open(list_file, 'w') as lfh:
  70. for source_line in sources:
  71. lfh.write("file '%s'\n" % source_line)
  72. ffmpeg_line = 'ffmpeg -safe 0 -f concat -i %s -c copy %s' % (list_file, dest)
  73. output = execute(ffmpeg_line)
  74. if output != None:
  75. logger.info(f'{group_name(dest)}: removing source files...')
  76. for s_file in sources:
  77. os.remove(s_file)
  78. logger.info(f'{group_name(dest)}: removed source files')
  79. return True
  80. return False
  81. def encoder(q, config, file_groups):
  82. for file_group, files in file_groups:
  83. logger.info(f"{file_group}: joining...")
  84. outfile_name = '%s.mp4' % file_group
  85. outfile_path = os.path.join(config.destination, outfile_name)
  86. if join_video_files(outfile_path, files):
  87. logger.info(f"{file_group}: joined")
  88. q.put(outfile_path)
  89. q.put(None)
  90. def uploader(q, config):
  91. while True:
  92. file_group = q.get()
  93. if file_group is None:
  94. break
  95. logger.info(f"{group_name(file_group)}: uploading...")
  96. output = execute('ncftpput -u %s -p %s %s %s %s' % (config.ftp_user, config.ftp_password, config.ftp_host, config.ftp_path, file_group))
  97. if output != None:
  98. logger.info(f"{group_name(file_group)}: uploaded")
  99. logger.info(f"{group_name(file_group)}: removing...")
  100. os.remove(file_group)
  101. logger.info(f"{group_name(file_group)}: removed")
  102. def plugged_in(dev, config):
  103. logger.info('Detected %s (%s)', dev.device_node, dev.device_type)
  104. with tempfile.TemporaryDirectory() as tmpdirname:
  105. logger.info('Mounting %s on %s', dev.device_node, tmpdirname)
  106. if not config.dry_run:
  107. output = execute('mount %s %s' % (dev.device_node, tmpdirname))
  108. q = Queue()
  109. t1 = threading.Thread(target=lambda: encoder(q, config, parse_video_files(tmpdirname, config.source, 'MP4', fake=config.dry_run).items()))
  110. t2 = threading.Thread(target=lambda: uploader(q, config))
  111. t1.start()
  112. t2.start()
  113. t1.join()
  114. logger.info('Umounting %s from %s', dev.device_node, tmpdirname)
  115. if not config.dry_run:
  116. output = execute('umount %s' % (dev.device_node))
  117. t2.join()
  118. logger.info('Done')
  119. if __name__ == "__main__":
  120. parser = argparse.ArgumentParser()
  121. parser.add_argument("--dry-run", help="fake everything, really do nothing", action="store_true", default=False)
  122. parser.add_argument("-s", "--source", help="source directory with dashcam videos", action="store", dest="source", default="DCIM/Movie")
  123. parser.add_argument("-d", "--destination", help="destination directory for joined dashcam videos", action="store", dest="destination", default="/var/dashcam")
  124. parser.add_argument("--ftp-host", help="ftp host (server)", action="store", default="192.168.88.242")
  125. parser.add_argument("--ftp-path", help="ftp path", action="store", default="/dashcam")
  126. parser.add_argument("--ftp-user", help="ftp user", action="store", default="dashcam")
  127. parser.add_argument("--ftp-password", help="ftp user", action="store", default="")
  128. parser.add_argument("--version", action="version", version="%(prog)s (version {version})".format(version=__version__))
  129. monitor = USBDetector(plugged_in, parser.parse_args())
  130. monitor.run()