一、背景介绍
- 由于被测系统的复杂度不断上升,导致传统的测试方法成本上升且测试效率大幅下降;因此引入自动化测试是必然趋势。
鉴于接口测试相对于UI测试更加稳定,且相对容易自动化持续集成;故以接口方式作为自动化项目切入,主要目的如下: - 1、用于回归测试,解决手工回归测试带来的繁琐且重复工作;
- 2、线上或线下巡检测试,结合jenkins部署持续集成,及时发现运行环境存在的问题;
- 3、提升团队自动化测试能力,为系统业务提供强有力的测试手段;
二、构思设计及主要实现点
2.2)、需实现的功要点:
1、 可持续集成采用方式:构建部署 gitlab + jenkins+ docker 方式
2、 自动化执行-触发点多元化:
2.1)、API接口方式:通过接口字段传参方式,触发自动化测试用例执行;
2.2)、jenkins构建部署方式:通过被测对象构建时,流水线触发自动化测试用例执行;
2.3)、命令行方式:在部署的docke容器内或者本地开发调试时,通过命令行方式触发执行;
3、 编程代码与测试数据分离:自动化服务部署后,因测试用例数据仍是高频操作模块,所以代码和测试数据是需要做分离的,整块测试数据以yaml方式设计管理;
4、 高频的用例数据及文件处理方式:
4.1)、可通过前端页面交互方式,支持对测试用例文件上传、下载操作;
4.2)、可通过API接口方式对用例数据进行增删改查(即编辑数据无需进入docker容器内修改,更加便捷);
5、 缓存机制处理:主要对于账号登录信息(如token)、被测接口有依赖的前置数据、流程用例方面依赖接口的数据等,需要做缓存机制处理;
6、 测试报告功能:为了便于项目所有人查看,测试报告需支持在线链接查看(此块引用Allure服务);
说明:此块在线 Allure server是引用GitHub上的大佬(kochetkovma),有需要可直接去搜索下
7、 日志记录留痕:基本的日志记录信息,这是常规必备点。
2.2)、接口自动化测试-架构图:
-
其实看这张图就足够了,表达的思路都是这里,也画了我差不多一天时间,吐血~
三、直接看最后的成果(以触发点1和2为例):
1、触发点1:通过接口请求触发(引用postman工具):
- 步骤1、调用触发执行接口:传入执行命令(以pytest命令为基础);如图“-m smoke_flow_one -s”只执行指定标签的用例;
- 步骤2、执行接口响应:接口返回本次执行用例的状态(msg字段:通过或不通过)以及测试报告链接(data字段);
-
步骤3、测试报告:打开链接可查看到本次执行用例的详细情况;
2、触发点2:被测对象jar包更新-触发执行(jenkins):
- 步骤1、jenkins流水线配置:被测服务在流水线配置中添加触发自动化测试节点,在被测对象部署完毕后会进入自动化执行环节;
- 步骤2、流水线节点:根据自动化测试节点判断本次构建是否测试通过,且可通过jenkins logs内的链接查看完整测试报告;
-
步骤3、测试报告:打开链接查看本次执行用例的详细情况;
四、高频操作的测试用例数据及文件处理:
4.1、数据操作:通过接口调用修改用例数据-->>增删改查(如下以“查”和“改”为示例):
-
步骤1、根据传入的类型字段及指定文件,可查询用例数据:
-
步骤2、根据传入的字段参数,可修改对应的用例数据:
4.2、文件操作:以页面形式对用例文件进行上传/下载操作(水平有限,只能搞简陋版的html页面了):
-
步骤1、对文件上传:对已修改的用例文件或者需要新增文件,可上传到自定义指定目录;
-
步骤2、对文件下载:通过html页面进入各目录,可自定义选择文件下载;
四、自动化项目结构-示例( 因琐碎点太多,只抽取主流点作为说明)
- 作为一个测试编程新手,比不了专业的开发人员;各位大佬,如有优化或其他构思思路欢迎指点评论,我再调整优化;
4.1、项目目录结构说明:
4.2、入口路由方面(以触发1为例)-->示例:
1)、入口(Flask框架):摘取server.py一小段接口样例
'''摘取server.py其中一个接口'''
app = Flask(__name__)
@app.route("/autotest/runcase", methods=['POST'])
def run_auto_test():
"""运行自动化:return:"""
try:
command = request.json.get("command").strip()
except AttributeError:
return jsonify({"code": -1, 'msg': '缺少必要参数:command'})
# executor.submit(__run_jobs, run_command)
re_data, re_start = run_py(['0', command])
re_msg = '通过' if re_start == 0 else '不通过'
return jsonify({"code": re_start, "msg": "自动化测试:" + re_msg, "data": re_data})
'''摘取runner.py部分中pytest命令处理模块'''
lock = threading.Lock()
def run_py(cmd_system):
case_config = Case_Config()
now_time = time.strftime('%Y%m%d-%H%M%S')
report_data = 'TestReport/report_data/{}/'.format(now_time)
report_html = 'TestReport/html/{}/'.format(now_time)
run_sys = 'pytest {} --alluredir={} --clean-alluredir'.format('Testsuites/TestCase/' + str(cmd_system[1]),
report_data)
with lock: # 加锁
sys_start = os.system(run_sys)
if sys_start != 0 and sys_start != 1 and sys_start != 256:
sys_start_msg = f'自动化执行出现异常(start={sys_start}),本次执行命令:{run_sys}'
logger.error(sys_start_msg)
return {"msg": sys_start_msg}, -1
re_data = '1、执行命令:run_sys=' + run_sys
try:
if cmd_system[2] in ('debug', 'false', False):
run_allure = '{} generate {} -o {} --clean'.format(r'allure-2.17.0\bin\allure', report_data, report_html)
re_data = re_data + '\n2、执行命令:run_allure={};'.format(run_allure)
os.system(run_allure)
except IndexError:
ip_port = case_config.yml_cf_data()['allure_ip_port']
allure_report_url = AllureServer(ip_port).auto_report(report_data, now_time)
re_data = {'报告链接地址': allure_report_url, "执行命令": run_sys}
test_result = case_config.yml_cf_data('Testsuites/TestData/com_data/test_result.yml', 'test_result')
logger.info('***用例自动化测试结果:' + test_result)
re_start_code = 0 if test_result == 'pass' and sys_start == 0 else -1
logger.info(re_data)
return re_data, re_start_code
2)、摘取其中一段test_x.py测试用例代码-->示例:
from Testsuites.TestModel import InitTestData
import pytest
import allure
USE_CASE_PATH = 'tp_zds_app/auto_getuser.yml'
@pytest.mark.run(order=131)
@allure.epic("Web-后端管理页面")
@allure.feature("案例事务操作模块")
@allure.story("推送案例")
class TestAutoGetUser:
@classmethod
def setup_class(cls): # 类初始化(前置,全局一次); teardown_class(cls)=后置全局一次
cls.init_data = InitTestData.init_test_data(USE_CASE_PATH, api_host='host_gateway', login_token=None,
pgsql_connect=True)
cls.init_case_run = cls.init_data['case_run']
cls.regist_no = cls.init_data['local_var']['regist_no']
cls.sql_Pre = cls.init_data['local_var']['sql_Pre']
def setup_method(self): # 方法初始化
print('\n === 测试案件前置动作 ===')
run_sql = self.init_data['regular_data'](self.sql_Pre, {'regist_no': self.regist_no})
self.init_data['pgsql_db'](run_sql, 'delete')
@pytest.mark.smoke_flow_four
@pytest.mark.smoke_main
def test_auto_getuser_1(self, set_global_data):
case_data = self.init_data['case_yml_data']['case_001'] #定义哪一条用例
request_data = case_data['request_data']['body'] #定义接口入参数据
expect_data = case_data['expect_data']['response_data'] #定义断言模块
status_code = case_data['expect_data']['status_code'] #定义接口状态码
sql_check = self.init_data['regular_data'](case_data['other_data']['sql_check'], {'regist_no': self.regist_no}) #接口场景涉及的sql执行,regular_data方法为用例内的变量状态作用,可支持多个;
self.init_case_run(case_data['case_name'], status_code=status_code, request_data=request_data,
expect_data=expect_data, sql_check=sql_check) #执行自动化测试
@pytest.mark.parametrize('case_code',['case_002','case_003','case_004']) #数据驱动:可执行多条相同验证场景用例,也可以调用方法CaseIdMake(case_002:case004),代表执行2到4用例
def test_auto_getuser_2_4(self, get_global_data,case_code): #set_global_data和get_global_data方法是用于单次程序执行中,存/取全局变量(用于依赖接口数据之间的临时交互);
case_data = self.init_data['case_yml_data'][case_code]
request_data = case_data['request_data']['body']
expect_data = case_data['expect_data']['response_data']
status_code = case_data['expect_data']['status_code']
self.init_case_run(case_data['case_name'], status_code=status_code, request_data=request_data, expect_data=expect_data)
4.3、yaml文件用例/配置数据-->>示例:
- 3.1)、测试用例数据:
local_var:
remarks: 案例信息模块-更改案例状态
headers: &headers { "Content-Type": "application/json" }
api_name: &api_name auto_getuser
cache_data: &cache_data { 'path': 'user_cache/users_current.yml','key': 'users_current' }
case_001:
case_name: 无法联系客户
other_data: { 'sql_check': { 'sql_node': [ "select status from case_info where regist_no='&{regist_no}&' and is_deleted='f'" ], 'actual_node': [ 'expect_data' ] } }
request_data: { "body": [ { "taskType": 4,"customized": false,"exchangeReasonId": 2 } ],"headers": *headers }
expect_data: { 'status_code': 200,'response_data': [ { "status": 50 } ] }
remarks: 无
case_002:
case_name: 客户不同意使用
other_data: { 'sql_check': {} }
request_data: { "body": [ { "taskType": 4,"customized": false,"exchangeReasonId": 3 } ],"headers": *headers }
expect_data: { 'status_code': 200,'response_data':{"msg":"任务提交成功"} }
remarks: 无
case_003:
case_name: 异常场景-类型错误
other_data: { 'sql_check': {} }
request_data: { "body": [ { "taskType": "错误类型","customized": false,"exchangeReasonId": 3 } ],"headers": *headers }
expect_data: { 'status_code': 409,'response_data':{"msg":"类型错误"} }
remarks: 无
case_004:
case_name: 异常场景-缺少字段
other_data: { 'sql_check': {} }
request_data: { "body": [ { "customized": false,"exchangeReasonId": 3 } ],"headers": *headers }
expect_data: { 'status_code': 409,'response_data':{"msg":"请求异常缺少参数"} }
remarks: 无
- 3.2)、测试接口配置文件:
local_var: &local_var
url: &url /api/api/
headers: &headers { 'Content-Type': 'application/json' }
case_mine: { 'name': '个人会话列表接口','method': 'get','headers': *headers,'url': '/api/v2/case/mine','data': { } }
case_send_sms: { 'name': '短信发送(变更手机号码)','method': 'post','headers': *headers,'url': '/api/case/send-sms','data': { } }
auto_getuser: { 'name': '获取案例信息', 'method': 'post', 'headers': *headers, 'url': '/api/auto/getuser', 'data': { } }
cases_all: { 'name': '会话历史查询接口','method': 'get','headers': *headers,'url': '/api/cases/all','data': { } }
4.4、内部核心的InitTestData使用介绍-->>示例:
class InitTestData:
@staticmethod
def init_test_data(use_case_path: str, api_name: Optional[str] = None, headers: bool = True,
login_token: Optional[str] = 'web',pgsql_connect: Optional[Any] = None,
api_file: Optional[str] = None, api_host: Optional[str] = 'host_web') -> Dict[str, Any]:
"""
初始化测试方法,做二次整合封装处理
:param use_case_path: 测试用例路径。
:param api_name: API名称。
:param headers: 是否包含头信息。
:param login_token: 登录令牌数据文件名。
:param pgsql_connect: SQL数据连接对象。
:param api_file: API文件名。
:param api_host: API主机名。
:return: 包含系统配置、正则方法对象、头部信息、用例执行对象、用例YAML数据、本地变量、获取配置数据对象和获取数据库调用对象。
"""
def get_auth_header(file_name='_token.txt') -> Optional[Dict[str, str]]:
if not login_token:
return None
token = case_config.cache_token(file=login_token + file_name)
return {'Authorization': f'Bearer {token}'}
case_config = Case_Config(use_case_path)
yml_conf_data = case_config.yml_cf_data
system_config = yml_conf_data()
regular_data = case_config.regular_data
yml_data = case_config.yml_data
case_yml_data = yml_data()
local_var = case_yml_data['local_var']
if pgsql_connect:
pgsql_connect = system_config['pgsql_zds']
auth_header = None
# 如果headers为False即不传头部,login_token判断是否需要带token
if headers:
auth_header = get_auth_header()
headers_dict = {**auth_header} if auth_header else {}
headers_dict.update(local_var.get('headers', {}))
headers_value = headers_dict if headers else {}
api_name_value = api_name or local_var['api_name']
class_case_run = CaseRun(api_name_value, pgsql_connect=pgsql_connect, headers=headers_value, api_file=api_file,
api_host=system_config[api_host])
return {
'system_config': system_config, #此对象用于获取系统配置;
'regular_data': regular_data, #此对象用于变量转换、替换,即&{variable}&正则方式转换;
'headers': headers_value, #此对象用于满足不同接口场景下,头部的入参情况(即可默认头部、新增、为空等方式)
'case_run': class_case_run.case_run, #此对象用于发起测试动作
'case_yml_data': case_yml_data, #用于测试用例数据读取作用
'local_var': local_var, #单个yml用例的全局变量获取作用
'yml_conf_data': yml_conf_data, #主要用于依赖接口的缓存数据:存储和读取作用,以yml文件形式
'yml_data': yml_data, #用于其他测试场景下,自行读取指定yml数据
'pgsql_db': class_case_run.pgsql_db #用于满足,额外场景需数据库调用的操作
}
4.5、测试报告处理逻辑-->>示例:
import requests
import time, json
import zipfile, os
from Testsuites.TestModel import Case_Config
class AllureServer(object):
"""调用allure-server服务生成报告,并返回链接地址"""
def __init__(self, ip_port):
self.result_url = 'http://' + ip_port + '/api/result'
self.report_url = 'http://' + ip_port + '/api/report'
self.test_result = 'Testsuites/TestData/com_data/test_result.yml'
def zip_file(self, src_dir, zip_name='allure.zip'):
z = zipfile.ZipFile(src_dir + zip_name, 'w', zipfile.ZIP_DEFLATED)
for dir_path, dir_names, filenames in os.walk(src_dir):
fpath = dir_path.replace(src_dir, '')
fpath = fpath and fpath + os.sep or ''
for filename in filenames:
if filename != zip_name:
z.write(os.path.join(dir_path, filename), fpath + filename)
z.close()
return zip_name
def api_result(self, file_name):
file_zip = open(file_name, 'rb')
files = {'allureResults': (file_name, file_zip, 'application/x-zip-compressed')}
re_data = requests.post(self.result_url, files=files)
return json.loads(re_data.text)
def api_report(self, case_uuid, date_time=None, now_time=None): #第一步:报告数据源整合及上传
date_time = date_time if date_time else time.strftime('%Y%m%d')
now_time = now_time if now_time else str(int(time.time() * 1000))
json_data = {"reportSpec": {"path": [date_time, now_time], "executorInfo": {"buildName": now_time}},
"results": [case_uuid], "deleteResults": False}
re_data = requests.post(self.report_url, json=json_data, headers={"Content-Type": "application/json"})
return json.loads(re_data.text)
def auto_report(self, file_path, *time_data): #第二步:报告生成html方式,并引出报告链接
zip_name = self.zip_file(file_path)
result_uuid = self.api_result(file_path + zip_name)['uuid']
time.sleep(2)
report_data = self.api_report(result_uuid, *time_data)['latest']
Case_Config().yml_cache('report_link', report_data, no_cache=self.test_result)
return report_data
五、日志模块方面:
class Log(object):
level_relations = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'crit': logging.CRITICAL} # 日志级别关系映射
def __init__(self, filename, level='info', when='D', backCount=3,
fmt='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
self.logger = logging.getLogger(filename)
format_str = logging.Formatter(fmt) # 设置日志格式
self.logger.setLevel(self.level_relations.get(level)) # 设置日志级别
sh = logging.StreamHandler() # 往屏幕上输出
sh.setFormatter(format_str) # 设置屏幕上显示的格式
th = handlers.TimedRotatingFileHandler(filename=filename, when=when, backupCount=backCount,
encoding='utf-8') # 往文件里写入#指定间隔时间自动生成文件的处理器
# 实例化TimedRotatingFileHandler
# interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
# S 秒\M 分\H 小时\D 天\W 每星期(interval==0时代表星期一)\midnight 每天凌晨
th.setFormatter(format_str) # 设置文件里写入的格式
self.logger.addHandler(sh) # 把对象加到logger里
self.logger.addHandler(th)
def create_file():
filepath = os.path.realpath(__file__)
output_dir = os.path.abspath(os.path.join(filepath, "../../logs"))
if not os.path.exists(output_dir):
os.makedirs(output_dir)
log_name = '{}.log'.format(time.strftime('%Y-%m-%d'))
filename = os.path.join(output_dir, log_name)
return filename
filename = create_file()
log = Log(filename, level='debug')
logger = log.logger
**备注:其他细节就不摘取示例了,内容太多。主要目的是表达搭建的构思以及看下示例,有个初步的概念;
六、内部方法使用介绍:
-
6.1)、类初始化使用说明:
-
6.2)、接口用例执行功能点说明:
七、结束收尾语:
- 1、自动化测试项目写写停停花了两个多月,虽还有很多需要完善和优化的,但都已完成原始架构的设计;好歹算是个成品了。
- 2、目前自动化项目已经在实际系统上使用,整体效果还不错,基本上能满足常规的回归测试场景,提升测试效率,非常nice ~
- 3、各位测试伙伴们,新手测试的第一次构思搭建这自动化项目,不妥之处望给与指点纠正。
- 4、还有动动小手,点点赞撒;
- 5、对外可看示例的网站地址:if True测试网 (zhu-iftrue.cloud)