问题
我们希望在windows或者linux上,可以使用ssh连接远程服务器,并且能够执行一般的linux命令,同时还要能够有一定交互能力。比如需要切换root用户,输入管理员用户密码等。
解决方案
Python的paramiko库,可以支持。但实现也有挺多问题需要考虑。主要有以下几点内容:
- 命令执行,能够获取命令结果
- 命令执行,能够支持指定的预期结果
- 命令执行,要有超时能力,不能挂死。
用法1:
ssh = Ssh2Client('127.0.0.1', 22)
ssh.connect('root', 'xxxx')
result = ssh.exec('pwd')
print(result)
用法2:
ssh = Ssh2Client('127.0.0.1', 22)
ssh.connect('user-name', 'user-pwd')
ssh.exec('sudo su -', 'Password:')
ssh.exec('root-pwd')
ssh.exec('ls -l /var/root')
代码实现如下所示:
import re
import socket
import time
import paramiko
class Ssh2Client:
def __init__(self, host: str, port: int):
self.__host = host
self.__port = port
self.__ssh = None
self.__channel = None
def __del__(self):
self.__close()
def connect(self, user: str, pwd: str) -> bool:
self.__close()
self.__ssh = paramiko.SSHClient()
self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
return True
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
if not self.__channel:
self.__channel = self.__ssh.invoke_shell()
time.sleep(0.020)
self.__channel.recv(4096).decode()
if cmd.endswith('\n'):
self.__channel.send(cmd)
else:
self.__channel.send(cmd + '\n')
return self.__recv(self.__channel, end_str, timeout)
def __recv(self, channel, end_str, timeout) -> str:
result = ''
out_str = ''
max_wait_time = timeout * 1000
channel.settimeout(0.05)
while max_wait_time > 0:
try:
out = channel.recv(1024 * 1024).decode()
if not out or out == '':
continue
out_str = out_str + out
match, result = self.__match(out_str, end_str)
if match is True:
return result.strip()
else:
max_wait_time -= 50
except socket.timeout:
max_wait_time -= 50
raise Exception('recv data timeout')
def __match(self, out_str: str, end_str: list) -> (bool, str):
result = out_str
for it in end_str:
if result.endswith(it):
return True, result
return False, result
def __close(self):
if not self.__ssh:
return
self.__ssh.close()
self.__ssh = None
讨论
我们使用用法1,输出类似如下格式(用户名做了处理):
pwd
/Users/user1
[xxx:~] xxx%
这里有两个问题要处理,命令和命令提示符都一并输出了。我们需要做特殊处理。处理方法也很简单,第一行和最后一行直接去掉即可,同时考虑命令无结果输出的处理即可。修改exec方法如下:
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
# ...
# 以下是新增的代码
result = self.__recv(self.__channel, end_str, timeout)
begin_pos = result.find('\r\n')
end_pos = result.rfind('\r\n')
if begin_pos == end_pos:
return ''
return result[begin_pos + 2:end_pos]
现状输出结果就正确了,这个就是我们想要的结果。
/Users/user1
偶然的机会,测试输入的命令比较长,取得结果又不正确了。比如执行
ssh.exec('echo 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444')
输出结果,有的服务器,会返回下面这个奇怪的结果:
2222222222233333333333333333333333444444444444444
ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory
这个问题的原因,主要是因为ssh2输出时使用了窗口的概念,默认是80*24,输入命令如果超过长度,会自动换行,导致处理命令结果时出错,主要修改invoke_shell函数调用方式,代码如下:
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% '), timeout=30) -> str:
if not self.__channel:
# width和height,可以指定输出窗口的大小。
self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096)
time.sleep(0.020)
self.__channel.recv(4096).decode()
# ....
命令窗口的宽度设置为4096,输出结果就对了。不过如果命令超过4096,输出还会出问题,根据实际情况,设置width的值,可以设置更大一点。
ls: 0111111111111111111111111111111111112222222222222222222222222222233333333333333333333333444444444444444: No such file or directory
到目前为止,已经基本够用了。但是还有一个问题,试用ls命令返回的结果,有一些奇怪的转义字符,比如:
�[1;34mCacheVolume�[0m �[1;34mbin�[0m �[1;34mboot�[0m �[1;34mdev�[0m �[1;34metc�[0m �[1;34mhome�[0m �[1;34mlib�[0m �[1;36mlinuxrc�[0m �[1;34mlost+found�[0m �[1;34mmnt�[0m �[1;36mnfs�[0m �[1;34mopt�[0m �[1;
这个问题的处理比较麻烦,处理了很久也不行。开始使用字符串分析处理,忽略这些转义符,但总是有点麻烦,处理不够彻底。后来终于在网上搜索到,这个转义是叫ansi转义码,可以在term上显示彩色。网上给出了正则处理方法:
# 7-bit C1 ANSI sequences
self.__ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
def __match(self, out_str: str, end_str: list) -> (bool, str):
result = self.__ansi_escape.sub('', out_str)
for it in end_str:
if result.endswith(it):
return True, result
return False, result
正则表达式比较复杂,有兴趣的同学自己分析这个re。
到目前为止,Ssh2Client已经基本实现,而且比较实用。可以处理绝大多数问题,实现也不复杂,比网上很多帖子都讲得全一些,代码可以直接拿来用。
但也不并是全部问题都能解决。比如有的linux系统,命令输出会出现换行,中文处理,都容易会导致输出结果获取不正确。不过,这些基本就是字符串分析和解码问题了。
完整的代码如下:
import re
import socket
import time
import paramiko
class Ssh2Client:
"""
ssh2客户端封装
"""
def __init__(self, host: str, port: int):
"""
功能描述:构造函数
:param host: 主机地址
:param port: 端口信息
"""
self.__host = host
self.__port = port
self.__ssh = None
self.__channel = None
# 7-bit C1 ANSI sequences
self.__ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
def __del__(self):
self.__close()
def connect(self, user: str, pwd: str) -> bool:
"""
功能描述:连接远程主机
:param user: 用户名
:param pwd: 用户密码
:return: 连接成功还是失败
"""
self.__close()
self.__ssh = paramiko.SSHClient()
self.__ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__ssh.connect(self.__host, username=user, password=pwd, port=self.__port)
return True
def exec(self, cmd: str, end_str=('# ', '$ ', '? ', '% ', '#', '$', '?', '%'), timeout=5) -> str:
"""
功能描述:执行命令
:param cmd: shell命令
:param end_str: 提示符
:param timeout: 超时间时间
:return: 命令执行结果
"""
if not self.__channel:
self.__channel = self.__ssh.invoke_shell(term='xterm', width=4096, height=48)
time.sleep(0.1)
self.__channel.recv(4096).decode()
if cmd.endswith('\n'):
self.__channel.send(cmd)
else:
self.__channel.send(cmd + '\n')
if end_str is None:
return self.__recv_without_end(cmd, timeout)
result = self.__recv(end_str, timeout)
begin_pos = result.find('\r\n')
end_pos = result.rfind('\r\n')
if begin_pos == end_pos:
return ''
return result[begin_pos + 2:end_pos]
def __recv_without_end(self, cmd, timeout):
"""
功能描述:接收命令执行结果,不进行任何比对。
:param cmd: 命令
:param timeout:超时时间,最长等待3秒
:return: 命令执行结果
"""
out_str = ''
if timeout > 3:
timeout = 3
max_wait_time = timeout * 1000
self.__channel.settimeout(0.1)
while max_wait_time > 0.0:
try:
start = time.perf_counter()
out = self.__channel.recv(1024 * 1024).decode()
out_str = out_str + out
max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
except socket.timeout:
max_wait_time -= 100
return out_str
def __recv(self, end_str, timeout) -> str:
"""
功能描述:根据提示符,接收命令执行结果
:param end_str: 预期结果结尾
:param timeout: 超时间
:return: 命令执行结果,去除命令输入提示符
"""
out_str = ''
max_wait_time = timeout * 1000
self.__channel.settimeout(0.05)
while max_wait_time > 0.0:
start = time.perf_counter()
try:
out = self.__channel.recv(1024 * 1024).decode()
if not out or out == '':
continue
out_str = out_str + out
match, result = self.__match(out_str, end_str)
if match is True:
return result.strip()
else:
max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
except socket.timeout:
max_wait_time = max_wait_time - (time.perf_counter() - start) * 1000
raise Exception('recv data timeout')
def __match(self, out_str: str, end_str: list) -> (bool, str):
result = self.__ansi_escape.sub('', out_str)
for it in end_str:
if result.endswith(it):
return True, result
return False, result
def __close(self):
if not self.__ssh:
return
self.__ssh.close()
self.__ssh = None