While developing a web application, there comes a time when we need to process some tasks in the background, perhaps asynchronously. For example, a user uploads photos and the app posts them to multiple social networks. We would definitely want to offload the uploading to background workers.
Django and Celery make background task processing a breeze. In this article, we shall see how to set up Django and Celery to start processing background tasks. We will use Redis to maintain our task queue.
How does it work?
- We define some tasks in our application. These tasks are expected to run for a pretty long time.
- We run the celery workers. Celery knows how to find and load these tasks. The workers keep waiting on us.
- We add some jobs to the workers' queue from our web app. The workers now have something to work on, so they start taking jobs from the queue and processing them.
- We can query the status of the jobs from our web app to know what's happening.
- Celery's Python API makes all of this simple to use. You don't need any specialised knowledge of Redis.
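To make the flow above concrete, here is a toy, in-process model of it using only the standard library. It is not how Celery is implemented: a `queue.Queue` stands in for the Redis-backed queue, a plain dict stands in for the result backend, and the names `submit`, `upload`, and `get_status` are made up for this sketch.

```python
import queue
import threading
import time
import uuid

jobs = queue.Queue()            # stands in for the Redis-backed task queue
status = {}                     # stands in for the result backend
status_lock = threading.Lock()


def set_status(job_id, state, progress):
    with status_lock:
        status[job_id] = {'status': state, 'progress': progress}


def get_status(job_id):
    # The web app calls this to find out what is happening.
    with status_lock:
        return dict(status[job_id])


def upload(job_id, message):
    # A pretend long-running task that reports its progress as it goes.
    set_status(job_id, 'PROGRESS', 0)
    time.sleep(0.05)
    set_status(job_id, 'PROGRESS', 50)
    time.sleep(0.05)
    return message


def worker():
    # The worker blocks on the queue until a job shows up.
    while True:
        job_id, func, args = jobs.get()
        try:
            func(job_id, *args)
            set_status(job_id, 'SUCCESS', 100)
        except Exception:
            set_status(job_id, 'FAILURE', 0)
        finally:
            jobs.task_done()


def submit(func, *args):
    # The web app adds a job and gets back an id it can query later.
    job_id = str(uuid.uuid4())
    set_status(job_id, 'PENDING', 0)
    jobs.put((job_id, func, args))
    return job_id


threading.Thread(target=worker, daemon=True).start()

job_id = submit(upload, 'hello world!')
jobs.join()                     # wait until the worker has finished the job
print(get_status(job_id))
```

With Celery, the queue and the status store live in Redis and the workers run as separate processes, but the shape of the interaction stays the same: submit, get an id, query by id.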
Setting Up
Let’s first install the Redis server:
```bash
sudo apt-get install redis-server
```
The version that comes from the official Ubuntu repo is quite old. You can install the latest version from third-party PPAs.
Install Celery with Redis support:
```bash
pip install celery-with-redis
```
Then install the django-celery package:
```bash
pip install django-celery
```
Configuration
Add “djcelery” to your installed apps list:
```python
INSTALLED_APPS = (
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.sites',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app',
    'djcelery',  # Must be added to the INSTALLED_APPS
    'south',
)
```
Modify your main app’s settings.py file to add the Celery-specific settings:
```python
import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
```
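If the `redis://localhost:6379/0` URL looks cryptic, this small sketch splits it into its parts. It assumes Python 3 (for `urllib.parse`), and the helper name `describe_redis_url` is made up for illustration; Celery does its own parsing internally.

```python
from urllib.parse import urlparse


def describe_redis_url(url):
    # Split a redis:// URL into host, port and database number.
    parts = urlparse(url)
    db = parts.path.lstrip('/') or '0'   # no path means database 0
    return {
        'scheme': parts.scheme,
        'host': parts.hostname,
        'port': parts.port or 6379,      # 6379 is the default Redis port
        'db': int(db),
    }


print(describe_redis_url('redis://localhost:6379/0'))
```

So both the broker and the result backend in the settings above point at database 0 of a Redis server running on the local machine.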
Now, inside your main application directory (the directory in which settings.py is located), create a file named “celery.py” with these contents:
```python
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```
The code above does a few things:
- We create our own Celery instance.
- We ask the celery instance to load necessary configs from our project’s settings file.
- We make the instance auto discover tasks from our INSTALLED_APPS.
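Auto discovery can feel a bit magical, so here is a rough sketch of the idea: for every app name, check whether a `tasks` submodule exists. This is a conceptual illustration using only the standard library, not Celery's actual implementation, and `find_task_modules` is a made-up name.

```python
import importlib.util


def find_task_modules(app_names, related_name='tasks'):
    # Return the dotted names of the "<app>.tasks" modules that exist.
    found = []
    for name in app_names:
        candidate = '{}.{}'.format(name, related_name)
        try:
            spec = importlib.util.find_spec(candidate)
        except ModuleNotFoundError:
            # The parent is missing, or it is a plain module, not a package.
            spec = None
        if spec is not None:
            found.append(candidate)
    return found


# The standard library's asyncio package happens to ship a "tasks" module,
# json does not, so only the first one is reported.
print(find_task_modules(['asyncio', 'json']))
```

In our project, the same idea applied to INSTALLED_APPS is what lets Celery pick up a tasks.py file placed inside any installed app.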
Also let’s modify the “__init__.py” file in the same directory to make the celery app available more easily:
```python
from __future__ import absolute_import

from .celery import app as celery_app
```
This would allow us to use the same app instance for shared tasks across reusable django apps.
Defining Tasks
Now let’s create a tasks.py file in one of our INSTALLED_APPS and add these contents:
```python
from project import celery_app
from time import sleep


@celery_app.task()
def UploadTask(message):

    # Update the state. The meta data is available in the task.info dictionary.
    # The meta data is useful to store relevant information to the task.
    # Here we are storing the upload progress in the meta.

    UploadTask.update_state(state='PROGRESS', meta={'progress': 0})
    sleep(30)
    UploadTask.update_state(state='PROGRESS', meta={'progress': 30})
    sleep(30)
    return message


def get_task_status(task_id):

    # If you have a task_id, this is how you query that task
    task = UploadTask.AsyncResult(task_id)

    status = task.status
    progress = 0

    if status == u'SUCCESS':
        progress = 100
    elif status == u'FAILURE':
        progress = 0
    elif status == 'PROGRESS':
        progress = task.info['progress']

    return {'status': status, 'progress': progress}
```
We have now defined our own Celery app and our tasks. It’s time to launch the workers and start adding tasks.
Processing Tasks
Before we can start processing tasks, we have to launch the celery daemon first. This is how we do it:
```bash
celery worker --app=project.celery:app --loglevel=INFO
```
Here, we tell celery to use the Celery instance we defined and configured earlier. “project” is the main app, the package that contains our settings.py along with celery.py, and “app” is the variable name which holds the Celery instance.
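The `project.celery:app` notation reads as "module path, colon, attribute name". As a rough illustration of how such a string can be resolved (this is a stdlib sketch with a made-up `resolve` helper, not Celery's own loader):

```python
import importlib


def resolve(target):
    # Turn "package.module:attribute" into the object it names.
    module_name, _, attr = target.partition(':')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Using a standard library module so the example runs anywhere.
sqrt = resolve('math:sqrt')
print(sqrt(9.0))
```

In the same way, `project.celery:app` points at the `app` variable inside our celery.py.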
Now let’s use the Django shell to add and query jobs:
```
$ python manage.py shell
[snipped]

>>> from app.tasks import *

# Please notice the "delay" method, which is a handy shortcut to apply_async.
# It allows us to call the task with exactly the same parameters
# as the original function. If you need more custom options, use apply_async.

>>> t = UploadTask.delay("hello world!")

# t is now an AsyncResult object. t.id is the task id for the task.
# You can directly use t to query the task, say, t.status

>>> get_task_status(t.id)
{'status': u'PROGRESS', 'progress': 0}

# (After 35 secs delay)
>>> get_task_status(t.id)
{'status': u'PROGRESS', 'progress': 30}

# (After waiting for another 35 secs or so)
>>> get_task_status(t.id)
{'status': u'SUCCESS', 'progress': 100}
```
As we can see, our task was processed by Celery, and we could easily query its status. We would generally use the meta data to store any task-related information.
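Instead of calling the status function by hand every few seconds, we could wrap the polling in a small helper. The sketch below is one possible way to do it. `wait_for_task` is a hypothetical name, and it takes the status function as an argument, so you would pass in the `get_task_status` function defined earlier. Here a fake status function is used so that the snippet runs without Celery.

```python
import time


def wait_for_task(get_status, task_id, interval=1.0, timeout=120.0):
    # Poll until the task succeeds, fails, or the timeout runs out.
    deadline = time.monotonic() + timeout
    while True:
        result = get_status(task_id)
        if result['status'] in ('SUCCESS', 'FAILURE'):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError('task {} did not finish in time'.format(task_id))
        time.sleep(interval)


# A stand-in for get_task_status that replays a fixed series of states.
fake_states = iter([
    {'status': 'PROGRESS', 'progress': 0},
    {'status': 'PROGRESS', 'progress': 30},
    {'status': 'SUCCESS', 'progress': 100},
])
final = wait_for_task(lambda task_id: next(fake_states), 'some-id', interval=0.01)
print(final)
```

In a real web app you would more likely expose the status through a view and let the browser poll it, since blocking a request for a minute defeats the purpose of a background task.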