Brokers

RabbitMQ

broker_url = 'amqp://guest:guest@localhost:5672//'

Redis

app.conf.broker_url = 'redis://localhost:6379/0'

redis://:password@hostname:port/db_number
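For example, assuming a Redis server on localhost protected with a password and using database 1 (the password and database number here are just placeholders):

app.conf.broker_url = 'redis://:mysecretpassword@localhost:6379/1'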

Quick start

Simple

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
Run the worker:
$ celery -A tasks worker --loglevel=info

Keep result

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

Now with the result backend configured, let’s call the task again. This time you’ll hold on to the AsyncResult instance returned when you call a task:

>>> result = add.delay(4, 4)

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False

You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:

>>> result.get(timeout=1)
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

If the task raised an exception you can also gain access to the original traceback:

>>> result.traceback

See celery.result for the complete result object reference.
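Besides ready(), get(), and traceback, a few other attributes on the AsyncResult instance are often useful; a minimal sketch (these are standard AsyncResult attributes):

>>> result = add.delay(4, 4)
>>> result.id            # unique task id (a UUID string)
>>> result.state         # e.g. 'PENDING', 'STARTED', 'SUCCESS', 'FAILURE'
>>> result.successful()  # True once the task finished without raising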

Configuration

Direct configuration

app.conf.task_serializer = 'json'

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

From obj

app.config_from_object('celeryconfig')

celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

task_routes = {
    'tasks.add': 'low-priority',
}

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

Using Celery in your Application

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

Start the worker

$ celery -A proj worker -l info

In the background

The daemonization scripts use the celery multi command to start one or more workers in the background:

$ celery multi start w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Starting nodes...
> w1.halcyon.local: OK

You can restart it too:

$ celery multi restart w1 -A proj -l info
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
> w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
> w1.halcyon.local: TERM -> 64052

or stop it:

$ celery multi stop w1 -A proj -l info

The stop command is asynchronous, so it won’t wait for the worker to shut down. You’ll probably want to use the stopwait command instead; it ensures all currently executing tasks are completed before exiting:

$ celery multi stopwait w1 -A proj -l info

About the --app argument

The --app argument specifies the Celery app instance to use; it must be in the form module.path:attribute.

But it also supports a shortcut form: if only a package name is specified, it will try to search for the app instance in the following order.

With --app=proj:

an attribute named proj.app, or
an attribute named proj.celery, or
any attribute in the module proj where the value is a Celery application, or
If none of these are found it’ll try a submodule named proj.celery:

an attribute named proj.celery.app, or
an attribute named proj.celery.celery, or
Any attribute in the module proj.celery where the value is a Celery application.
This scheme mimics the practices used in the documentation – that is, proj:app for a single contained module, and proj.celery:app for larger projects.
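For example, with the proj layout shown earlier (where proj/celery.py defines app), the shortcut form lets you start the worker without naming the attribute explicitly:

$ celery --app=proj worker -l info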

Routing

Celery supports all of the routing facilities provided by AMQP, but it also supports simple routing where messages are sent to named queues.

The task_routes setting enables you to route tasks by name and keep everything centralized in one location:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

You can also specify the queue at runtime with the queue argument to apply_async:

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

You can then make a worker consume from this queue by specifying the celery worker -Q option:

$ celery -A proj worker -Q hipri

You may specify multiple queues by using a comma-separated list. For example, you can make the worker consume from both the default queue and the hipri queue, where the default queue is named celery for historical reasons:

$ celery -A proj worker -Q hipri,celery

The order of the queues doesn’t matter as the worker will give equal weight to the queues.

To learn more about routing, including making use of the full power of AMQP routing, see the Routing Guide.

Celery Best Practices

Deni Bertović’s article about Celery best practices

No.1: Don’t use the database as your AMQP Broker

A database is not built for doing the things a proper AMQP broker like RabbitMQ is designed for.

AMQP - Advanced Message Queuing Protocol

Using the database as a broker causes heavy disk I/O, since workers have to poll it constantly for new tasks.

A broker like RabbitMQ, which implements AMQP, keeps messages in memory instead.

No.2: Use more Queues (ie. not just the default one)

Don’t put all tasks in the same queue: a flood of one task type can delay every other task waiting behind it.

No.3: Use priority workers

The way to solve the issue above is to put taskA in one queue and taskB in another, then assign x workers to process Q1 and all the other workers to process the more intensive Q2, since it has more tasks coming in.

So, define your queues manually:

from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
    Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)

And your routes that will decide which task goes where:

CELERY_ROUTES = {
    'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
    'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}

Which will allow you to run workers for each task:

celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B

No.4: Use Celery’s error handling mechanisms

Set a default retry delay and a maximum number of retries:

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A(self):
    try:
        print("doing stuff here...")
    except SomeNetworkException as e:
        print("maybe do some cleanup here....")
        raise self.retry(exc=e)

No.5: Use Flower
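Flower is a web-based tool for monitoring and administrating Celery workers and tasks. A minimal way to try it, assuming the proj app from earlier (the port is just an example):

$ pip install flower
$ celery -A proj flower --port=5555
# then open http://localhost:5555 in a browser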

No.6: Keep track of results only if you really need them
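If you don’t need a task’s return value, tell Celery not to store it, either globally or per task. A short sketch using the lowercase setting names from the configuration section (the task itself is just an illustration):

app.conf.task_ignore_result = True   # globally: never store results

@app.task(ignore_result=True)        # or disable it for a single task
def log_message(message):
    print(message)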

Tutorials

Concurrency with Eventlet

Eventlet : A concurrent networking library for Python that allows you to change how you run your code, not how you write it.

You can enable the Eventlet pool by using the celery worker -P option:

$ celery -A proj worker -P eventlet -c 1000