这一个非常精简的组合,但是功能实现了,目的是为了理解。后续会在一步一步的优化和改进。
操作环境
系统:CentOS7.4
Python:3.6.8
celery:4.2.1
Flask:1.0.2
1. Redis操作
安装Redis,启动服务,测试是否成功
[root@python-server01 ~]# yum install redis
[root@python-server01 ~]# systemctl start redis
[root@python-server01 ~]# redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379>
2. Python操作
确保编译安装Python之前安装了bzip2-devel。如果没有,需要再安装bzip2-devel后重新编译安装一次Python。否则会出现报错:ModuleNotFoundError: No module named '_bz2'
。
yum install bzip2-devel
在Python环境下安装Flask、celery
pip install flask
pip install celery
3. Flask程序操作
主程序内容如下
import time
from flask import Flask
from celery import Celery
flask_app = Flask(__name__)
flask_app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/0'
celery = Celery(flask_app.name, broker=flask_app.config['CELERY_BROKER_URL'])
@celery.task()
def add_together(a, b):
time.sleep(60)
return a + b
@flask_app.route('/')
def hello_world():
with flask_app.app_context():
result = add_together.delay(23, 42)
return 'Hello World!'
if __name__ == '__main__':
flask_app.run(host='192.168.17.128', port=80)
4. 启动操作
先启动celery,监听任务消息。(app.celery:app是app.py,celery是app.py里面的Celery实例)
(venv3_test01) [root@python-server01 flask01]# ls
app.py __pycache__ static task.py templates
(venv3_test01) [root@python-server01 flask01]# celery -A app.celery worker --loglevel=info
当启动之后,有任务在后台工作时,会看到类似日志:
[2019-03-04 02:59:38,007: INFO/MainProcess] celery@python-server01.localdomain ready.
[2019-03-04 02:59:59,943: INFO/MainProcess] Received task: app.add_together[0c803786-cbe8-41b8-87b5-40d1d441559d]
[2019-03-04 03:01:00,010: INFO/ForkPoolWorker-1] Task app.add_together[0c803786-cbe8-41b8-87b5-40d1d441559d] succeeded in 60.06519331100026s: 65
然后启动web程序,访问测试,可以立即看到“Hello World!”字样,无需等待60秒。web日志如下:
* Running on http://192.168.17.128:80/ (Press CTRL+C to quit)
975efddd-4e58-4414-b0fb-66613dcf1c5e
192.168.17.1 - - [04/Mar/2019 03:09:11] "GET / HTTP/1.1" 200 -
ee875a60-22a0-42cb-a8f5-37f11dcf6cba
192.168.17.1 - - [04/Mar/2019 03:09:29] "GET / HTTP/1.1" 200 -
17e4157a-c8e4-480e-8d22-ff125d2d4795
192.168.17.1 - - [04/Mar/2019 03:09:30] "GET / HTTP/1.1" 200 -
优化
1. 添加对 Flask 应用上下文的支持
这样优化之后,不需要每次调用的时候都要写with flask_app.app_context()。使用起来更加方便。
参考链接:http://docs.jinkan.org/docs/flask/patterns/celery.html
优化后的代码如下
import time
from flask import Flask
from celery import Celery
flask_app = Flask(__name__)
flask_app.config['CELERY_BROKER_URL'] = 'redis://127.0.0.1:6379/0'
def make_celery(app):
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(flask_app)
@celery.task()
def add_together(a, b):
time.sleep(60)
return a + b
@flask_app.route('/')
def hello_world():
result = add_together.delay(23, 42)
print(result)
return 'Hello World!'
if __name__ == '__main__':
flask_app.run(host='192.168.17.128', port=80)