Android 实时监控CPU、MEM、FPS、NET信息

1、需求

在用户操作手机时(或模拟用户操作手机),实时检测手机CPU、FPS、MEM、NET信息,确实此数据变化是否异常

2、环境

Win7+Python3+Appium

3、代码目录说明

4、运行方式

4.1 连接手机,并导入相关第三方库(pip )

4.2 修改全局变量PACKAGE_NAME、ACTIVITY_NAME的值(即确认被测应用的包名和类名)

4.3 重写User类的user_swipe_screen()方法,为读取设备相关信息做准备

4.4 运行main函数,并输入模拟用户操作的次数

5、结果说明

5.1 测试完成后,会在main.py同级目录生成下面文件

5.2 可以打开csv文件并插入折线图

源代码:

# -*- coding: utf-8 -*-
# @Time    : 2021/8/18 16:50
# @Author  : Shuangqi
# @File    : main.py
import csv
import os
import re
import threading
import time
from time import sleep
from selenium.webdriver.support.wait import WebDriverWait
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from appium import webdriver

# 手机型号
PHONE_MODEL = os.popen("adb shell getprop ro.product.model").read().strip('\n')
# 安卓版本
ANDROID_VERSION = os.popen("adb shell getprop ro.build.version.release").read().strip('\n')

# Package/Activity
PACKAGE_NAME = "com.ss.android.article.news"
ACTIVITY_NAME = "com.ss.android.article.news.activity.MainActivity"

# 应用版本号
APP_VERSION = os.popen(f'adb shell pm dump {PACKAGE_NAME} | findstr "versionName"').read().strip("\n").split("=")[1]

print("================================")
print("【测试信息】")
print(f"【机型】:{PHONE_MODEL}")
print(f"【安卓】:Android {ANDROID_VERSION}")
print(f"【包名】:{PACKAGE_NAME}")
print(f"【类名】:{ACTIVITY_NAME}")
print(f"【版本】:{APP_VERSION}")
print("================================")

# 测试运行次数(即滑动次数)
RUNNING_TIMES = int(input("请输入滑动测试次数:"))


class Utils(object):
    # 根据包名获取Pid
    @staticmethod
    def get_pid_by_package_name(pkg):
        pid = os.popen(f'adb shell ps | findstr "{pkg}"').read()
        ps_info = re.findall("\S+", pid)
        return ps_info[1]

    # 写入CSV
    @staticmethod
    def get_csv_writer(dirs, file_name, field_names):
        if not os.path.exists(dirs):
            os.makedirs(dirs)
        now_time = time.strftime('%Y%m%d-%H%M%S', time.localtime(time.time()))
        file_path = dirs + file_name + "_" + now_time + ".csv"
        mem_csv = open(file_path, 'w', newline='', encoding="GBK")
        writer = csv.DictWriter(mem_csv, fieldnames=field_names)
        writer.writeheader()
        return writer


class Phone(object):

    def __init__(self):
        desired_caps = {
            'platformName': 'Android',  # 平台名称
            'deviceName': 'test',  # 设备名称(任写)
            'platformVersion': ANDROID_VERSION,  # 安卓版本
            'appPackage': PACKAGE_NAME,  # 启动包名
            'appActivity': ACTIVITY_NAME,  # 启动 Acclivity
            'noReset': False,  # False即重置应用
            'newCommandTimeout': 60000  # 超时时间(一分钟)
        }
        if ANDROID_VERSION == "11":
            # Android 11 添加此参数
            desired_caps["automationName"] = "uiautomator2"
        # 开始初始化
        print("Appium 正在初始化...")
        # 启动服务/通过4723端口来建立一个会话
        self.driver = webdriver.Remote('http://localhost:4723/wd/hub', desired_caps)
        # 隐式等待15s
        self.driver.implicitly_wait(15)
        # 获取屏幕宽度
        self.window_width = self.driver.get_window_size()['width']
        # 获取屏幕高度
        self.window_height = self.driver.get_window_size()['height']
        print("Appium 初始化完成...")

    # 等待元素
    def wait_element(self, way, value, timeout=10, poll_frequency=0.5):
        """
        :param way:         定位方式
        :param value:       值
        :param timeout:     超时时长(s)
        :param poll_frequency: 刷新频率(s)
        :return:
        """
        try:
            if way == "text":
                WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency).until(
                    lambda x: x.find_element_by_android_uiautomator('new UiSelector().text("%s")' % value),
                    message=f'【text:{value}】等待超时')
            elif way == "id":
                WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency).until(
                    lambda x: x.find_element_by_id(value), message=f'【id:{value}】等待超时')
            elif way == "desc":
                WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency).until(
                    lambda x: x.find_element_by_accessibility_id(value), message=f'【desc:{value}】等待超时')
            # xpath参数示例://*[@text="xxx"]
            elif way == "xpath":
                WebDriverWait(self.driver, timeout=timeout, poll_frequency=poll_frequency).until(
                    lambda x: x.find_element_by_xpath(value), message=f'【xpath:{value}】等待超时')
            else:
                raise TypeError(f"无此定位元素方式:{way},定位方式支持:text/id/desc/xpath")
        except TimeoutException:
            raise

    # 查找元素
    def find_element(self, way, value):
        """
        :param way:     定位方式
        :param value:   值
        :return:    返回元素
        """
        try:
            if way == "text":
                # return self.driver.find_element_by_name(value)   已经凉了
                return self.driver.find_element_by_android_uiautomator('new UiSelector().text("%s")' % value)
            elif way == "id":
                return self.driver.find_element_by_id(value)
            elif way == "desc":
                return self.driver.find_element_by_accessibility_id(value)
            # xpath参数示例://*[@text="xxx"]
            elif way == "xpath":
                return self.driver.find_element_by_xpath(value)
            else:
                raise TypeError(f"无此定位元素方式:{way},定位方式支持:text/id/desc/xpath")
        except NoSuchElementException:
            raise

    # 滑动屏幕(根据屏幕宽度,不需要坐标点)
    def swipe_screen(self, swipe_type):
        """
        :param swipe_type: 滑动方式(左/右/上/下/)
        :return:
        """
        if swipe_type == "down":
            self.driver.swipe(self.window_width / 2, self.window_height / 4, self.window_width / 2,
                              3 * self.window_height / 4)
        elif swipe_type == "up":
            self.driver.swipe(self.window_width / 2, 3 * self.window_height / 4, self.window_width / 2,
                              self.window_height / 4)
        elif swipe_type == "left":
            self.driver.swipe(self.window_width / 4, self.window_height / 2, 3 * self.window_width / 4,
                              self.window_height / 2)
        elif swipe_type == "right":
            self.driver.swipe(3 * self.window_width / 4, self.window_height / 2, self.window_width / 4,
                              self.window_height / 2)
        else:
            raise TypeError(f"参数错误:{swipe_type},传入参数:down/up/left/right")


class User(Phone):

    def user_swipe_screen_start(self):
        # 调用一个线程滑动屏幕
        t = threading.Thread(target=self.user_swipe_screen)
        t.start()

    def user_swipe_screen(self):
        global RUNNING_TIMES
        # 进入app的准备工作
        try:
            self.wait_element("text", "同意")
            self.find_element("text", "同意").click()
            self.wait_element("text", "允许")
            self.find_element("text", "允许").click()
            # 低配置机器提示 开启流畅模式
            self.find_element("text", "开启").click()
            self.wait_element("text", "流畅模式")
            self.driver.keyevent(4)
            self.wait_element("text", "首页")
            # 滑动屏幕
            while True:
                self.swipe_screen("up")
                # print(f"第{i}次滑动屏幕!")
                RUNNING_TIMES -= 1
                print(f"用户滑动次数:{RUNNING_TIMES}")
                if RUNNING_TIMES == 0:
                    break
        except NoSuchElementException:
            raise
        finally:
            sleep(5)
            self.driver.quit()
            print("Driver成功退出!")


class FPS(object):
    TIME = "time"
    FPS = "FPS"

    def __init__(self):
        self.is_first = True
        self.last_time = 0.0
        self.last_fps = 0
        self.is_running = True

    def get_fps_info_start(self):
        t = threading.Thread(target=self.get_fps_info)
        t.start()

    def get_fps_info(self):
        fps_dirs = f"./{PHONE_MODEL}/fps/"
        fps_file_name = f"fps_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        fps_field_names = [self.TIME, self.FPS]
        fps_command = f'adb shell dumpsys gfxinfo {PACKAGE_NAME} | findstr "Total frames"'
        writer = Utils.get_csv_writer(fps_dirs, fps_file_name, fps_field_names)
        while RUNNING_TIMES != 0:
            if self.is_first:
                self.last_time = time.time_ns()
                self.last_fps = int(re.findall("\d+", os.popen(fps_command).read())[0])
                self.is_first = False
            current_time = time.time_ns()
            current_fps = int(re.findall("\d+", os.popen(fps_command).read())[0])
            time_delta = (current_time - self.last_time) / 1000000000.0
            fps_delta = current_fps - self.last_fps
            self.last_time = current_time
            self.last_fps = current_fps
            now_fps = "{:.2f}".format(fps_delta / time_delta)
            writer.writerow({self.TIME: time.strftime("%H:%M:%S"), self.FPS: now_fps})
            # 间隔0.5s获取数据
            print(f"FPS:【{now_fps}】")
            sleep(0.5)


class CPU(object):
    TIME = "time"
    CPU_RATE = "进程CPU占比(%)"

    def get_cpu_info_start(self):
        t = threading.Thread(target=self.get_cpu_info)
        t.start()

    # 总的CPU信息
    def get_cpu_usage(self):
        cpu_usage = 0.0
        cpu_path = "/proc/stat"
        info = os.popen(f'adb shell cat {cpu_path}').read()
        result = re.split("\s+", info.split("\n")[0])
        for i in range(2, len(result)):
            cpu_usage += float(result[i])
        return cpu_usage

    # 通过ID获取cpu信息(例如此时的今日头条进程id为10808)
    def get_process_cpu_usage(self):
        pid = Utils.get_pid_by_package_name(PACKAGE_NAME)
        cpu_path = "/proc/" + pid + "/stat"
        info = os.popen(f'adb shell cat {cpu_path}').read()
        result = re.split("\s+", info)
        # 进程总的cpu占用
        cpu_usage = float(result[13]) + float(result[14])
        return cpu_usage

    # 计算CPU占用
    def get_cpu_info(self):
        cpu_dirs = f"./{PHONE_MODEL}/cpu/"
        cpu_file_name = f"cpu_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        cpu_field_names = [self.TIME, self.CPU_RATE]
        writer = Utils.get_csv_writer(cpu_dirs, cpu_file_name, cpu_field_names)
        while RUNNING_TIMES != 0:
            start_all_cpu = self.get_cpu_usage()
            start_p_cpu = self.get_process_cpu_usage()
            sleep(1)
            end_all_cpu = self.get_cpu_usage()
            end_p_cpu = self.get_process_cpu_usage()
            cpu_rate = 0.0
            if (end_all_cpu - start_all_cpu) != 0:
                cpu_rate = (end_p_cpu - start_p_cpu) * 100.00 / (
                        end_all_cpu - start_all_cpu)
                if cpu_rate < 0:
                    cpu_rate = 0.00
                elif cpu_rate > 100:
                    cpu_rate = 100.00
            writer.writerow({self.TIME: time.strftime("%H:%M:%S"), self.CPU_RATE: format(cpu_rate, ".2f")})
            print(f"CPU:【{format(cpu_rate, '.2f')}%】")
            sleep(0.5)


class MEM(object):
    TIME = "time"
    NATIVE_HEAP = "Native Heap(MB)"
    DALVIK_HEAP = "Dalvik Heap(MB)"

    def get_mem_info_start(self):
        t = threading.Thread(target=self.get_mem_info)
        t.start()

    def get_mem_info(self):
        mem_dirs = f"./{PHONE_MODEL}/mem/"
        mem_file_name = f"mem_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        mem_field_names = [self.TIME, self.NATIVE_HEAP, self.DALVIK_HEAP]
        writer = Utils.get_csv_writer(mem_dirs, mem_file_name, mem_field_names)
        while RUNNING_TIMES != 0:
            # native_info
            native_info = os.popen(f'adb shell dumpsys meminfo {PACKAGE_NAME} | findstr "Native Heap"').read()
            native_pss = format(int(re.findall(r"\d+", native_info)[0]) / 1000.0, ".2f")
            # dalvik_info
            dalvik_info = os.popen(f'adb shell dumpsys meminfo {PACKAGE_NAME} | findstr "Dalvik Heap"').read()
            dalvik_pss = format(int(re.findall(r"\d+", dalvik_info)[0]) / 1000.0, ".2f")
            writer.writerow(
                {self.TIME: time.strftime("%H:%M:%S"), self.NATIVE_HEAP: native_pss, self.DALVIK_HEAP: dalvik_pss})
            print(f"native_pss:【{native_pss} MB】")
            print(f"dalvik_pss:【{dalvik_pss} MB】")
            sleep(1)
            sleep(0.5)


class NET(object):
    TIME = "time"
    DOWN_SPEED = "下载速度(KB/s)"
    UP_SPEED = "上传速度(KB/s)"
    AVERAGE_DOWN_SPEED = "平均下载速度(KB/s)"
    AVERAGE_UP_SPEED = "平均上传速度(KB/s)"
    TOTAL_DOWN_SPEED = "下载总流量(KB)"
    TOTAL_UP_SPEED = "上传总流量(KB)"

    def __init__(self):
        self.is_first = True
        self.last_time = 0
        self.last_net_up = 0
        self.last_net_down = 0
        self.start_time = 0
        self.start_net_up = 0
        self.start_net_down = 0

    def get_net_info(self):
        t = threading.Thread(target=self.get_net_info)
        t.start()

    def get_net_info_start(self):
        net_command = "adb shell cat /proc/" + Utils.get_pid_by_package_name(PACKAGE_NAME) + '/net/dev | findstr "wlan"'
        net_dirs = f"./{PHONE_MODEL}/net/"
        net_file_name = f"net_" + PHONE_MODEL + "_" + PACKAGE_NAME + "_" + APP_VERSION
        net_field_names = [self.TIME,
                           self.DOWN_SPEED,
                           self.UP_SPEED,
                           self.AVERAGE_DOWN_SPEED,
                           self.AVERAGE_UP_SPEED,
                           self.TOTAL_DOWN_SPEED,
                           self.TOTAL_UP_SPEED]
        writer = Utils.get_csv_writer(net_dirs, net_file_name, net_field_names)
        while RUNNING_TIMES != 0:
            if self.is_first:
                self.start_time = time.time_ns()
                self.last_time = self.start_time
                net_info = os.popen(net_command).read()
                net_array = re.split("\s+", net_info)
                self.start_net_down = int(net_array[2])
                self.last_net_down = self.start_net_down
                self.start_net_up = int(net_array[10])
                self.last_net_up = self.start_net_up
                self.is_first = False
            current_time = time.time_ns()
            current_info = os.popen(net_command).read()
            current_array = re.split("\s+", current_info)
            current_net_down = int(current_array[2])
            current_net_up = int(current_array[10])
            time_delta = (current_time - self.last_time) / 1000000000.0
            time_total = (current_time - self.start_time) / 1000000000.0
            net_delta_up = (current_net_up - self.last_net_up) / 1024.0
            net_delta_down = (current_net_down - self.last_net_down) / 1024.0
            net_total_up = (current_net_up - self.start_net_up) / 1024.0
            net_total_down = (current_net_down - self.start_net_down) / 1024.0
            net_speed_up = net_delta_up / time_delta
            net_speed_down = net_delta_down / time_delta
            net_average_speed_up = net_total_up / time_total
            net_average_speed_down = net_total_down / time_total
            writer.writerow({self.TIME: time.strftime("%H:%M:%S"),
                             self.DOWN_SPEED: "{:.2f}".format(net_speed_down),
                             self.UP_SPEED: "{:.2f}".format(net_speed_up),
                             self.AVERAGE_DOWN_SPEED: "{:.2f}".format(net_average_speed_down),
                             self.AVERAGE_UP_SPEED: "{:.2f}".format(net_average_speed_up),
                             self.TOTAL_DOWN_SPEED: "{:.2f}".format(net_total_down),
                             self.TOTAL_UP_SPEED: "{:.2f}".format(net_total_up)
                             })
            print("下载速度:{:.2f} KB/s".format(net_speed_down))
            print("上传速度:{:.2f} KB/s".format(net_speed_up))
            print("平均下载速度:{:.2f} KB/s".format(net_average_speed_down))
            print("平均上传速度:{:.2f} KB/s".format(net_average_speed_up))
            print("下载总流量:{:.0f} KB/s".format(net_total_down))
            print("上传总流量:{:.0f} KB/s".format(net_total_up))
            self.last_time = current_time
            self.last_net_up = current_net_up
            self.last_net_down = current_net_down
            time.sleep(0.5)


if __name__ == '__main__':
    user = User()
    user.user_swipe_screen_start()
    print("===========【获取FPS】===========")
    fps = FPS()
    fps.get_fps_info_start()
    print("===========【获取CPU】===========")
    cpu = CPU()
    cpu.get_cpu_info_start()
    print("===========【获取MEM】===========")
    mem = MEM()
    mem.get_mem_info_start()
    print("===========【获取NET】===========")
    net = NET()
    net.get_net_info_start()

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容