Introduction
We often need to schedule tasks, run tasks periodically, or handle long-running tasks asynchronously. All of this can be achieved by using Celery in a Django project.
What is Celery?
Celery is a task queue with a focus on real-time processing, which also supports task scheduling. Celery is fast, simple, highly available, and flexible.
Celery needs a message transport to send and receive messages, which can be provided by Redis or RabbitMQ.
Getting Started
Let’s start by installing the Celery package in your virtualenv.
Install Celery
$ pip install celery
Install Redis
We will be using Redis as the message broker, so let’s install it.
Linux/Mac users
You can download the latest version from here
$ wget http://download.redis.io/releases/redis-4.0.8.tar.gz
$ tar xzf redis-4.0.8.tar.gz
$ cd redis-4.0.8
$ make
Windows users
If you are on Windows, you can get an executable build of Redis from here
After installing, check whether it is working correctly.
$ redis-cli ping
It should respond with
PONG
Also install the Python package for Redis
$ pip install redis
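To quickly verify that the Python client can reach Redis, you can try something like this (a minimal sketch, assuming Redis is running on the default localhost:6379):
import redis

# connect to the local Redis server on the default port
r = redis.Redis(host='localhost', port=6379)
print(r.ping())  # prints True if the connection works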
First Steps with Django
Now that you have successfully installed the packages, let’s get hands-on with the Django project.
settings.py
Add the following configuration to your settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = "YOUR_TIMEZONE"
Make sure you have replaced YOUR_TIMEZONE with your actual timezone. You can get your timezone from here
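For example (an illustrative value only; pick yours from the tz database list):
CELERY_TIMEZONE = "Asia/Kolkata"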
Create a celery.py file in your main Django project directory
- src/
- manage.py
- celery_project/
- __init__.py
- settings.py
- urls.py
- celery.py
celery_project/celery.py
Add the following code to the celery.py module. This module defines the Celery instance.
Make sure you replace <your project name> with your Django project's name.
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<your project name>.settings')
app = Celery('<your project name>')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
celery_project/__init__.py
Then we need to import the app defined in celery.py into the __init__.py of your main project directory. This ensures that the app is loaded when the Django project starts.
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
Creating Tasks
Now let’s create some tasks.
Create a new tasks.py file in any of your apps registered in INSTALLED_APPS
my_app/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task(name="print_msg_with_name")
def print_message(name, *args, **kwargs):
    print("Celery is working!! {} have implemented it correctly.".format(name))

@shared_task(name="add_2_numbers")
def add(x, y):
    print("Add function has been called!! with params {}, {}".format(x, y))
    return x + y
Starting Worker Process
Open a NEW terminal, change to the directory that contains your main project, i.e., the directory where the manage.py file is placed, make sure your virtualenv is activated (if you created one), and run the following command to start the worker instance of Celery.
Replace <your project name> with your project's name
$ celery -A <your project name> worker -l info
You will see output like this
-------------- celery@root v4.1.0 (latentcall)
---- **** -----
--- * *** * -- Linux-4.13.0-32-generic-x86_64-with-Ubuntu-17.10-artful 2018-02-17 08:09:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_project:0x7f9039886400
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. add_2_numbers
. celery_project.celery.debug_task
. print_msg_with_name
[2018-02-17 08:09:37,877: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-02-17 08:09:37,987: INFO/MainProcess] mingle: searching for neighbors
[2018-02-17 08:09:39,084: INFO/MainProcess] mingle: all alone
[2018-02-17 08:09:39,121: WARNING/MainProcess] /home/jai/Desktop/demo/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-02-17 08:09:39,121: INFO/MainProcess] celery@root ready.
NOTE: Check the [tasks] section above; it should contain the names of the tasks you created in the tasks.py module!
For more info and logs, you can also run the worker instance in DEBUG mode
celery -A <your project name> worker --loglevel=DEBUG
NOTE: DO NOT CLOSE THIS TERMINAL, IT SHOULD REMAIN OPEN!!
Testing the Task
Now let’s run the tasks from the Django shell. Open up your Django shell
$ python3 manage.py shell
And call the tasks with delay().
>>> from my_app.tasks import print_message, add
>>> print_message.delay("Jai Singhal")
<AsyncResult: fe4f9787-9ee4-46da-856c-453d36556760>
>>> add.delay(10, 20)
<AsyncResult: ca5d2c50-87bc-4e87-92ad-99d6d9704c30>
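Since we configured CELERY_RESULT_BACKEND earlier, the returned AsyncResult can also be used to fetch the outcome of a task (a quick sketch; the timeout is just an example value):
>>> result = add.delay(10, 20)
>>> result.ready()          # True once the worker has finished
True
>>> result.get(timeout=10)  # blocks until the result is available
30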
When you check the second terminal where your Celery worker instance is running, you will see output like this, showing that your tasks have been received and completed successfully
[2018-02-17 08:12:14,375: INFO/MainProcess] Received task: my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760]
[2018-02-17 08:12:14,377: WARNING/ForkPoolWorker-4] Celery is working!! Jai Singhal have implemented it correctly.
[2018-02-17 08:12:14,382: INFO/ForkPoolWorker-4] Task my_app.tasks.print_message[fe4f9787-9ee4-46da-856c-453d36556760] succeeded in 0.004476275000342866s: None
[2018-02-17 08:12:28,344: INFO/MainProcess] Received task: my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30]
[2018-02-17 08:12:28,349: WARNING/ForkPoolWorker-3] Add function has been called!! with params 10, 20
[2018-02-17 08:12:28,358: INFO/ForkPoolWorker-3] Task my_app.tasks.add[ca5d2c50-87bc-4e87-92ad-99d6d9704c30] succeeded in 0.010077004999857309s: 30
Periodic Tasks
We often need to run tasks periodically in our Django project. Celery fulfills this need with Celery beat, which is simply a scheduler: it kicks off its target tasks at regular intervals, and those intervals can be defined both implicitly and explicitly.
Please ensure that only a single scheduler is running for a schedule at any time; otherwise you’d end up with duplicate tasks.
Set the timezone in settings.py according to your time zone, which we already did earlier in this tutorial:
CELERY_TIMEZONE = 'Europe/London'
Now we can create periodic tasks in two ways: either by manually adding the schedule in celery.py, or by installing the django-celery-beat package, which allows us to create schedules in the Django Admin.
1. Writing scheduler manually
Add the following schedule configuration to your celery.py file
celery_project/celery.py
app.conf.beat_schedule = {
    # name of the schedule entry
    'add-every-2-seconds': {
        # task name which we have created in tasks.py
        'task': 'add_2_numbers',
        # set the period of running
        'schedule': 2.0,
        # set the args
        'args': (16, 16)
    },
    # name of the schedule entry
    'print-name-every-5-seconds': {
        # task name which we have created in tasks.py
        'task': 'print_msg_with_name',
        # set the period of running
        'schedule': 5.0,
        # set the args
        'args': ("DjangoPY",)
    },
}
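Note that the schedule does not have to be a plain number of seconds; it can also be a crontab expression (a minimal sketch; the time values are just examples):
from celery.schedules import crontab

app.conf.beat_schedule = {
    # run add_2_numbers every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'add_2_numbers',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}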
Open a NEW terminal and run the following command
$ celery -A <project name> beat -l info
Make sure you are running the worker process in a separate terminal
celery -A <your project name> worker -l info
You will see this output in the terminal where you started the Celery beat process
celery beat v4.1.0 (latentcall) is starting.
__ - ... __ - _
LocalTime -> 2018-02-17 09:56:30
Configuration ->
. broker -> redis://localhost:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2018-02-17 09:56:30,268: INFO/MainProcess] beat: Starting...
[2018-02-17 09:56:36,365: INFO/MainProcess] Scheduler: Sending due task add-every-2-seconds (add_2_numbers)
[2018-02-17 09:56:38,365: INFO/MainProcess] Scheduler: Sending due task add-every-2-seconds (add_2_numbers)
[2018-02-17 09:56:39,367: INFO/MainProcess] Scheduler: Sending due task print-name-every-5-seconds (print_msg_with_name)
[2018-02-17 09:56:40,365: INFO/MainProcess] Scheduler: Sending due task add-every-2-seconds (add_2_numbers)
[2018-02-17 09:56:42,365: INFO/MainProcess] Scheduler: Sending due task add-every-2-seconds (add_2_numbers)
...
And now if you look at the worker process terminal, you will see that the tasks are running periodically!
[2018-02-17 09:56:36,371: WARNING/ForkPoolWorker-1] Add function has been called!! with params 16, 16
[2018-02-17 09:56:36,464: INFO/ForkPoolWorker-1] Task add_2_numbers[7e9f9ff5-4b01-42d3-b301-99a3b078484b] succeeded in 0.09404212199842732s: 32
[2018-02-17 09:56:38,368: INFO/MainProcess] Received task: add_2_numbers[097e8b56-7090-4561-9686-77d7aae6e2d6]
[2018-02-17 09:56:38,369: WARNING/ForkPoolWorker-2] Add function has been called!! with params 16, 16
[2018-02-17 09:56:38,453: INFO/ForkPoolWorker-2] Task add_2_numbers[097e8b56-7090-4561-9686-77d7aae6e2d6] succeeded in 0.08399435899991659s: 32
[2018-02-17 09:56:39,371: INFO/MainProcess] Received task: print_msg_with_name[2b56d4a2-a358-4186-b849-66342d7635dc]
[2018-02-17 09:56:39,372: WARNING/ForkPoolWorker-1] Celery is working!! DjangoPY have implemented it correctly.
[2018-02-17 09:56:39,456: INFO/ForkPoolWorker-1] Task print_msg_with_name[2b56d4a2-a358-4186-b849-66342d7635dc] succeeded in 0.08378831899972283s: None
2. Using django-celery-beat
Let’s now do the same thing with django-celery-beat
1. Install django-celery-beat
$ pip install django-celery-beat
2. Add into Installed apps
Add the django_celery_beat module to INSTALLED_APPS in your Django project's settings.py:
INSTALLED_APPS = [
...
'django_celery_beat',
]
3. Run the django migrations
$ python manage.py migrate
Note: The database scheduler won’t reset when timezone-related settings change, so you must do this manually:
$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)
Now go to your Django Admin and create a Periodic Task as follows.
Choose any name, select the task you created, and create a crontab schedule according to your needs. Please refer to the guide or some crontab examples here.
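If you prefer, you can also create the same schedule programmatically instead of through the admin (a minimal sketch using the django-celery-beat models; the entry name and crontab values are just examples):
import json

from django_celery_beat.models import CrontabSchedule, PeriodicTask

# a crontab entry that fires every 5 minutes
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='*/5',
    hour='*',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*',
)

# bind the schedule to the task we registered in tasks.py
PeriodicTask.objects.create(
    crontab=schedule,
    name='print-name-every-5-minutes',  # any unique, descriptive name
    task='print_msg_with_name',         # the registered task name
    args=json.dumps(['DjangoPY']),      # args are stored as a JSON string
)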
Run the Celery beat process in a new terminal with --scheduler
$ celery -A <project name> beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Make sure you have the worker process running in a separate terminal, alongside the Django server and the Celery beat process
celery -A <your project name> worker -l info
Check the output logs in both of the terminals.
Running on Production
Now that Celery runs perfectly locally, the last thing we need to take care of is production. The question is how we can keep these processes running all the time on our production server, because both processes (beat and worker) must be running for Celery to work.
This is where Supervisor comes in handy: it helps run both of the instances separately.
Supervisor is a client/server system that allows its users to control processes and keep them running on any Unix-like operating system, so we can use it to run the Celery processes.
Here is the documentation for Supervisor.
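As an illustration, a minimal Supervisor program configuration for the two processes might look like this (a sketch only; all paths and the project name are placeholders you must adapt to your server):
[program:celery_worker]
; run the worker from the project directory using the virtualenv's celery
command=/path/to/virtualenv/bin/celery -A <your project name> worker -l info
directory=/path/to/src
autostart=true
autorestart=true

[program:celery_beat]
; run the beat scheduler the same way, as a separate program
command=/path/to/virtualenv/bin/celery -A <your project name> beat -l info
directory=/path/to/src
autostart=true
autorestart=true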