# tasks.py
  1. import os
  2. import datetime
  3. import logging
  4. import shutil
  5. from subprocess import CalledProcessError
  6. from flask_app import make_celery, models, db, config
  7. from utils import check_call
  8. from sqlalchemy.sql import func
  9. log = logging.getLogger(__name__)
  10. celery = make_celery()
  11. @celery.task()
  12. def calculate_statistics():
  13. # Load initial values.
  14. stats = {}
  15. for stat_name in [ "created", "downloaded", "active", "data_upload" ]:
  16. stats[stat_name] = models.Stat.query.filter_by(name=stat_name).first()
  17. if not stats[stat_name]:
  18. stats[stat_name] = models.Stat(
  19. name=stat_name,
  20. value="",
  21. )
  22. db.session.add(stats[stat_name])
  23. # Count created videos.
  24. s_created = models.Job.query.filter((models.Job.state == models.JOB_FINISHED) | (models.Job.state == models.JOB_DELETED)).count()
  25. stats['created'].value = "%s" % s_created
  26. # Sum downloads.
  27. s_downloaded = db.session.query(func.sum(models.Job.download_count).label("downloaded")).first()
  28. stats['downloaded'].value = "%s" % s_downloaded
  29. # Count active videos.
  30. s_active = models.Job.query.filter((models.Job.state == models.JOB_PENDING) | (models.Job.state == models.JOB_STARTED) | (models.Job.state == models.JOB_FINISHED)).count()
  31. stats['active'].value = "%s" % s_active
  32. # Sum data uploaded.
  33. stats['data_upload'].value = 0
  34. for job in models.Job.query.filter((models.Job.state == models.JOB_FINISHED) | (models.Job.state == models.JOB_DELETED)).all():
  35. stats['data_upload'].value = stats['data_upload'].value + job.vid_size * job.download_count
  36. db.session.commit()
  37. return True
  38. @celery.task()
  39. def delete_expired():
  40. for job in models.Job.query.filter((models.Job.state == models.JOB_FINISHED) & (models.Job.expires < datetime.datetime.utcnow())).all():
  41. job.state = models.JOB_DELETED
  42. job.deleted = datetime.datetime.utcnow()
  43. db.session.commit()
  44. job_dir = config['UPLOAD_FOLDER'] + "/" + job.uniqid
  45. if os.path.exists(job_dir):
  46. shutil.rmtree(job_dir)
  47. log.info("Deleted %s" % job.uniqid)
  48. return True
  49. @celery.task()
  50. def make_video(job_id, pic, mp3, out):
  51. job = models.Job.query.filter_by(uniqid=job_id).first()
  52. if not job:
  53. return False
  54. try:
  55. job.state = models.JOB_STARTED
  56. db.session.commit()
  57. # Lower the picture resolution.
  58. check_call([ "/usr/bin/convert", pic, "-resize", "640", "%s.jpg" % pic ])
  59. pic = "%s.jpg" % pic
  60. pic_ident = check_call([ "/usr/bin/identify", pic ])
  61. try:
  62. hor_res = pic_ident.split(" ")[2].split("x")[1]
  63. except IndexError:
  64. check_call([ "/usr/bin/convert", pic, "-resize", "640x480!", pic ])
  65. if int(hor_res) % 2 != 0:
  66. new_res = int(hor_res) + 1
  67. check_call([ "/usr/bin/convert", pic, "-resize", "640x%s!" % new_res, pic ])
  68. check_call([ "/usr/bin/ffmpeg", "-loop", "1",
  69. "-i", pic, "-i", mp3,
  70. "-shortest", "-c:v", "libx264", "-c:a", "copy",
  71. "-profile:v", "baseline", "-level:v", "1.0",
  72. "-tune", "stillimage", out ])
  73. job.vid_size = os.path.getsize(out)
  74. job.state = models.JOB_FINISHED
  75. job.finished = datetime.datetime.utcnow()
  76. db.session.commit()
  77. except (CalledProcessError, os.error), e:
  78. # Encoding failed.
  79. job.state = models.JOB_FAILED
  80. db.session.commit()
  81. log.error("Failed processing %s: %s" % (job_id, e))
  82. return False
  83. return True