在Python开发过程中我们经常需要执行定时任务,而此类任务我们通常有如下选项:
- 自己造轮子
- 使用schedule库
- 使用Celery定时任务
- 使用APScheduler
自己造轮子实现,最大的优势就是灵活性,调试方便,对于某些特定系统也许也是一种选择,不过对于大多数应用来说,我们应当尽可能地使用开源的成熟的方案。下面对后三种方案分别讨论:
使用schedule库
schedule库是一个轻量级的定时任务方案,优势是使用简单,也不需要做什么配置;缺点是无法动态添加任务,也无法将任务持久化。
安装
pip install schedule
使用
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
while True:
schedule.run_pending()
time.sleep(1)
使用Celery
Celery在Python领域可谓大名鼎鼎,我们通常将Celery作为一个任务队列来使用,不过Celery也同时提供了定时任务功能。通常,当我们的解决方案中已经在使用Celery的时候可以考虑同时使用其定时任务功能,但是Celery无法在Flask这样的系统中动态添加定时任务(在Django中有相应的插件可以实现动态添加任务),而且如果对于不使用Celery的项目,单独为定时任务搭建Celery显得过于重量级了。(搭建Celery比较麻烦,还需要配置诸如RabbitMQ之类消息分发程序)。
Celery安装在此不再赘述,大家可以参考官网的资料
使用
Celery虽然无法动态添加定时任务,但是可以在程序固定位置添加定时任务,如下:
from celery import Celery
from celery.schedules import crontab
app = Celery()
# 此处on_after_configure装饰符意味着当Celery app配置完成之后调用该hook函数
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls test('hello') every 10 seconds.
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# Calls test('world') every 30 seconds
sender.add_periodic_task(30.0, test.s('world'), expires=10)
# Executes every Monday morning at 7:30 a.m.
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1),
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
- 这里调用
add_periodic_task
用于添加一个定时任务,相当于在Celery config文件中的beat_schedule设置项中添加了一项,如下:app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, 'args': (16, 16) }, }
- 在
add_periodic_task
中指定job function时需要用.s()
来调用
使用APScheduler
笔者认为APScheduler是在实际项目最好用的一个工具库。它不仅可以让我们在程序中动态添加和删除我们的定时任务,还支持持久化,且其持久化方案支持很多形式,包括(Memory, MongoDB, SQLAlchemy, Redis, RethinkDB, ZooKeeper), 也可以非常好与一些Python framework集成(包括asyncio, gevent, Tornado, Twisted, Qt). 笔者所在的项目使用的是Flask框架,也有相应的插件可以供我们直接使用。
但是笔者没有使用插件,而是直接将APScheduler集成于项目代码中。
初始化scheduler
# 可以在初始化Flask的时候调用,并将返回的scheduler赋给app
def init_scheduler():
# 这里用于持久化的设置,代码中演示使用MongoDB
# client用于设置你自己的MongoDB的handler, 即MongoClient对象
jobstores = {
'default': MongoDBJobStore(client=your_db_handler, collection="schedule_job")
}
executors = {
'default': ThreadPoolExecutor(20)
}
job_defaults = {
'coalesce': False,
'max_instances': 5
}
# 这里使用BackgroundScheduler即可
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
# 注意这里一定要调用start启动scheduler
scheduler.start()
return scheduler
添加定时任务
APScheduler将定时任务分为三种:
- interval: 比如每隔5分钟执行一次任务
- cron: 比如每天早上5点执行一次任务
- date: 比如在2018年5月5日执行一次任务
我们以添加cron job为例:
def test_job(name):
print "hello, %s" % name
def add_daily_job(name):
exec_time = datetime.now() + timedelta(minutes=2)
hour = exec_time.strftime("%H")
minute = exec_time.strftime("%M")
# 这里要选择'cron'
# 另外,job_id可以根据你自己的情况设定,其会被用于remove_job
current_app.scheduler.add_job(
test_job, 'cron', hour=hour, minute=minute,
args=[name], id=job_id)
删除定时任务
通过在add_job时使用的job_id可以删除对应的定时任务。实际上在我们添加任务的时候,APScheduler会把相应的任务信息存储于我们jobstore中设置的持久化存储方案,这里使用的是MongoDB,然后当删除的时候会将相应的任务从MongoDB中删除。
def remove_daily_job(job_id):
current_app.scheduler.remove_job(job_id)
总结:
APScheduler在实际使用过程中拥有最大的灵活性,可以满足我们的大部分定时任务的相关需求;Celery比较重量级,通常如果项目中已有Celery在使用,而且不需要动态添加定时任务时可以考虑使用;schedule非常轻量级,使用简单,但是不支持任务的持久化,也无法动态添加删除任务,所以主要用于简单的小型应用。