Celery in Python
Brokers
RabbitMQ
```python
broker_url = 'amqp://guest:guest@localhost:5672//'
```
Redis
```python
app.conf.broker_url = 'redis://localhost:6379/0'
```
The URL format, with an optional password and database number, is:

```
redis://:password@hostname:port/db_number
```
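For example, a concrete URL with a password and database 1 (values here are illustrative):

```python
app.conf.broker_url = 'redis://:s3cret@localhost:6379/1'
```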
Quick start
Simple
```python
from celery import Celery
```
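For reference, a minimal complete tasks.py following the Celery first-steps tutorial; it defines the add task that the result examples below call:

```python
from celery import Celery

# Broker URL assumes a local RabbitMQ; adjust for your setup.
app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
```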
```bash
$ celery -A tasks worker --loglevel=info
```
Keep result
```python
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')
```
Now with the result backend configured, let’s call the task again. This time you’ll hold on to the AsyncResult instance returned when you call a task:
```python
result = add.delay(4, 4)
```
The ready() method returns whether the task has finished processing or not:
```python
result.ready()
```
You can wait for the result to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:
```python
result.get(timeout=1)
```
In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:
```python
result.get(propagate=False)
```
If the task raised an exception you can also gain access to the original traceback:
```python
result.traceback
```
See celery.result for the complete result object reference.
Configuration
Direct configuration
```python
app.conf.task_serializer = 'json'
```
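You can also change several settings at once with app.conf.update(); a sketch with a few common options (values are illustrative):

```python
app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # restrict incoming content types
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)
```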
From an object
```python
app.config_from_object('celeryconfig')
```
celeryconfig.py:
```python
broker_url = 'pyamqp://'
```
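A fuller celeryconfig.py sketch in the same style, adding a result backend and serialization settings (values are illustrative):

```python
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
```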
Using Celery in your Application
```
proj/__init__.py
    /celery.py
    /tasks.py
```
proj/celery.py
```python
from __future__ import absolute_import, unicode_literals
```
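A complete proj/celery.py along the lines of the Celery "Next Steps" tutorial (broker and backend URLs are placeholders):

```python
from __future__ import absolute_import, unicode_literals

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])  # task modules to import on worker start

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
```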
proj/tasks.py
```python
from __future__ import absolute_import, unicode_literals
```
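And a matching proj/tasks.py sketch, defining the add task referenced again in the routing section below:

```python
from __future__ import absolute_import, unicode_literals

from .celery import app

@app.task
def add(x, y):
    return x + y
```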
Start
```bash
$ celery -A proj worker -l info
```
In the background
The daemonization scripts use the celery multi command to start one or more workers in the background:
```bash
$ celery multi start w1 -A proj -l info
```
You can restart it too:
```bash
$ celery multi restart w1 -A proj -l info
```
or stop it:
```bash
$ celery multi stop w1 -A proj -l info
```
The stop command is asynchronous, so it won't wait for the worker to shut down. You'll probably want to use the stopwait command instead; it ensures all currently executing tasks are completed before exiting:
```bash
$ celery multi stopwait w1 -A proj -l info
```
About the --app argument
The --app argument specifies the Celery app instance to use; it must be in the form module.path:attribute.
But it also supports a shortcut form: if only a package name is specified, it'll try to search for the app instance in the following order:
With --app=proj:
an attribute named proj.app, or
an attribute named proj.celery, or
any attribute in the module proj where the value is a Celery application, or
If none of these are found, it'll try a submodule named proj.celery:
an attribute named proj.celery.app, or
an attribute named proj.celery.celery, or
any attribute in the module proj.celery where the value is a Celery application.
This scheme mimics the practices used in the documentation – that is, proj:app for a single contained module, and proj.celery:app for larger projects.
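For example, re-exporting the app at the top of the package makes the short --app=proj form resolve via the first rule; a sketch assuming the proj layout from the previous section:

```python
# proj/__init__.py
from .celery import app  # `--app=proj` now finds this as proj.app
```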
Routing
Celery supports all of the routing facilities provided by AMQP, but it also supports simple routing where messages are sent to named queues.
The task_routes setting enables you to route tasks by name and keep everything centralized in one location:
```python
app.conf.update(
    task_routes={
        'proj.tasks.add': {'queue': 'hipri'},
    },
)
```
You can also specify the queue at runtime with the queue argument to apply_async:
```python
from proj.tasks import add

add.apply_async((2, 2), queue='hipri')
```
You can then make a worker consume from this queue by specifying the celery worker -Q option:
```bash
$ celery -A proj worker -Q hipri
```
You may specify multiple queues by using a comma-separated list. For example, you can make the worker consume from both the default queue and the hipri queue, where the default queue is named celery for historical reasons:
```bash
$ celery -A proj worker -Q hipri,celery
```
The order of the queues doesn’t matter as the worker will give equal weight to the queues.
To learn more about routing, including making use of the full power of AMQP routing, see the Routing Guide.
Celery Best Practices
Deni Bertović’s article about Celery best practices
No.1: Don’t use the database as your AMQP Broker
A database is not built for doing the things a proper AMQP broker like RabbitMQ is designed for.
AMQP stands for Advanced Message Queuing Protocol.
Using a database as the broker means heavy disk I/O, because workers have to poll it for new tasks.
An AMQP broker like RabbitMQ keeps its queues in memory, so it avoids that bottleneck.
No.2: Use more queues (i.e. not just the default one)
Don't put all tasks in the same queue: tasks that arrive in high volume will crowd out the ones that matter more (see No.3 below).
No.3: Use priority workers
The way to solve the issue above is to put taskA in one queue and taskB in another, then assign x workers to process Q1 and all the other workers to process the more intensive Q2, since it has more tasks coming in.
So, define your queues manually (the for_task_A / for_task_B names follow the article's example):

```python
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
    Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)
```
And your routes that will decide which task goes where (my_taskA / my_taskB are the article's example task names):

```python
CELERY_ROUTES = {
    'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
    'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}
```
This allows you to run a dedicated worker for each queue:
```bash
celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B
```
No.4: Use Celery’s error handling mechanisms
Set a default retry delay and a maximum number of retries:
```python
@app.task(bind=True, default_retry_delay=300, max_retries=5)
```
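A sketch of a task that retries on failure under these settings, assuming an existing app instance (the URL-fetching body is illustrative):

```python
from urllib.error import URLError
from urllib.request import urlopen

@app.task(bind=True, default_retry_delay=300, max_retries=5)
def my_task_A(self, url):
    try:
        return urlopen(url).read()  # the actual (illustrative) work
    except URLError as exc:
        # Re-queue the task: it runs again after default_retry_delay
        # seconds, giving up after max_retries attempts.
        raise self.retry(exc=exc)
```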
No.5: Use Flower
Flower is a web-based tool for monitoring and administering Celery workers and tasks.
No.6: Keep track of results only if you really need them
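If you never consume task results, tell Celery not to store them. A one-line sketch using the setting name from the article's era (in Celery 4+ the lowercase task_ignore_result is the equivalent):

```python
CELERY_IGNORE_RESULT = True
```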
Tutorials
Concurrency with Eventlet
Eventlet: a concurrent networking library for Python that allows you to change how you run your code, not how you write it.
You can enable the Eventlet pool by using the celery worker -P option:
```bash
$ celery -A proj worker -P eventlet -c 1000
```
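The eventlet pool suits I/O-bound tasks, where greenlets can wait on many sockets concurrently; a sketch of such a task (app name, broker URL, and task are illustrative):

```python
from urllib.request import urlopen

from celery import Celery

app = Celery('crawler', broker='pyamqp://')

@app.task
def fetch_url(url):
    # Network-bound work: under -P eventlet -c 1000, hundreds of these
    # can block on sockets at the same time in one worker process.
    return urlopen(url).read().decode('utf-8', errors='replace')
```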