1、需求
在用户操作手机时(或模拟用户操作手机时),实时检测手机的CPU、FPS、MEM、NET信息,确认这些数据的变化是否异常
2、环境
Win7+Python3+Appium
3、代码目录说明
4、运行方式
4.1 连接手机,并安装相关第三方库(pip install Appium-Python-Client selenium)
4.2 修改全局变量PACKAGE_NAME、ACTIVITY_NAME的值(即确认被测应用的包名和类名)
4.3 重写User类的user_swipe_screen()方法,为读取设备相关信息做准备
4.4 运行main函数,并输入模拟用户操作的次数
5、结果说明
5.1 测试完成后,会在运行目录(通常即main.py同级目录)生成以手机型号命名的文件夹,其下的fps、cpu、mem、net子目录中分别保存带时间戳的csv文件
5.2 可以打开csv文件并插入折线图
源代码:
# -*- coding: utf-8 -*-
# @Time : 2021/8/18 16:50
# @Author : Shuangqi
# @File : main.py
import csv
import os
import re
import threading
import time
from time import sleep
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from appium import webdriver
# Device model reported by adb. strip() (instead of strip('\n')) also removes a
# trailing '\r' that adb output may carry on Windows hosts; a stray '\r' would
# otherwise end up in the output directory names built from PHONE_MODEL.
PHONE_MODEL = os.popen("adb shell getprop ro.product.model").read().strip()
# Android release version string, e.g. "11"
ANDROID_VERSION = os.popen("adb shell getprop ro.build.version.release").read().strip()
# Package / Activity of the app under test
PACKAGE_NAME = "com.ss.android.article.news"
ACTIVITY_NAME = "com.ss.android.article.news.activity.MainActivity"
# App version name. The dump may contain zero or several "versionName=" lines,
# so take the first real match instead of blindly splitting on "=".
_VERSION_DUMP = os.popen(f'adb shell pm dump {PACKAGE_NAME} | findstr "versionName"').read()
_VERSION_MATCH = re.search(r"versionName=(\S+)", _VERSION_DUMP)
# Fall back to a placeholder so file names can still be built when the version
# cannot be read (device offline, package not installed).
APP_VERSION = _VERSION_MATCH.group(1) if _VERSION_MATCH else "unknown"
print("================================")
print("【测试信息】")
print(f"【机型】:{PHONE_MODEL}")
print(f"【安卓】:Android {ANDROID_VERSION}")
print(f"【包名】:{PACKAGE_NAME}")
print(f"【类名】:{ACTIVITY_NAME}")
print(f"【版本】:{APP_VERSION}")
print("================================")
# Number of test iterations (i.e. number of swipes). The worker threads use
# "RUNNING_TIMES != 0" as their stop condition, so a non-positive value would
# never terminate; reject it up front.
RUNNING_TIMES = int(input("请输入滑动测试次数:"))
if RUNNING_TIMES <= 0:
    raise ValueError(f"滑动测试次数必须为正整数:{RUNNING_TIMES}")
class Utils(object):
    """Stateless helpers shared by the monitor classes."""

    @staticmethod
    def get_pid_by_package_name(pkg):
        """Return the PID (as a string) of the process belonging to ``pkg``.

        The process list comes from ``adb shell ps`` filtered with ``findstr``.

        :param pkg: package name to look for
        :return: PID column of the matching ``ps`` row, as a string
        :raises IndexError: if no ``ps`` row for the package could be parsed
        """
        ps_output = os.popen(f'adb shell ps | findstr "{pkg}"').read()
        rows = [line.split() for line in ps_output.splitlines()]
        rows = [row for row in rows if len(row) >= 2]
        if not rows:
            raise IndexError(f"未找到包名对应的进程:{pkg}")
        # findstr matches substrings, so rows whose process name merely contains
        # the package name (e.g. with a ":suffix") are returned as well. Prefer
        # the row whose last column is exactly the package name.
        for row in rows:
            if row[-1] == pkg:
                return row[1]
        # No exact match: keep the historical behaviour (first matching row).
        return rows[0][1]

    @staticmethod
    def get_csv_writer(dirs, file_name, field_names):
        """Create a timestamped CSV file and return a ``csv.DictWriter`` for it.

        The header row is written immediately. The file is opened line-buffered
        so every row reaches the file as soon as it is written, and it is closed
        at interpreter exit (callers only ever receive the writer, never the
        file object).

        :param dirs: target directory (created if missing)
        :param file_name: file name prefix; "_<timestamp>.csv" is appended
        :param field_names: CSV column names
        :return: ``csv.DictWriter`` bound to the new file
        """
        import atexit

        # exist_ok avoids the check-then-create race between monitor threads.
        os.makedirs(dirs, exist_ok=True)
        now_time = time.strftime('%Y%m%d-%H%M%S', time.localtime(time.time()))
        # os.path.join works whether or not ``dirs`` ends with a separator.
        file_path = os.path.join(dirs, file_name + "_" + now_time + ".csv")
        csv_file = open(file_path, 'w', newline='', encoding="GBK", buffering=1)
        atexit.register(csv_file.close)
        writer = csv.DictWriter(csv_file, fieldnames=field_names)
        writer.writeheader()
        return writer
class Phone(object):
    """Wrapper around an Appium session with element-lookup and swipe helpers."""

    def __init__(self):
        """Open an Appium session for the app under test and cache the screen size."""
        desired_caps = {
            'platformName': 'Android',  # platform name
            'deviceName': 'test',  # device name (arbitrary)
            'platformVersion': ANDROID_VERSION,  # Android version
            'appPackage': PACKAGE_NAME,  # package to launch
            'appActivity': ACTIVITY_NAME,  # activity to launch
            'noReset': False,  # False resets the app state
            # Appium reads newCommandTimeout in seconds, so this is ~16.7 hours,
            # not one minute. NOTE(review): lower it if a short idle timeout
            # was actually intended.
            'newCommandTimeout': 60000
        }
        if self._needs_uiautomator2(ANDROID_VERSION):
            # Android 11 and newer need the UiAutomator2 driver
            desired_caps["automationName"] = "uiautomator2"
        # Start initialisation
        print("Appium 正在初始化...")
        # Create a session against the Appium server on port 4723
        self.driver = webdriver.Remote('http://localhost:4723/wd/hub', desired_caps)
        # Implicit wait of 15s
        self.driver.implicitly_wait(15)
        # One round-trip for both dimensions
        window_size = self.driver.get_window_size()
        self.window_width = window_size['width']
        self.window_height = window_size['height']
        print("Appium 初始化完成...")

    @staticmethod
    def _needs_uiautomator2(android_version):
        """Return True when the major Android version is 11 or higher."""
        major = android_version.strip().split(".")[0]
        return major.isdigit() and int(major) >= 11

    @staticmethod
    def _element_finder(way, value):
        """Return a callable ``driver -> element`` for the given locator strategy.

        :raises TypeError: if ``way`` is not one of text/id/desc/xpath
        """
        if way == "text":
            return lambda drv: drv.find_element_by_android_uiautomator('new UiSelector().text("%s")' % value)
        if way == "id":
            return lambda drv: drv.find_element_by_id(value)
        if way == "desc":
            return lambda drv: drv.find_element_by_accessibility_id(value)
        # xpath example: //*[@text="xxx"]
        if way == "xpath":
            return lambda drv: drv.find_element_by_xpath(value)
        raise TypeError(f"无此定位元素方式:{way},定位方式支持:text/id/desc/xpath")

    def wait_element(self, way, value, timeout=10, poll_frequency=0.5):
        """Wait until an element can be located.

        :param way: locator strategy (text/id/desc/xpath)
        :param value: locator value
        :param timeout: maximum wait in seconds
        :param poll_frequency: polling interval in seconds
        :return: the located element (previously the result was discarded)
        :raises TypeError: for an unsupported locator strategy
        :raises TimeoutException: if the element does not appear in time
        """
        finder = self._element_finder(way, value)
        return WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency).until(
            finder, message=f'【{way}:{value}】等待超时')

    def find_element(self, way, value):
        """Locate an element.

        :param way: locator strategy (text/id/desc/xpath)
        :param value: locator value
        :return: the located element
        :raises TypeError: for an unsupported locator strategy
        :raises NoSuchElementException: propagated from the driver when not found
        """
        return self._element_finder(way, value)(self.driver)

    def swipe_screen(self, swipe_type):
        """Swipe across the middle of the screen, sized from the cached window size.

        The name is the direction the finger travels: "up" moves from 3/4 to
        1/4 of the height, "left" moves from 3/4 to 1/4 of the width. Coordinates
        are integers (pixel positions).

        :param swipe_type: one of down/up/left/right
        :raises TypeError: for any other value
        """
        width, height = self.window_width, self.window_height
        mid_x, mid_y = width // 2, height // 2
        near_x, far_x = width // 4, 3 * width // 4
        near_y, far_y = height // 4, 3 * height // 4
        gestures = {
            "down": (mid_x, near_y, mid_x, far_y),
            "up": (mid_x, far_y, mid_x, near_y),
            # left/right were previously swapped relative to up/down
            "left": (far_x, mid_y, near_x, mid_y),
            "right": (near_x, mid_y, far_x, mid_y),
        }
        if swipe_type not in gestures:
            raise TypeError(f"参数错误:{swipe_type},传入参数:down/up/left/right")
        self.driver.swipe(*gestures[swipe_type])
class User(Phone):
    """Simulated user: dismisses the start-up dialogs, then swipes the feed."""

    def user_swipe_screen_start(self):
        """Run :meth:`user_swipe_screen` on a background thread."""
        t = threading.Thread(target=self.user_swipe_screen)
        t.start()

    def user_swipe_screen(self):
        """Walk through the app's first-run dialogs and swipe up RUNNING_TIMES times.

        The module-level RUNNING_TIMES counter is decremented after every swipe.
        The monitor loops in this file run while ``RUNNING_TIMES != 0``, so the
        counter is always forced to 0 on exit, including when setup fails, so
        those loops stop instead of sampling forever. Any exception raised by
        the Appium calls still propagates after cleanup.
        """
        global RUNNING_TIMES
        try:
            # Preparation steps after the app launches
            self.wait_element("text", "同意")
            self.find_element("text", "同意").click()
            self.wait_element("text", "允许")
            self.find_element("text", "允许").click()
            # Low-end devices are prompted to enable "smooth mode"
            self.find_element("text", "开启").click()
            self.wait_element("text", "流畅模式")
            self.driver.keyevent(4)
            self.wait_element("text", "首页")
            # Swipe the screen; "> 0" (rather than "== 0" after decrementing)
            # also terminates for a non-positive starting count.
            while RUNNING_TIMES > 0:
                self.swipe_screen("up")
                RUNNING_TIMES -= 1
                print(f"用户滑动次数:{RUNNING_TIMES}")
        finally:
            # Signal the monitor threads to stop on success and on failure.
            RUNNING_TIMES = 0
            sleep(5)
            self.driver.quit()
            print("Driver成功退出!")
class FPS(object):
    """Samples the app's frame rate from ``dumpsys gfxinfo`` into a CSV file."""

    TIME = "time"
    FPS = "FPS"
    # Matches the cumulative frame counter on its own line. Anchoring on the
    # label avoids picking up the first number of some unrelated line.
    _TOTAL_FRAMES_RE = re.compile(r"Total frames[^\d\r\n]*(\d+)")

    def __init__(self):
        self.is_first = True
        self.last_time = 0.0
        self.last_fps = 0
        self.is_running = True

    @classmethod
    def _parse_total_frames(cls, gfxinfo_text):
        """Return the "Total frames" counter from gfxinfo output, or None if absent."""
        match = cls._TOTAL_FRAMES_RE.search(gfxinfo_text)
        return int(match.group(1)) if match else None

    def get_fps_info_start(self):
        """Run :meth:`get_fps_info` on a background thread."""
        t = threading.Thread(target=self.get_fps_info)
        t.start()

    def get_fps_info(self):
        """Write one FPS sample roughly every 0.5s while RUNNING_TIMES != 0.

        FPS is the increase of the cumulative frame counter divided by the
        elapsed wall-clock time between two reads. Reads that yield no counter
        (empty adb output) are skipped instead of killing the thread.
        """
        fps_dirs = f"./{PHONE_MODEL}/fps/"
        fps_file_name = "fps_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        fps_field_names = [self.TIME, self.FPS]
        # The output is parsed in Python: findstr "Total frames" would match any
        # line containing either word, not just the counter line.
        fps_command = f'adb shell dumpsys gfxinfo {PACKAGE_NAME}'
        writer = Utils.get_csv_writer(fps_dirs, fps_file_name, fps_field_names)
        while RUNNING_TIMES != 0:
            if self.is_first:
                baseline_time = time.time_ns()
                baseline_frames = self._parse_total_frames(os.popen(fps_command).read())
                if baseline_frames is None:
                    sleep(0.5)
                    continue
                self.last_time = baseline_time
                self.last_fps = baseline_frames
                self.is_first = False
            current_time = time.time_ns()
            current_fps = self._parse_total_frames(os.popen(fps_command).read())
            time_delta = (current_time - self.last_time) / 1000000000.0
            if current_fps is None or time_delta <= 0:
                sleep(0.5)
                continue
            fps_delta = current_fps - self.last_fps
            self.last_time = current_time
            self.last_fps = current_fps
            now_fps = "{:.2f}".format(fps_delta / time_delta)
            writer.writerow({self.TIME: time.strftime("%H:%M:%S"), self.FPS: now_fps})
            print(f"FPS:【{now_fps}】")
            # Sample every 0.5s
            sleep(0.5)
class CPU(object):
    """Samples the app process's share of total CPU time into a CSV file."""

    TIME = "time"
    CPU_RATE = "进程CPU占比(%)"

    def get_cpu_info_start(self):
        """Run :meth:`get_cpu_info` on a background thread."""
        t = threading.Thread(target=self.get_cpu_info)
        t.start()

    @staticmethod
    def _parse_total_jiffies(stat_text):
        """Sum every counter on the first ("cpu") line of /proc/stat output."""
        lines = stat_text.splitlines()
        fields = lines[0].split() if lines else []
        # fields[0] is the "cpu" label; every following field is a counter.
        # (Starting at index 2 would silently drop the user-time counter.)
        return sum((float(value) for value in fields[1:]), 0.0)

    @staticmethod
    def _parse_process_jiffies(stat_text):
        """Return utime + stime from /proc/<pid>/stat output.

        The command name (field 2) is wrapped in parentheses and may itself
        contain spaces, so fields are counted from after the last ')'.

        :raises IndexError: if the text has too few fields
        :raises ValueError: if the counters are not numeric
        """
        after_comm = stat_text.rpartition(")")[2].split()
        # After the comm field: index 0 is state (field 3), so utime (field 14)
        # is index 11 and stime (field 15) is index 12.
        return float(after_comm[11]) + float(after_comm[12])

    # Total CPU time of the device
    def get_cpu_usage(self):
        """Return the summed counters of the aggregate "cpu" line of /proc/stat."""
        cpu_path = "/proc/stat"
        info = os.popen(f'adb shell cat {cpu_path}').read()
        return self._parse_total_jiffies(info)

    # CPU time of the app process, looked up by PID
    def get_process_cpu_usage(self):
        """Return utime + stime of the process belonging to PACKAGE_NAME."""
        pid = Utils.get_pid_by_package_name(PACKAGE_NAME)
        cpu_path = "/proc/" + pid + "/stat"
        info = os.popen(f'adb shell cat {cpu_path}').read()
        return self._parse_process_jiffies(info)

    # Compute the CPU share
    def get_cpu_info(self):
        """Write one CPU sample per cycle while RUNNING_TIMES != 0.

        The rate is the process time delta over a 1s window divided by the
        total time delta, as a percentage clamped to [0, 100]. A cycle whose
        readings cannot be parsed (e.g. the process is not running) is skipped.
        """
        cpu_dirs = f"./{PHONE_MODEL}/cpu/"
        cpu_file_name = "cpu_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        cpu_field_names = [self.TIME, self.CPU_RATE]
        writer = Utils.get_csv_writer(cpu_dirs, cpu_file_name, cpu_field_names)
        while RUNNING_TIMES != 0:
            try:
                start_all_cpu = self.get_cpu_usage()
                start_p_cpu = self.get_process_cpu_usage()
                sleep(1)
                end_all_cpu = self.get_cpu_usage()
                end_p_cpu = self.get_process_cpu_usage()
            except (IndexError, ValueError):
                # Unreadable sample: keep the monitor alive and retry.
                sleep(0.5)
                continue
            total_delta = end_all_cpu - start_all_cpu
            cpu_rate = 0.0
            if total_delta != 0:
                cpu_rate = (end_p_cpu - start_p_cpu) * 100.00 / total_delta
                cpu_rate = min(max(cpu_rate, 0.00), 100.00)
            writer.writerow({self.TIME: time.strftime("%H:%M:%S"), self.CPU_RATE: format(cpu_rate, ".2f")})
            print(f"CPU:【{format(cpu_rate, '.2f')}%】")
            sleep(0.5)
class MEM(object):
    """Samples the app's Native/Dalvik heap PSS from ``dumpsys meminfo`` into a CSV file."""

    TIME = "time"
    NATIVE_HEAP = "Native Heap(MB)"
    DALVIK_HEAP = "Dalvik Heap(MB)"

    def get_mem_info_start(self):
        """Run :meth:`get_mem_info` on a background thread."""
        t = threading.Thread(target=self.get_mem_info)
        t.start()

    @staticmethod
    def _parse_heap_pss_mb(meminfo_text, heap_name):
        """Return the first number after ``heap_name`` converted KB -> MB.

        :param meminfo_text: raw ``dumpsys meminfo`` output
        :param heap_name: row label, e.g. "Native Heap"
        :return: value formatted with two decimals, or None if the row is absent
        """
        match = re.search(re.escape(heap_name) + r"\s+(\d+)", meminfo_text)
        if match is None:
            return None
        # 1024 KB per MB, the same factor the NET monitor uses for bytes -> KB.
        return format(int(match.group(1)) / 1024.0, ".2f")

    def get_mem_info(self):
        """Write one memory sample every 1.5s while RUNNING_TIMES != 0.

        dumpsys is run once per sample and both rows are read from the same
        snapshot. The rows are matched by their full label in Python: with
        ``findstr "Dalvik Heap"`` the two words are separate search strings, so
        the "Native Heap" line matched as well and its value was reported in
        the Dalvik column.
        """
        mem_dirs = f"./{PHONE_MODEL}/mem/"
        mem_file_name = "mem_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        mem_field_names = [self.TIME, self.NATIVE_HEAP, self.DALVIK_HEAP]
        mem_command = f'adb shell dumpsys meminfo {PACKAGE_NAME}'
        writer = Utils.get_csv_writer(mem_dirs, mem_file_name, mem_field_names)
        while RUNNING_TIMES != 0:
            meminfo = os.popen(mem_command).read()
            native_pss = self._parse_heap_pss_mb(meminfo, "Native Heap")
            dalvik_pss = self._parse_heap_pss_mb(meminfo, "Dalvik Heap")
            if native_pss is not None and dalvik_pss is not None:
                writer.writerow(
                    {self.TIME: time.strftime("%H:%M:%S"), self.NATIVE_HEAP: native_pss, self.DALVIK_HEAP: dalvik_pss})
                print(f"native_pss:【{native_pss} MB】")
                print(f"dalvik_pss:【{dalvik_pss} MB】")
            # Same 1s + 0.5s pause as before, in a single call
            sleep(1.5)
class NET(object):
    """Samples wlan traffic counters from /proc/<pid>/net/dev into a CSV file."""

    TIME = "time"
    DOWN_SPEED = "下载速度(KB/s)"
    UP_SPEED = "上传速度(KB/s)"
    AVERAGE_DOWN_SPEED = "平均下载速度(KB/s)"
    AVERAGE_UP_SPEED = "平均上传速度(KB/s)"
    TOTAL_DOWN_SPEED = "下载总流量(KB)"
    TOTAL_UP_SPEED = "上传总流量(KB)"

    def __init__(self):
        self.is_first = True
        self.last_time = 0
        self.last_net_up = 0
        self.last_net_down = 0
        self.start_time = 0
        self.start_net_up = 0
        self.start_net_down = 0

    @staticmethod
    def _parse_wlan_bytes(dev_text):
        """Return ``(rx_bytes, tx_bytes)`` of the first wlan interface, or None.

        Each interface row is "<name>: <8 receive counters> <8 transmit counters>".
        Splitting on the colon makes the result independent of leading
        whitespace and of a missing space after the colon.
        """
        for line in dev_text.splitlines():
            name, colon, counters = line.partition(":")
            if not colon or "wlan" not in name:
                continue
            fields = counters.split()
            if len(fields) < 9:
                continue
            try:
                # fields[0] = received bytes, fields[8] = transmitted bytes
                return int(fields[0]), int(fields[8])
            except ValueError:
                continue
        return None

    def get_net_info_start(self):
        """Run :meth:`get_net_info` on a background thread.

        The two method names were previously swapped: the sampler ran on the
        caller's thread, and the other method started a thread targeting itself.
        """
        t = threading.Thread(target=self.get_net_info)
        t.start()

    def get_net_info(self):
        """Write one network sample every 0.5s while RUNNING_TIMES != 0.

        Each row holds the current and average speeds plus the traffic totals
        since the first sample, all derived from the wlan byte counters.
        Unreadable samples are skipped.
        NOTE(review): /proc/<pid>/net/dev presumably reports interface-wide
        counters rather than per-process traffic; confirm this is acceptable.
        """
        net_command = "adb shell cat /proc/" + Utils.get_pid_by_package_name(PACKAGE_NAME) + "/net/dev"
        net_dirs = f"./{PHONE_MODEL}/net/"
        net_file_name = "net_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        net_field_names = [self.TIME,
                           self.DOWN_SPEED,
                           self.UP_SPEED,
                           self.AVERAGE_DOWN_SPEED,
                           self.AVERAGE_UP_SPEED,
                           self.TOTAL_DOWN_SPEED,
                           self.TOTAL_UP_SPEED]
        writer = Utils.get_csv_writer(net_dirs, net_file_name, net_field_names)
        while RUNNING_TIMES != 0:
            if self.is_first:
                baseline_time = time.time_ns()
                baseline = self._parse_wlan_bytes(os.popen(net_command).read())
                if baseline is None:
                    time.sleep(0.5)
                    continue
                self.start_time = self.last_time = baseline_time
                self.start_net_down = self.last_net_down = baseline[0]
                self.start_net_up = self.last_net_up = baseline[1]
                self.is_first = False
            current_time = time.time_ns()
            current = self._parse_wlan_bytes(os.popen(net_command).read())
            time_delta = (current_time - self.last_time) / 1000000000.0
            time_total = (current_time - self.start_time) / 1000000000.0
            if current is None or time_delta <= 0 or time_total <= 0:
                time.sleep(0.5)
                continue
            current_net_down, current_net_up = current
            net_delta_up = (current_net_up - self.last_net_up) / 1024.0
            net_delta_down = (current_net_down - self.last_net_down) / 1024.0
            net_total_up = (current_net_up - self.start_net_up) / 1024.0
            net_total_down = (current_net_down - self.start_net_down) / 1024.0
            net_speed_up = net_delta_up / time_delta
            net_speed_down = net_delta_down / time_delta
            net_average_speed_up = net_total_up / time_total
            net_average_speed_down = net_total_down / time_total
            writer.writerow({self.TIME: time.strftime("%H:%M:%S"),
                             self.DOWN_SPEED: "{:.2f}".format(net_speed_down),
                             self.UP_SPEED: "{:.2f}".format(net_speed_up),
                             self.AVERAGE_DOWN_SPEED: "{:.2f}".format(net_average_speed_down),
                             self.AVERAGE_UP_SPEED: "{:.2f}".format(net_average_speed_up),
                             self.TOTAL_DOWN_SPEED: "{:.2f}".format(net_total_down),
                             self.TOTAL_UP_SPEED: "{:.2f}".format(net_total_up)
                             })
            print("下载速度:{:.2f} KB/s".format(net_speed_down))
            print("上传速度:{:.2f} KB/s".format(net_speed_up))
            print("平均下载速度:{:.2f} KB/s".format(net_average_speed_down))
            print("平均上传速度:{:.2f} KB/s".format(net_average_speed_up))
            # Totals are amounts, not rates: the unit is KB, not KB/s
            print("下载总流量:{:.0f} KB".format(net_total_down))
            print("上传总流量:{:.0f} KB".format(net_total_up))
            self.last_time = current_time
            self.last_net_up = current_net_up
            self.last_net_down = current_net_down
            time.sleep(0.5)
if __name__ == '__main__':
    # Create the simulated user (this opens the Appium session) and start
    # swiping on a background thread.
    simulated_user = User()
    simulated_user.user_swipe_screen_start()

    # Banner, monitor class and the name of its start method, in launch order.
    monitor_plan = (
        ("===========【获取FPS】===========", FPS, "get_fps_info_start"),
        ("===========【获取CPU】===========", CPU, "get_cpu_info_start"),
        ("===========【获取MEM】===========", MEM, "get_mem_info_start"),
        ("===========【获取NET】===========", NET, "get_net_info_start"),
    )
    for banner, monitor_cls, start_method in monitor_plan:
        print(banner)
        monitor = monitor_cls()
        getattr(monitor, start_method)()