Locust作为开源的压力测试工具,这几年备受关注,用法已经被大神们开发的差不多了。最近版本大更新,我重新学习一些基础用法,却发现参数化这一块的代码不能满足我的需求。所以这次我从一个小白的角度出发,争取用简单的方式,实现Locust获取外部csv数据并参数化。
在直接编写代码之前,要了解两个库:pandas与queue。(想直接看代码麻烦手动跳到最后。)
pandas——Python的数据分析支持库
Python Data Analysis Library 或 pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。Pandas 纳入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的工具。
简言之,pandas高效,方便,好用。
它有两种独有的数据结构:Series 和 DataFrame。
- Series:与Python基本的数据结构List相近,Series中只允许存储相同的数据类型
- DataFrame:二维的表格型数据结构。很多功能与R中的data.frame类似。可以将DataFrame理解为Series的容器。
在获取csv数据时,我使用了DataFrame数据结构,这种表格形式更方便理解与操作,下面是基本用法:
from pandas import Series,DataFrame
# 使用dict定义DataFrame
data = {"name":['google','baidu','yahoo'],"marks":[100,200,300],"price":[1,2,3]}
# 读取csv文件
reader = pandas.read_csv(file_path)
# 将数据转换为DataFrame格式
df = pd.DataFrame(reader)
# 遍历行数据
for index, row in dFrame.iterrows():
print(index, row[i]) i为列序号,从0开始
queue——一种同步的队列类
队列queue 多应用在多线程应用中,多线程访问共享变量。对于多线程而言,访问共享变量时,队列queue是线程安全的。从queue队列的具体实现中,可以看出queue使用了1个线程互斥锁(pthread.Lock()),以及3个条件标量(pthread.condition()),来保证了线程安全。【线程安全:一个函数,当且仅当被多个并发线程反复调用时,能够一直产生正确的结果,才能够被称为线程安全的(thread-safe)】
我主要使用queue.Queue来实例化参数队列,将数据put到对列中,再从队列中get出来,传给相应请求。下面为基本用法:
import queue
# 实例化一个队列
queue_data = queue.Queue()
# put数据
queue_data.put_nowait(user)
# get数据
queue_data.get_nowait(user)
put_nowait()与get_nowait()实际为block=False
的put()与get(),消息队列如果没有空间可写入/获取,则会立刻抛出“queue.Full/queue.Empty”异常
pandas与queue结合,实现参数化
import queue
import pandas as pd
import sys
from locust import task, HttpUser, between, TaskSet
# 预防栈溢出,设置嵌套层数上限为10000
sys.setrecursionlimit(10000)
# 测试数据csv文件路径
file_path = "test_data.csv"
# 读取csv文件
reader = pd.read_csv(file_path)
# 将数据转换为DataFrame格式
df = pd.DataFrame(reader)
# 定义csv文件的单列数据获取函数
def queue_data(dFrame, i, **kwargs):
data = queue.Queue() # 先进先出
# queue_data = queue.LifoQueue() 后进先出
for index, row in dFrame.iterrows():
# print(index, row[i]) i为列序号,从0开始
try:
data.put_nowait(row[i])
except queue.Full:
print("队列溢出")
return data
class MyTaskSet(TaskSet):
# 在task中使用queue数据
@task
def task1(self):
# 使用queue中的数据进行参数化
url = "/postData"
try:
row0_data = self.user.row0.get_nowait()
row1_data = self.user.row1.get_nowait()
payload = {"row0": row0_data, "row1": row1_data}
print(payload)
res = self.client.post(url, json=payload, name="使用queue中的数据进行参数化")
except queue.Empty:
# 队列取空后退出
print("queue is empty")
exit()
class MyUser(HttpUser):
host = "http://example.com"
tasks = [MyTaskSet]
wait_time = between(1, 3)
# 获取单列数据放入对应队列中
row0 = queue_data(df, 0)
row1 = queue_data(df, 1)