Even though the Flask documentation says Celery extensions are unnecessary now, I found that I still need an extension to properly use Celery in large Flask applications. Specifically I need an init_app() method to initialize Celery after I instantiate it.
This extension also comes with a single_instance
method.
- Python 2.6, 2.7, PyPy, 3.3, and 3.4 supported on Linux and OS X.
- Python 2.7, 3.3, and 3.4 supported on Windows (both 32 and 64 bit versions of Python).
Single instance decorator inspired by Ryan Roemer.
Install:
pip install Flask-Celery-Helper
# example.py
from flask import Flask
from flask_celery import Celery
app = Flask('example')
app.config['CELERY_BROKER_URL'] = 'redis://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery = Celery(app)
@celery.task()
def add_together(a, b):
return a + b
if __name__ == '__main__':
result = add_together.delay(23, 42)
print(result.get())
Run these two commands in separate terminals:
celery -A example.celery worker
python example.py
# extensions.py
from flask_celery import Celery
celery = Celery()
# application.py
from flask import Flask
from extensions import celery
def create_app():
app = Flask(__name__)
app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
app.config['CELERY_BROKER_URL'] = 'redis://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery.init_app(app)
return app
# tasks.py
from extensions import celery
@celery.task()
def add_together(a, b):
return a + b
# manage.py
from application import create_app
app = create_app()
app.run()
# example.py
import time
from flask import Flask
from flask_celery import Celery, single_instance
from flask_redis import Redis
app = Flask('example')
app.config['REDIS_URL'] = 'redis://localhost'
app.config['CELERY_BROKER_URL'] = 'redis://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery = Celery(app)
Redis(app)
@celery.task(bind=True)
@single_instance
def sleep_one_second(a, b):
time.sleep(1)
return a + b
if __name__ == '__main__':
task1 = sleep_one_second.delay(23, 42)
time.sleep(0.1)
task2 = sleep_one_second.delay(20, 40)
results1 = task1.get(propagate=False)
results2 = task2.get(propagate=False)
print(results1) # 65
if isinstance(results2, Exception) and str(results2) == 'Failed to acquire lock.':
print('Another instance is already running.')
else:
print(results2) # Should not happen.
This project adheres to Semantic Versioning.
- Changed
- Supporting Flask 0.12, switching from
flask.ext.celery
toflask_celery
import recommendation.
- Supporting Flask 0.12, switching from
- Added
- Windows support.
single_instance
supported on SQLite/MySQL/PostgreSQL in addition to Redis.
- Changed
CELERY_RESULT_BACKEND
no longer mandatory.- Breaking changes:
flask.ext.celery.CELERY_LOCK
moved toflask.ext.celery._LockManagerRedis.CELERY_LOCK
.
- Added
- Support for non-Redis backends.
- Added
- Python 2.6 and 3.x support.
- Fixed
single_instance
arguments with functools.
- Added
include_args
argument tosingle_instance
.
- Initial release.