Browse Source

app: added celery worker

Nikola Kotur 10 years ago
parent
commit
57c7338bfe
17 changed files with 106 additions and 35 deletions
  1. 2 0
      .gitignore
  2. 0 25
      app.py
  3. 5 0
      bin/celery.sh
  4. 1 0
      bin/run.sh
  5. 10 0
      celery_config.py
  6. 5 0
      conf/development.cfg
  7. 2 1
      database/models.py
  8. 1 1
      db_create.py
  9. 1 1
      db_downgrade.py
  10. 1 1
      db_migrate.py
  11. 1 1
      db_upgrade.py
  12. 48 0
      flask_app.py
  13. 1 1
      main.py
  14. 7 3
      phosic/routes.py
  15. 15 0
      phosic/tasks.py
  16. 5 0
      requirements.txt
  17. 1 1
      templates/job.html

+ 2 - 0
.gitignore

@@ -5,3 +5,5 @@
 
 app.db
 uploads/*
+
+celerybeat-schedule

+ 0 - 25
app.py

@@ -1,25 +0,0 @@
-"""
-"""
-
-import os
-
-from flask import Flask
-from flask.ext.sqlalchemy import SQLAlchemy
-
-here = os.path.abspath(os.path.dirname(__file__))
-
-app = Flask(__name__)
-
-# Load configuration.
-conf_file = os.environ.get("CONFIG", None)
-if not conf_file:
-    raise Exception("Missing CONFIG environment variable with the configuration")
-app.config.from_pyfile(conf_file)
-if app.config['DEBUG']:
-    app.testing = True
-
-# Database initialization.
-db = SQLAlchemy(app)
-from database import models
-
-config = app.config

+ 5 - 0
bin/celery.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+export CONFIG=conf/development.cfg
+
+celery -A phosic.tasks -b "amqp://phosic:phosic@lab//" worker --loglevel=info --beat

+ 1 - 0
bin/run.sh

@@ -1,4 +1,5 @@
 #!/bin/bash
 
 export CONFIG=conf/development.cfg
+
 python main.py

+ 10 - 0
celery_config.py

@@ -0,0 +1,10 @@
+
+from celery.schedules import crontab
+
+CELERYBEAT_SCHEDULE = {
+    'every-minute': {
+        'task': 'phosic.tasks.add',
+        'schedule': crontab(minute='*/30'),
+        'args': (1,2),
+    },
+}

+ 5 - 0
conf/development.cfg

@@ -12,3 +12,8 @@ RECAPTCHA_PRIVATE_KEY = "6LcrufQSAAAAAEfnYns8o-LPGjlD0s6u6veYWEc0"
 
 SQLALCHEMY_DATABASE_URI = "sqlite:////home/kotnik/code/snakepit/phosic/phosic/app.db"
 SQLALCHEMY_MIGRATE_REPO = "/home/kotnik/code/snakepit/phosic/phosic/db_repository"
+
+CELERY_BROKER_URL = "amqp://phosic:phosic@lab//"
+
+PHOSIC_TASK_DELAY = 30
+PHOSIC_TASK_MAX_EXECUTION_TIME = 150

+ 2 - 1
database/models.py

@@ -1,4 +1,4 @@
-from app import db
+from flask_app import db
 
 JOB_PENDING = 0
 JOB_STARTED = 1
@@ -8,6 +8,7 @@ JOB_DELETED = 3
 
 class Job(db.Model):
     id = db.Column(db.Integer, primary_key=True)
+    task_uuid = db.Column(db.String(36), index=True, unique=True)
     created = db.Column(db.DateTime)
     finished = db.Column(db.DateTime)
     expires = db.Column(db.DateTime)

+ 1 - 1
db_create.py

@@ -4,7 +4,7 @@ import os.path
 
 from migrate.versioning import api
 
-from app import db, config
+from flask_app import db, config
 
 db.create_all()
 

+ 1 - 1
db_downgrade.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python2
 
 from migrate.versioning import api
-from app import config
+from flask_app import config
 
 v = api.db_version(
     config['SQLALCHEMY_DATABASE_URI'], config['SQLALCHEMY_MIGRATE_REPO']

+ 1 - 1
db_migrate.py

@@ -2,7 +2,7 @@
 
 import imp
 from migrate.versioning import api
-from app import db, config
+from flask_app import db, config
 
 migration = config['SQLALCHEMY_MIGRATE_REPO'] + '/versions/%03d_migration.py' % (api.db_version(config['SQLALCHEMY_DATABASE_URI'], config['SQLALCHEMY_MIGRATE_REPO']) + 1)
 tmp_module = imp.new_module('old_model')

+ 1 - 1
db_upgrade.py

@@ -1,7 +1,7 @@
 #!/usr/bin/env python2
 
 from migrate.versioning import api
-from app import config
+from flask_app import config
 
 api.upgrade(config['SQLALCHEMY_DATABASE_URI'], config['SQLALCHEMY_MIGRATE_REPO'])
 print 'Current database version: ' + str(api.db_version(

+ 48 - 0
flask_app.py

@@ -0,0 +1,48 @@
+"""
+"""
+
+import os
+
+from flask import Flask
+from flask.ext.sqlalchemy import SQLAlchemy
+from celery import Celery
+
+def make_app():
+    app = Flask(__name__)
+
+    # Load configuration.
+    conf_file = os.environ.get("CONFIG", None)
+    if not conf_file:
+        raise Exception("Missing CONFIG environment variable with the configuration")
+    app.config.from_pyfile(conf_file)
+    if app.config['DEBUG']:
+        app.testing = True
+
+    return app
+
+# Celery.
+def make_celery(app=None):
+    if not app:
+        app = make_app()
+    celery = Celery('tasks', broker=app.config['CELERY_BROKER_URL'])
+    celery.conf.update(app.config)
+    TaskBase = celery.Task
+    class ContextTask(TaskBase):
+        abstract = True
+        def __call__(self, *args, **kwargs):
+            with app.app_context():
+                return TaskBase.__call__(self, *args, **kwargs)
+    celery.Task = ContextTask
+    return celery
+
+# Flask application.
+app = make_app()
+
+# Celery.
+celery = make_celery(app)
+
+# Database initialization.
+db = SQLAlchemy(app)
+from database import models
+
+config = app.config

+ 1 - 1
main.py

@@ -1,4 +1,4 @@
-from app import app, config
+from flask_app import app, config
 from phosic import __all__
 
 if __name__ == '__main__':

+ 7 - 3
phosic/routes.py

@@ -4,10 +4,10 @@ import datetime
 from flask import render_template, redirect, url_for
 from werkzeug import secure_filename
 
-from app import app, db, models
+from flask_app import app, db, models
 from forms import JobForm
 from utils import generate_uniqid
-
+import tasks
 
 @app.route('/',  methods=['GET', 'POST'])
 def home():
@@ -25,11 +25,15 @@ def home():
         _, pic_extension = os.path.splitext(pic_filename)
         form.pic.data.save(jobdir + uniqid + pic_extension)
 
-        # TODO: Send job to processing
+        task = tasks.make_video.apply_async(
+            countdown=app.config['PHOSIC_TASK_DELAY'],
+            expires=app.config['PHOSIC_TASK_MAX_EXECUTION_TIME']
+        )
 
         # Create database item
         job = models.Job(
             uniqid=uniqid,
+            task_uuid=task.id,
             email=form.email.data,
             created=datetime.datetime.utcnow(),
             mp3_name=mp3_filename[:255],

+ 15 - 0
phosic/tasks.py

@@ -0,0 +1,15 @@
+from celery import Celery
+
+from flask_app import make_celery, db
+
+# celery = Celery('tasks')
+celery = make_celery()
+celery.config_from_object('celery_config')
+
+@celery.task()
+def calculate_statistics(x, y):
+    return x + y
+
+@celery.task()
+def make_video():
+    return True

+ 5 - 0
requirements.txt

@@ -10,6 +10,11 @@ Flask-WTF
 flask-sqlalchemy
 sqlalchemy-migrate
 
+# ----------------------
+# Celery
+# ----------------------
+celery
+
 # ----------------------
 # Production Server
 # ----------------------

+ 1 - 1
templates/job.html

@@ -5,7 +5,7 @@
 
 {% block main %}
 <p>I am a job {{ job.id }}</p>
-<p>My id is {{ job.uniqid }}</p>
+<p>My uuid is {{ job.task_uuid }}</p>
 <p>My pic is: {{ job.pic_name }}</p>
 <p>My mp3 is: {{ job.mp3_name }}</p>