手头有一些华为的接入和汇聚交换机需要做定期的巡检,正好最近在学习python,于是乎按照网上前辈们的思路用python操作一番,主要用了pexpect、xlwt、xlrd这几个模块,由于pexpect在windows环境下执行有问题,所以下面的代码全程是在linux环境下执行。
直接上代码,如下:
import re
import os
import xlwt
import xlrd
import pexpect
import datetime
from xlwt import *
counter0 = 1 # 计数器,全局变量
counter1 = 1
counter2 = 1
class Tools:
@staticmethod
def directory_check(arg_path): # 待生成巡检文档路径检查,没有则创建
today = datetime.date.today().strftime('%Y%m%d')
path = arg_path + today + '/'
if not os.path.exists(path):
os.mkdir(path, 0o755)
return path
@staticmethod
def generator(arg_sheet, arg_n): # 生成器,读取保存有交换机信息的表格,返回交换机名称/用户名/ip/密码
i = 1
while i <= arg_n:
row_list = arg_sheet.row_values(i)
sw_sysname = row_list[1]
sw_username = row_list[2]
sw_ip = row_list[3]
sw_pwd = row_list[4]
i = i + 1
yield sw_sysname, sw_username, sw_ip, sw_pwd
@staticmethod
def get_filename(file_dir): # 获取已生成的交换机巡检文本文件,返回文件名列表
path = Tools.directory_check(file_dir)
txt_list = []
for files in os.walk(path):
for file in files:
for inspection_txt in file:
inspection_txt = ''.join(inspection_txt)
if os.path.splitext(inspection_txt)[-1] == ".txt":
txt_list.append(inspection_txt)
return txt_list
class Switch:
def __init__(self, system_name, user_name, ip_address, user_password):
self.system_name = system_name
self.user_name = user_name
self.user_password = user_password
self.ip_address = ip_address
def login(self, arg_path): # telnet登录交换机
path = Tools.directory_check(arg_path)
try:
self.child = pexpect.spawn('telnet %s' % self.ip_address)
self.child.logfile = open(path + '%s--%s--log.txt' % (self.system_name, self.ip_address), 'wb+')
self.child.expect('[Uu]sername:') # 提示输入用户名
self.child.sendline("%s" % self.user_name)
self.child.expect('[Pp]assword:') # 提示输入密码
self.child.sendline("%s" % self.user_password)
except Exception:
print("Some error occurred when telnet to the remote device!")
exit(1)
else:
return self.child
def inspect(self, command): # 执行巡检命令
sign = "---- More ----"
while True:
index = self.child.expect([sign, '<%s>' % self.system_name])
if index == 0:
self.child.send(" ")
continue
elif index == 1:
self.child.sendline(command)
break
class WorkBook:
def __init__(self):
self.power1_status = None
self.power2_status = None
self.power3_status = None
self.power4_status = None
self.up_time = None
self.temperature = None
self.fan_status = None
self.cpu_status = None
self.memory_status = None
self.eth_trunk_0 = None
self.eth_trunk_1 = None
self.eth_trunk_2 = None
self.eth_trunk_3 = None
self.eth_trunk_4 = None
self.eth_trunk_5 = None
self.eth_trunk_6 = None
self.log_counts = None
self.routingtable_counts = None
def matching(self, dir_txt): # 逐行遍历,正则匹配关键字
file = open(dir_txt, 'r+')
inspection_list = file.readlines()
i = 1
for line in inspection_list:
if re.search(r'0(\s+)PWR(I|1)(\s+)', line):
self.power1_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
if re.search(r'0(\s+)PWR(II|2)(\s+)', line):
self.power2_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
if re.search(r'1(\s+)PWR(I|1)(\s+)', line):
self.power3_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
if re.search(r'1(\s+)PWR(II|2)(\s+)', line):
self.power4_status = line.split()[2] + ' ' + line.split()[4] + ' ' + line.split()[5]
if ': uptime is' in line:
self.up_time = re.search(r': uptime is(.*)', line).group(1)
if 'Lower(C)' in line:
self.temperature = inspection_list[i + 2].split()[4]
if 'CurrentTemperature' in line:
self.temperature = inspection_list[i + 1].split()[1]
if 'FanID' in line:
self.fan_status = inspection_list[i + 1].split()[3]
if 'five minutes:' in line:
self.cpu_status = re.search(r'five minutes:(.*)', line, re.I).group(1)
if 'Memory Using Percentage Is:' in line:
self.memory_status = re.search(r'Memory Using Percentage Is:(.*)', line).group(1)
if re.match(r'Eth-Trunk0(\s+(up|down))', line):
self.eth_trunk_0 = line.split()[1] + ' ' + line.split()[2]
if re.match(r'Eth-Trunk1(\s+(up|down))', line):
self.eth_trunk_1 = line.split()[1] + ' ' + line.split()[2]
if re.match(r'Eth-Trunk2(\s+(up|down))', line):
self.eth_trunk_2 = line.split()[1] + ' ' + line.split()[2]
if re.match(r'Eth-Trunk3(\s+(up|down))', line):
self.eth_trunk_3 = line.split()[1] + ' ' + line.split()[2]
if re.match(r'Eth-Trunk4(\s+(up|down))', line):
self.eth_trunk_4 = line.split()[1] + ' ' + line.split()[2]
if re.match(r'Eth-Trunk5(\s+(up|down))', line):
self.eth_trunk_5 = line.split()[1] + ' ' + line.split()[2]
if re.match(r'Eth-Trunk6(\s+(up|down))', line):
self.eth_trunk_6 = line.split()[1] + ' ' + line.split()[2]
if 'Current messages :' in line:
self.log_counts = line.split()[3]
if 'Routes :' in line:
self.routingtable_counts = line.split()[5]
i += 1
def setting_style(self): # 输出巡检表格的样式设置
self.workbook = xlwt.Workbook()
self.style = XFStyle()
pattern = Pattern()
pattern.pattern = Pattern.SOLID_PATTERN
pattern.pattern_fore_colour = Style.colour_map['blue']
self.style.pattern = pattern
borders = xlwt.Borders()
borders.left = 1
borders.right = 1
borders.top = 1
borders.bottom = 1
self.style.borders = borders
self.style1 = XFStyle()
borders = xlwt.Borders()
borders.left = 1
borders.right = 1
borders.top = 1
borders.bottom = 1
self.style1.borders = borders
self.style3 = XFStyle()
borders = xlwt.Borders()
borders.left = 1
borders.right = 1
borders.top = 1
borders.bottom = 1
self.style3.borders = borders
al = xlwt.Alignment()
al.horz = 0x02
al.vert = 0x01
self.style3.alignment = al
self.SwitchCheck = self.workbook.add_sheet('SwitchCheck', cell_overwrite_ok=True)
first_col = self.SwitchCheck.col(0)
sec_col = self.SwitchCheck.col(1)
thr_col = self.SwitchCheck.col(2)
for_col = self.SwitchCheck.col(3)
first_col.width = 150 * 25
sec_col.width = 100 * 25
thr_col.width = 120 * 25
for_col.width = 320 * 25
def write_workbook_header(self): # 输出巡检表格的表头
self.SwitchCheck.write(0, 0, '设备名称', self.style)
self.SwitchCheck.write(0, 1, '管理IP', self.style)
self.SwitchCheck.write(0, 2, '巡检项目', self.style)
self.SwitchCheck.write(0, 3, '巡检结果', self.style)
def write_workbook_body(self, name, ip): # 输出巡检表格的详细内容
global counter0
global counter1
global counter2
checklist_head = [
'电源0-1',
'电源0-2',
'电源1-1',
'电源1-2',
'运行时间',
'运行温度',
'风扇',
'CPU利用率',
'内存利用率',
'聚合端口0',
'聚合端口1',
'聚合端口2',
'聚合端口3',
'聚合端口4',
'聚合端口5',
'聚合端口6',
'日志条目',
'路由表条目',
]
checklist_content = [
self.power1_status,
self.power2_status,
self.power3_status,
self.power4_status,
self.up_time,
self.temperature,
self.fan_status,
self.cpu_status,
self.memory_status,
self.eth_trunk_0,
self.eth_trunk_1,
self.eth_trunk_2,
self.eth_trunk_3,
self.eth_trunk_4,
self.eth_trunk_5,
self.eth_trunk_6,
self.log_counts,
self.routingtable_counts
]
for check_item_h in checklist_head:
self.SwitchCheck.write(counter1, 2, check_item_h, self.style1)
counter1 += 1
for check_item_c in checklist_content:
self.SwitchCheck.write(counter2, 3, check_item_c, self.style1)
counter2 += 1
self.SwitchCheck.write_merge(counter0, counter1-1, 0, 0, name, self.style3)
self.SwitchCheck.write_merge(counter0, counter1-1, 1, 1, ip, self.style3)
counter0 = counter0 + len(checklist_head)
if __name__ == "__main__":
cmd_list = [
"display cpu-usage",
"display memory",
"display temperature all",
"display environment",
"display fan",
"display power",
"display ip routing-table",
"display interface brief",
"dis version",
"display log",
"quit"
] # 巡检命令列表
log_path = "/home/test/inspection/" # 生成巡检日志文件的路径
excel_path = '/home/test/sw_info.xlsx' # 存放交换机登录信息的路径
excel = xlrd.open_workbook(excel_path) # 打开excel
sheet = excel.sheet_by_index(0) # 获取第一个sheet数据
rows = sheet.nrows # 获取sheet行数
gen = Tools.generator(sheet, rows - 1) # 实例化上面Tools类中定义的生成器
for item in gen:
sw = Switch(item[0], item[1], item[2], item[3])
sw.login(log_path)
for cmd in cmd_list:
sw.inspect(cmd)
list_target = Tools.get_filename(log_path)
WB = WorkBook()
WB.setting_style()
for item in list_target:
sw_name = item.split('--')[0]
sw_ip = item.split('--')[1]
WB.matching(Tools.directory_check(log_path) + item)
WB.write_workbook_header()
WB.write_workbook_body(sw_name, sw_ip)
WB.workbook.save('%sSwitchInspection.xls' % log_path)
说明:
1、程序会自动读取存放有交换机登录信息的excel表格,表格具体路径为 "/home/test/sw_info.xlsx";
从表格中获取交换机的ip、sysname、username、password信息,然后telnet到目标交换机,执行相关巡检命令;
表格中待巡检交换机的信息按照固定格式,逐行填写,如下:
如图中所示,表格中的交换机名称对应代码中的system_name, 用户名对应user_name, IP对应ip_address, 密码对应user_password,按照格式填写即可;
2、程序逐个telnet到交换机上执行完巡检命令后,会在 "/home/test/inspection/当前日期/" 的路径下按照不同交换机逐个生成以交换机名称和ip地址命名的txt文件,保存巡检命令执行的结果记录,如Huawei-sw-01--192.168.x.x--log.txt;
3、程序逐个读取上面的txt巡检结果文件,正则匹配关键字来获取不同巡检项目的结果数据,并填入新生成的excel表格中,新生成的excel表格路径为 "/home/test/inspection/SwitchInspection.xls"。
4、最终效果如下:
多个交换机的巡检结果会自动向下生成。
5、最后
代码总体上参考了下面这位前辈的处理思路,我按照面向对象的思想改了下代码块,整体来说写的比较乱,面向对象学得还是不到家,大家将就看看吧
参考链接:https://blog.csdn.net/weixin_42551826/article/details/81625375