知乎文章:
Celery 快速入门
某博客:
Celery 快速入门
周期任务
celery 分布式广播通知
class based celery tasks
celery 官方总结的常见问题
celery有什么难理解的
记一次 celery 中的内存泄露问题
自己的要求:必须精通celery 因为自已要用到( tm 在其他应用里面 的 tasks 文件里面 @share_task 还是没试成功, 老是报错, 文章后面会提一下)
其实用 @app.task 吧,能解决大部分任务了,share_task 的在找一下原因
简介:
Celery是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。
这是一个任务队列,着重于实时处理,同时还支持任务调度
安装redis的celery:
pip install -U "celery[redis]"
创建任务实例:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
# broker 是你redis 的地址,默认使用 0 数据库
@app.task
def send_mail(email):
print("send mail to ", email)
import time
time.sleep(5)
return "success"
# 函数用app.task 装饰器修饰之后,就会成为Celery中的一个Task。
启动任务:
先启动redis-server:
redis-server.exe
进入redis 安装目录写,或者配置好全局变量后直接写
运行一个任务:
最好是进入模块所在的目录运行 其中tasks 是模块名字
celery -A tasks worker --loglevel=info
要调用我们的任务,您可以使用[delay()
]
新启动命令窗口,进到模块目录 :
from 模块 import 任务函数
运行 funcname.delay([args..])
结果如下图
调用周期任务,必须先添加完整的定时表:
from celery import Celery
from celery.schedules import crontab
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
或者手动:
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
启动周期任务:
celery -A tests beat
启动worker 节点,运行任务:
celery -A tests worker
上面的tests 都是写任务那个模块的名字
结果:
目录下 celery 的任务运行
[django 中使用celery]%E5%8A%A1/)
我们项目大致使用celery 的思路:
pip install django_celery_beat
并且 注册到 app :
django_celery_beat,
settings.py
# CELERY_RESULT_BACKEND = 'django-db'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False # fix bug https://github.com/celery/django-celery-beat/issues/109
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERYD_MAX_TASKS_PER_CHILD = 10
celery.py:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ops.settings')
app = Celery('ops',
broker=broker)
app.config_from_object('django.conf:settings', namespace='CELERY')
# 然后 就是settings 里面那些 配置了
# 最后来一句
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
关于 share_task 和 task 的区别
简单说就是 share_task 不需要绑定到具体的 celery app 实例, >@shared_task将为每个应用程序创建任务的独立实例,从而使任务可重用
这个就是说,你的 任务和 celery 应用是解耦的。大概是这样,
你的任务也可以直接在其他项目中使用。和本身dj 项目没关系?
原来我 celery 里有个 配置顺序写错了,看了官方文档特意强调了这个。我才认识到这个问题
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'transform_server.settings')
app = Celery('transform_server', broker="redis://:{passwd}@localhost:6379/0".format(passwd=encrypt_passwd)) # 1
app.config_from_object('django.conf:settings', namespace='CELERY') # 2
app.autodiscover_tasks()
os.environ.setdefault 这一句写在 app 初始化之前。否则 其他的应用的 tasks 注册不了
看 celery 解释(ps:改了task 的代码,记得要重启一下,celery 的服务,不然用的还是老代码)
在生产环境 就最好不要使用 celery -A 这种方式了, 因为这个是 开发调试用的.
生产推荐使用 supervisor
安装 :
1 pip install supervisor
参考: https://www.cnblogs.com/debochan/p/10823984.html
2 echo_supervisord_conf > supervisord.conf
3 supervisord.conf 在最后一行添加内容:
[program:celery]
#Set full path to celery program if using virtualenv
command=celery -A worker transform_server --loglevel=INFO # -A 在前面 worker 在后面
directory=/usr/src/transform_server
# environment=CELERY_CONFIG_MODULE="celerytask.celeryconfig-dev"
numprocs=1 ;进程数
autostart=true ;当supervisor启动时,程序将会自动启动
autorestart=true ;自动重启
4 启动supervisor:
supervisord -c supervisord.conf
5 关闭supervisord
supervisorctl -c supervisord.conf shutdown
6.重启supervisord
supervisorctl -c supervisord.conf reload
supervisor 日志配置文件 默认在 /tmp/supervisord.log 目录
注意 由于celery 启动 并不是和 服务一起的,他是两个进程的服务,所以不要像,用 celery 改变当前服务的内存方面的东西,比如我想 直接在任务函数里面修改 server 程序的 类属性,就不会生效的, celery 能做的就是 除了进城之外的,还是哪个话题,进城通信, 而且是不同机器的,只能通过数据库,等持久化的中间层了。或者通过 request 请求。
关于 request 请求更改服务进城的属性,已经验证过了,是会影响当前进城的其他地方的是使用