To make fast, efficient use of a compute cluster for managing and executing large numbers of test cases, we built a distributed automated-testing platform on Celery and Django. It uses a four-layer architecture made up of an API server layer, a test-case management layer, a task scheduling layer, and a task execution layer, and provides scheduled test runs, distributed execution, and retry on failure. Test execution nodes can be deployed and configured quickly, making full use of cluster resources and improving testing efficiency.
How Celery works in this automated-testing setup:
Django Celery deployment
- 1. Install Celery
First, we need a broker: a message queue for sending and receiving messages. The Celery documentation lists several broker options, including RabbitMQ, Redis, a database, and other message middleware. Here I use Redis as the message broker.
django-celery-beat: periodic (scheduled) tasks.
django-celery-results: a third-party plugin that stores Celery task results; in this project I redesigned its data structures to fit the business logic.
pip install celery==5.0.5
pip install redis==3.5.3
pip install django-celery-beat==2.2.0
pip install django-celery-results==2.0.1
- 2. Register the apps
INSTALLED_APPS = [
    ...
    'django_celery_beat',
    'django_celery_results',
]
- 3. Configure settings.py
# Message broker
broker_url = f'redis://{HOST}:6379'
# Use the Django ORM as the result backend
result_backend = 'django-db'
# Number of worker processes/threads
worker_concurrency = 5
# Prefetching: each worker process/thread grabs up to n tasks at a time to reduce communication overhead
worker_prefetch_multiplier = 5
# Restart a worker child process after it has executed this many tasks
worker_max_tasks_per_child = 100
# Disable all rate limits; running at full throttle is not recommended if network resources are limited
worker_disable_rate_limits = True
# Accepted content types for tasks
accept_content = ['json']
# Task serialization format
task_serializer = 'json'
# Result serialization format
result_serializer = 'json'
# Celery beat settings (periodic tasks)
timezone = 'Asia/Shanghai'
enable_utc = False
beat_sync_every = 1
# Required when USE_TZ=False in settings, otherwise starting django celery beat raises
# TypeError: can't compare offset-naive and offset-aware datetimes
DJANGO_CELERY_BEAT_TZ_AWARE = False
# Maximum number of seconds beat sleeps between schedule checks
beat_max_loop_interval = 300
beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
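Because the DatabaseScheduler is used, periodic test tasks live in the database rather than in code. As a minimal sketch (the task name, task path, and kwargs below are hypothetical placeholders, not this project's real values), a scheduled task can be registered through the django-celery-beat ORM like this:
# Sketch: register a periodic task for the DatabaseScheduler.
import json
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Run every 30 minutes
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=30, period=IntervalSchedule.MINUTES,
)
PeriodicTask.objects.create(
    interval=schedule,
    name='nightly-smoke-suite',                 # hypothetical display name
    task='celery_tasks.tasks.run_test_suite',   # hypothetical task path
    kwargs=json.dumps({'task_id': 1}),          # keyword arguments passed to the task
)
Passing a task_id keyword argument here is what the custom CeleryTask base class further below relies on to identify scheduled tasks.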
- 4. Create the celery_tasks package
"""目录结构"""
├── celery_tasks
│ ├── init.py
│ ├── celery.py
# celery.py
# Wire Celery up to the Django project
from __future__ import absolute_import
import os

from celery import Celery
from django.conf import settings

# Set the default Django settings module for Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ServerDjango.settings')

app = Celery('celery_tasks')
# Load the Celery configuration from the module named by the environment variable
app.config_from_envvar('DJANGO_SETTINGS_MODULE')
# Auto-discover tasks in every installed app
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# __init__.py
from celery_tasks.celery import app as celery_app

__all__ = ['celery_app']
In celery.py, autodiscover_tasks is called over the INSTALLED_APPS from settings.py. Celery then looks in every app listed in INSTALLED_APPS for a tasks.py file, finds the methods marked as tasks, and registers them as Celery tasks. Note that, unlike using Celery in an arbitrary .py module, tasks.py must sit in the root directory of each app and cannot be renamed.
- Example: tasks.py
import time

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer


@shared_task
def tailf_log(channel_name, file_path):
    """Tail a log file and push each new line to the given channel."""
    channel_layer = get_channel_layer()
    try:
        with open(file_path, encoding='utf-8') as f:
            while True:
                line = f.readline()
                if line:
                    async_to_sync(channel_layer.send)(
                        channel_name,
                        {
                            "type": "send.message",
                            "message": str(line)
                        }
                    )
                else:
                    time.sleep(0.5)
    except Exception as e:
        # The with-block closes the file automatically; just report the error
        print(e)
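Since the task loops indefinitely, it is meant to be dispatched asynchronously, for example from a Django Channels consumer once a client connects. A sketch of the call (the import path, channel name, and log path are placeholders, not this project's real values):
# Sketch: hand the consumer's channel name and a log path to a worker.
from some_app.tasks import tailf_log  # replace with the app that defines tailf_log

tailf_log.delay("specific.channel_name!abc123", "/var/log/test_runs/run_42.log")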
- 5. Start the worker and beat separately
celery -A celery_tasks worker -l info  # start the worker
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers.DatabaseScheduler  # start beat with the database scheduler
- Building on the existing business logic, a failure-retry mechanism was added, along with computing the next run time and the elapsed time of the current task once the task returns.
import datetime

import celery
from celery.schedules import crontab
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask

from ManageApps.my_tasks.models import UserTasks, UserTaskResult


class CeleryTask(celery.Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        task = UserTasks.objects.get(task_id=kwargs['task_id'])
        if kwargs["task_id"] and task.retry:
            # Retry on failure, by default after 300 s
            self.retry(exc=exc, countdown=300, max_retries=1)
        return super(CeleryTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if kwargs["task_id"]:  # the task_id keyword argument marks a scheduled task
            task = _next_run_time(kwargs['task_id'])
            elapsed_time = (datetime.datetime.now() - task.last_run_at).total_seconds()
            UserTaskResult.objects.create(**{"task_id": kwargs['task_id'], "result_id": task_id,
                                             "elapsed": round(elapsed_time, 2), "status": status})
        return super(CeleryTask, self).after_return(status, retval, task_id, args, kwargs, einfo)
def _next_run_time(task_id):
    """Compute the next run time of a task."""
    per_task = PeriodicTask.objects.get(id=task_id)
    my_task = UserTasks.objects.get(task_id=task_id)
    if per_task.crontab_id and my_task.start_time:
        # Crontab-based periodic task
        cron_obj = CrontabSchedule.objects.get(id=per_task.crontab_id)
        cron = crontab(minute=cron_obj.minute, hour=cron_obj.hour, day_of_week=cron_obj.day_of_week,
                       day_of_month=cron_obj.day_of_month, month_of_year=cron_obj.month_of_year)
        now = cron.now()  # current time
        result = cron.remaining_delta(last_run_at=now)
        ends_in = (result[0] + result[1]).replace(tzinfo=None)
        my_task.start_time = ends_in
    elif per_task.interval_id and my_task.start_time:
        # Interval-based task
        interval = IntervalSchedule.objects.get(id=per_task.interval_id)
        offset = datetime.timedelta(minutes=+0)
        if interval.period == 'minutes':
            offset = datetime.timedelta(minutes=+interval.every)
        elif interval.period == 'days':
            offset = datetime.timedelta(days=+interval.every)
        elif interval.period == 'hours':
            offset = datetime.timedelta(hours=+interval.every)
        elif interval.period == 'seconds':
            offset = datetime.timedelta(seconds=+interval.every)
        elif interval.period == 'microseconds':
            offset = datetime.timedelta(microseconds=+interval.every)
        my_task.start_time = datetime.datetime.now() + offset
    else:
        # First run: record the current time
        my_task.start_time = datetime.datetime.now()
    my_task.save()
    return my_task
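For the hooks above to fire, CeleryTask has to be used as the base class of the tasks that the scheduler triggers. A minimal sketch of that wiring (run_test_suite and its body are hypothetical, not this project's actual task):
# Sketch only: assumes it lives in the same module as CeleryTask above.
from celery import shared_task


@shared_task(bind=True, base=CeleryTask)
def run_test_suite(self, task_id=None, **kwargs):
    """Execute a test suite; task_id links the run back to UserTasks/PeriodicTask."""
    # ... run the test cases here ...
    return {"task_id": task_id, "result": "ok"}
Because on_failure and after_return read kwargs["task_id"], the caller must pass task_id as a keyword argument, e.g. run_test_suite.apply_async(kwargs={"task_id": 1}).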
- Task models redesigned with reference to django-celery-beat and django-celery-results
from django.db import models
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult, TASK_STATE_CHOICES


class UserTasks(models.Model):
    user = models.ForeignKey('user.User', on_delete=models.CASCADE, verbose_name='Owning user', help_text='Owning user',
                             null=True, blank=True)
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='Owning task', help_text='Owning task',
                             null=True, blank=True)
    task_tags = models.CharField(max_length=255, null=True, blank=True, verbose_name='Task tags', help_text='Task tags')
    notice = models.SmallIntegerField(verbose_name='Task notification', help_text='Task notification')
    failfast = models.BooleanField(default=False, blank=True, verbose_name='Stop on failure', help_text='Stop testing on the first failure')
    retry = models.BooleanField(default=False, blank=True, verbose_name='Retry on failure', help_text='Retry on failure')
    task_type = models.BooleanField(default=False, blank=True, verbose_name='Task type', help_text='Task type')
    last_run_at = models.DateTimeField(blank=True, null=True, verbose_name='Last Run Datetime',
                                       help_text='Datetime when the schedule last triggered the task to run')
    start_time = models.DateTimeField(blank=True, null=True, verbose_name='Start Datetime',
                                      help_text='Datetime when the schedule should begin triggering the task to run')

    class Meta:
        db_table = 'tb_user_tasks'
        verbose_name = 'User task'
        verbose_name_plural = verbose_name


class UserTaskResult(models.Model):
    result_id = models.CharField(max_length=255, null=True, blank=True, verbose_name='Result ID', help_text='Result ID')
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='Owning task', help_text='Owning task',
                             null=True, blank=True)
    create_time = models.BigIntegerField(verbose_name='Creation time', help_text='Creation time', null=True, blank=True)
    elapsed = models.FloatField(verbose_name='Elapsed/s', help_text='Elapsed time in seconds', null=True, blank=True, default=0.00)
    status = models.CharField(max_length=50, default='PENDING', choices=TASK_STATE_CHOICES,
                              verbose_name='Task state',
                              help_text='Current state of the task being run')

    class Meta:
        db_table = 'tb_user_task_result'
        verbose_name = 'User task result'
        verbose_name_plural = verbose_name
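With these models in place, the task statistics shown on the front end can be produced with plain Django ORM aggregation. A sketch (the groupings and metrics are illustrative, not the project's actual queries):
# Illustrative statistics queries over the redesigned result model.
from django.db.models import Avg, Count

from ManageApps.my_tasks.models import UserTaskResult

# Number of runs per state (SUCCESS, FAILURE, RETRY, ...)
runs_by_status = UserTaskResult.objects.values('status').annotate(count=Count('id'))

# Average elapsed time and run count per periodic task
avg_elapsed = (UserTaskResult.objects.values('task_id')
               .annotate(avg_elapsed=Avg('elapsed'), runs=Count('id')))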
Front-end pages
- Task management
- Task statistics