为什么要使用celery?
Web应用程序适用于请求和响应周期。当用户访问您的应用程序的某个URL时,Web浏览器向您的服务器发送一个请求。Django收到这个请求,并用它做一些事情。通常它涉及在数据库中执行查询,处理数据。当Django做他的事情并处理请求时,用户必须等待。当Django完成处理请求的工作时,它会发送一个响应给最终会看到某些东西的用户。
理想情况下,这个请求和响应周期应该很快,否则我们会让用户等待太久。更糟的是,我们的Web服务器一次只能服务一定数量的用户。所以,如果这个过程很慢,它可以限制您的应用程序一次可以提供的页面数量。
大多数情况下,我们可以使用缓存,优化数据库查询等来解决此问题。但是有些情况下,没有其他的选择:需要做大量的工作。一个报告页面,大量的数据输出,视频/图像处理是你可能想要使用celery的几个例子。
我们在不是在整个项目中使用celery,而只是用于耗费时间的特定任务。这里的想法是尽可能快地响应用户,并将耗时的任务传递给队列,以便在后台执行,并始终保持服务器准备好响应新的请求。
安装
安装celery最简单的方法是使用点:
pip install Celery
现在我们必须安装RabbitMQ。
在Ubuntu 16.04上安装RabbitMQ
要将其安装在较新的Ubuntu版本上非常简单:
apt-get install -y erlangapt-get install rabbitmq-server
然后启用并启动RabbitMQ服务:
systemctlenable rabbitmq-serversystemctl start rabbitmq-server
检查状态以确保一切运行平稳:
systemctl status rabbitmq-server
在Mac上安装RabbitMQ
brew install rabbitmq
RabbitMQ脚本安装到/usr/local/sbin。你可以把它添加到您的.bash_profile或.profile。
vim ~/.bash_profile
然后将其添加到文件的底部:
export PATH=$PATH:/usr/local/sbin
重新启动终端,确保更改已生效。
现在您可以使用以下命令启动RabbitMQ服务器:
rabbitmq-server
celery基本设置
首先,考虑名为core的应用程序名为mysite的以下Django项目:
将CELERY_BROKER_URL配置添加到settings.py文件中:
settings.py
CELERY_BROKER_URL='amqp://localhost'
除了settings.py和urls.py文件之外,我们还要创建一个名为celery.py的新文件。
celery.py
import os
from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE','mysite.settings')
app=Celery('mysite')app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks()
现在编辑项目根目录下的__init__.py文件:
__init__.py
from.celeryimportappascelery_app__all__=['celery_app']
这将确保每次Django启动,我们的celery应用程序是重要的。
创建我们的第一个芹菜任务。
我们可以在Django应用程序中创建一个名为tasks.py的文件,并将所有的Celery任务放到这个文件中。我们在项目根目录中创建的Celery应用程序将收集在INSTALLED_APPS 配置中列出的所有Django应用程序中定义的所有任务。
为了测试目的,我们创建一个Celery任务,生成一些随机的用户帐户。
app/ tasks.py
from __future__import absolute_import, unicode_literals
import string
from celeryimport shared_task
from django.contrib.auth.modelsimport User
from django.utils.cryptoimport get_random_string
@shared_task
def create_random_user_accounts(total):
for i in range(total):
username = 'user_{}'.format(get_random_string(10, string.ascii_letters))
email = '{}@example.com'.format(username)
password = get_random_string(50)
User.objects.create_user(username=username, email=email, password=password)
return '{} random users created with success!'.format(total)
这里的重要部分是:
from celery import shared_task
@shared_task
def name_of_your_function(optional_param):
pass# do something heavy
然后我定义了一个表单和一个视图来处理我的Celery任务:
forms.py
from djangoimport forms
from django.core.validatorsimport MinValueValidator, MaxValueValidator
class GenerateRandomUserForm(forms.Form):
total = forms.IntegerField(
validators=[
MinValueValidator(50),
MaxValueValidator(500)
]
)
这个是需要一个50到500之间的正整数字段。如图:
然后在views中:
from django.contrib.auth.modelsimport User
from django.contribimport messages
from django.views.generic.editimport FormView
from django.shortcutsimport redirect
from .formsimport GenerateRandomUserForm
from .tasksimport create_random_user_accounts
class GenerateRandomUserView(FormView):
template_name ='core/generate_random_users.html'
form_class = GenerateRandomUserForm
def form_valid(self, form):
total = form.cleaned_data.get('total')
create_random_user_accounts.delay(total) # 重点 就是便是这个任务在cerely后台执行。然后Django继续处理我的视图,GenerateRandomUserView并顺利返回给用户。
messages.success(self.request,'We are generating your random users! Wait a moment and refresh this page.')
return redirect('users_list')
启动工作进程
打开一个新的终端选项卡,然后运行以下命令:
celery -A mysite worker -l info
将mysite更改为您的项目名称。结果是这样的:
现在我们可以测试它。我提交了500个表单,创建了500个随机用户。
同时,检查celery worker过程:
[2017-08-20 19:11:17,485: INFO/MainProcess] Received task:mysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0]
然后几秒钟后,如果我们刷新页面,用户在那里:
如果我们再次查看芹菜工作进程,我们可以看到它完成了执行:
[2017-08-20 19:11:45,721: INFO/ForkPoolWorker-2] Taskmysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0] succeededin28.225658523035236s:'500 random users created with success!'
用Supervisord管理生产中的工人流程
如果您将应用程序部署到DigitalOcean等 VPS ,则需要在后台运行辅助进程。在我的教程中,我喜欢使用Supervisord来管理Gunicorn的工作人员,所以它通常与Celery很合适。
首先安装它(在Ubuntu上):
sudo apt-get install supervisor
然后创建一个文件名为mysite的-celery.conf的文件夹中:/etc/supervisor/conf.d/mysite-celery.conf:
[program:mysite-celery]
command=/home/mysite/bin/celery worker -A mysite --loglevel=INFO
directory=/home/mysite/mysite
user=nobody
numprocs=1
stdout_logfile=/home/mysite/logs/celery.log
stderr_logfile=/home/mysite/logs/celery.log
autostart=true
autorestart=true
startsecs=10
; Need to waitfor currently executing tasks to finish at shutdown.
; Increase thisif you have verylong running tasks.
stopwaitsecs =600
stopasgroup=true
; Set Celery priority higher than default (999)
; so,if rabbitmqis supervised, it will start first.
priority=1000
在下面的例子中,Django项目是在虚拟环境中。路径为/ home / mysite /。
现在重新读取配置并添加新的过程:
sudo supervisorctl reread
sudo supervisorctl update
如果您不熟悉将Django部署到生产服务器并使用Supervisord,可以参考:如何将Django应用程序部署到supervisor。