mitmproxy + python 实现游戏协议测试

mitmproxy + python 实现游戏协议测试

本文侧重介绍如何使用 python 和 mitmproxy 实现拦截数据包、重发数据包,以及解析 protobuf 数据内容,对于相关依赖的安装不做介绍。

一、游戏协议安全测试内容

参考https://testerhome.com/topics/29053,这篇文章讲的很清楚。


二、实现原理

想直接使用的同学可以跳到第三部分。

mitmproxy 作为代理,可以获取客户端与服务端通信的数据,并且可以拦截、修改和自主发送数据。当配合其证书使用时,还可以解密 wss 连接中的 websocket 数据。

  • Websotcket 数据处理源码分析

在 http 代理的过程中若发现 upgrade websocket 请求,则创建 WebSocketLayer 实例,并调用其<u style="box-sizing: border-box; outline-style: none; --tw-border-opacity:1; border-color: rgb(229 231 235/var(--tw-border-opacity));">call</u>方法。

# mitmproxy/proxy/protocol/http.py
"""以下为Httplayer的_process_flow方法的部分代码"""
if f.response.status_code == 101:
    # Handle a successful HTTP 101 Switching Protocols Response,
    # received after e.g. a WebSocket upgrade request.
    # Check for WebSocket handshake
    is_websocket = (
        websockets.check_handshake(f.request.headers) and
        websockets.check_handshake(f.response.headers)
    )
    if is_websocket and not self.config.options.websocket:
        self.log(
          "Client requested WebSocket connection, but the protocol is disabled.",
          "info"
        )

    if is_websocket and self.config.options.websocket:
        layer = WebSocketLayer(self, f)
    else:
        layer = self.ctx.next_layer(self)
    layer()

WebSocketLayer 初始化时会创建用于此次 websocket 通信的编解码器。

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的init方法,省略部分代码"""
def __init__(self, ctx, handshake_flow):
    super().__init__(ctx)
    self.handshake_flow = handshake_flow

    self.connections: dict[object, WSConnection] = {}

    client_extensions = []
    server_extensions = []
    # 判断交互数据是否使用deflate压缩
    if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
        if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:
            client_extensions = [PerMessageDeflate()]
            server_extensions = [PerMessageDeflate()]
    # self.client_conn和self.server_conn继承自ctx,即原http的client和server,原理为父类的__getattr__(self, name)方法返回的是getattr(self.ctx, name)。WSConnection是一个websocket协议编解码器,实际不会发送任何网络IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html
    # 负责和解码server收到信息和编码server发送的信息
    self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER)
    # 负责和解码client收到信息和编码client发送的信息
    self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT)

    # 构造发送给Server的websocket的握手请求
    request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path)
    # send()方法只会构造一个适用于对应conn的数据,并不会真正发送数据,recv_data()会将信息解码,需要通过next(conn.events())获取解码后数据
    # 按上所说,下面两行代码的操作是将握手请求按client编码后发送给server编码器,然后让server编码器解码
    data = self.connections[self.server_conn].send(request)
    self.connections[self.client_conn].receive_data(data)

    event = next(self.connections[self.client_conn].events())
    assert isinstance(event, events.Request)
    # 返回给客户端接受连接响应
    data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions))
    self.connections[self.server_conn].receive_data(data)
    assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)

WebSocketLayer 实例的<u style="box-sizing: border-box; outline-style: none; --tw-border-opacity:1; border-color: rgb(229 231 235/var(--tw-border-opacity));">call</u>方法负责处理后续 websocket 通信

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的call方法,省略部分代码"""
def __call__(self):
    self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow)
    self.flow.metadata['websocket_handshake'] = self.handshake_flow.id
    self.handshake_flow.metadata['websocket_flow'] = self.flow.id
    # 调用addons中的websocket_start(self, flow)对flow进行处理
    self.channel.ask("websocket_start", self.flow)

    conns = [c.connection for c in self.connections.keys()]
    close_received = False

    try:
        while not self.channel.should_exit.is_set():
            # 往client或server插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server是队列,后续实现在连接中主动发消息就是通过往队列中插入数据实现
            self._inject_messages(self.client_conn, self.flow._inject_messages_client)
            self._inject_messages(self.server_conn, self.flow._inject_messages_server)
        # select监视原http的client和server连接的可读事件
            r = tcp.ssl_read_select(conns, 0.1)
            for conn in r:
                source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
                other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
                is_server = (source_conn == self.server_conn)

                frame = websockets.Frame.from_file(source_conn.rfile)
                # 将从conn中获取的数据放入编解码器,此方法并没有返回值,所以data是None
                data = self.connections[source_conn].receive_data(bytes(frame))
                # data是None,不解此举有何意义
                source_conn.send(data)

                if close_received:
                    return
        # 处理编解码器中解码后的数据,event由pop取出,后续不会再用到。
                for event in self.connections[source_conn].events():
                    if not self._handle_event(event, source_conn, other_conn, is_server):
                        if not close_received:
                            close_received = True
    except (socket.error, exceptions.TcpException, SSL.Error) as e:
        s = 'server' if is_server else 'client'
        self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e)))
        # 调用addons中的websocket_start(self, flow)对flow进行处理
        self.channel.tell("websocket_start", self.flow)
    finally:
        self.flow.ended = True
        # 调用addons中的websocket_end(self, flow)对flow进行处理
        self.channel.tell("websocket_end", self.flow)

WebSocketLayer 实例中处理 Message Event 的方法

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的_handle_message方法,_handle_event中,若isinstance(event, events.Message),则会调用此函数"""
def _handle_message(self, event, source_conn, other_conn, is_server):
    fb = self.server_frame_buffer if is_server else self.client_frame_buffer
    fb.append(event.data)

    if event.message_finished:
        original_chunk_sizes = [len(f) for f in fb]

        if isinstance(event, events.TextMessage):
            message_type = wsproto.frame_protocol.Opcode.TEXT
            payload = ''.join(fb)
        else:
            message_type = wsproto.frame_protocol.Opcode.BINARY
            payload = b''.join(fb)

        fb.clear()

        websocket_message = WebSocketMessage(message_type, not is_server, payload)
        length = len(websocket_message.content)
        self.flow.messages.append(websocket_message)
        # 调用addons中的websocket_message(self, flow)对flow进行处理
        self.channel.ask("websocket_message", self.flow)

        # WebsocketMessage的属性killed用于判断该信息是否需要被转发,可在websocket_message函数中调用message的kill()方法置为True
        if not self.flow.stream and not websocket_message.killed:
            def get_chunk(payload):
                if len(payload) == length:
                    # message has the same length, we can reuse the same sizes
                    pos = 0
                    for s in original_chunk_sizes:
                        yield (payload[pos:pos + s], True if pos + s == length else False)
                        pos += s
                else:
                    # just re-chunk everything into 4kB frames
                    # header len = 4 bytes without masking key and 8 bytes with masking key
                    chunk_size = 4092 if is_server else 4088
                    chunks = range(0, len(payload), chunk_size)
                    for i in chunks:
                        yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)

            # 将收到的信息重新编码后向对端发送
            for chunk, final in get_chunk(websocket_message.content):
                data = self.connections[other_conn].send(Message(data=chunk, message_finished=final))
                other_conn.send(data)

    if self.flow.stream:
        data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished))
        other_conn.send(data)
    return True

  • Tcp 数据处理源码分析

TCP 数据处理触发条件

# mitmproxy/proxy/root_context.py
"""RootContext类_next_layer方法,省略部分代码"""
"""
4\. Check for --tcp
判断Option中tcp_hosts, 类型是一个列表,包含需要转换成tcp流信息的server address正则表达式,例如['192\.168\.\d+\.\d+']
"""
if self.config.check_tcp(top_layer.server_conn.address):
    return protocol.RawTCPLayer(top_layer)

"""
6\. Check for raw tcp mode
判断Option中rawtcp,类型是bool,若为true,则将不能处理的流转换成tcp流处理,建议开启,默认是false
"""
is_ascii = (
    len(d) == 3 and
    # expect A-Za-z
    all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
)
if self.config.options.rawtcp and not is_ascii:
    return protocol.RawTCPLayer(top_layer)

TCP 信息处理 RawTCPLayer 类源码

class RawTCPLayer(base.Layer):
    chunk_size = 4096

    def __init__(self, ctx, ignore=False):
        self.ignore = ignore
        super().__init__(ctx)

    def __call__(self):
        self.connect()

        if not self.ignore:
            f = tcp.TCPFlow(self.client_conn, self.server_conn, self)
            # 调用addons中的tcp_start(self, flow)对flow进行处理
            self.channel.ask("tcp_start", f)

        # 创建一个长度为4096的空bytearray
        buf = memoryview(bytearray(self.chunk_size))

        client = self.client_conn.connection
        server = self.server_conn.connection
        conns = [client, server]

        # https://github.com/openssl/openssl/issues/6234
        for conn in conns:
            if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"):
                SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY)

        try:
            while not self.channel.should_exit.is_set():
                r = mitmproxy.net.tcp.ssl_read_select(conns, 10)
                for conn in r:
                    dst = server if conn == client else client
                    try:
                        # 将从conn中recv的数据存入buf,返回size
                        size = conn.recv_into(buf, self.chunk_size)
                    except (SSL.WantReadError, SSL.WantWriteError):
                        continue
                    if not size:
                        conns.remove(conn)
                        # Shutdown connection to the other peer
                        if isinstance(conn, SSL.Connection):
                            # We can't half-close a connection, so we just close everything here.
                            # Sockets will be cleaned up on a higher level.
                            return
                        else:
                            dst.shutdown(socket.SHUT_WR)

                        if len(conns) == 0:
                            return
                        continue
            # 将recv的数据转成TCPMessage
                    tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes())
                    if not self.ignore:
                        f.messages.append(tcp_message)
                        # 调用addons中的tcp_message(self, flow)对flow进行处理
                        self.channel.ask("tcp_message", f)
                    # 发送tcp_message中的content
                    dst.sendall(tcp_message.content)

        except (socket.error, exceptions.TcpException, SSL.Error) as e:
            if not self.ignore:
                f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e)))
                # 调用addons中的tcp_error(self, flow)对flow进行处理
                self.channel.tell("tcp_error", f)
        finally:
            if not self.ignore:
                # 调用addons中的tcp_end(self, flow)对flow进行处理
                self.channel.tell("tcp_end", f)


三、开启 mitmproxy 并加载 addon

首先需要安装两个库:mitmproxy 和 mitmdump

1、编写 websocket 的 addon
"""
简略版用于websocket的Addon
后续改进可以增加判断host,避免拦截到不需要处理的连接,或者将Queue改成redis
"""
import asyncio
from multiprocessing import Queue

import mitmproxy.websocket

class WebsocketAddon:
    def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
        self._input_q = input_q
        self._output_q = output_q

    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
        while not flow.ended and not flow.error:
            # 增加间隔,否则会阻塞event
            await asyncio.sleep(0.5)
            while not self._input_q.empty():
                # WebSocketFlow的内置方法,用于主动插入信息,这里我只主动插入client->server的信息
                flow.inject_message(flow.server_conn, self._input_q.get())

    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
        # 加入发送websocket消息的task,参考了官方的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message
        asyncio.get_event_loop().create_task(self.inject(flow))

    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
        message = flow.messages[-1]
        self._output_q.put({
          'from_client': message.from_client, 
          'data': message.content
        })
        # message.kill()可以让Layer不转发该条信息,我这里的目的是拦截掉所有客户端发送的数据,由自己编辑后再发送
        if message.from_client:
            message.kill()

2、编写 socket 的 addon
"""
简略版用于socket的Addon
和websocket版差别不大,插入数据和拦截数据有区别
"""
import asyncio
from multiprocessing import Queue

import mitmproxy.tcp

class SocketAddon:
    def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
        self._input_q = input_q
        self._output_q = output_q

    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
        while flow.live and not flow.error:
            await asyncio.sleep(0.5)
            while not self._input_q.empty():
                    # 直接向对端发送socket信息完成插入
                    flow.server_conn.connection.sendall(payload)

    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
        asyncio.get_event_loop().create_task(self.inject(flow))

    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
        message = flow.messages[-1]
        self._output_q.put({
          'from_client': message.from_client, 
          'data': message.content
        })
        if message.from_client:
           # socket发送0字节,conn.sendall(b'')将不会发送任何数据
           message.content = b''

3、开启 mitmproxy 并完成处理函数
import multiprocessing

from mitmdump import Options, DumpMaster

def start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
        addons = [
        # 自主选择是使用Websocket还是Socket
            WebsocketAddon(input_q, output_q)
        # SocketAddon(input_q, output_q)
        ]
        opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5',
                     rawtcp=True,
                   # 需要转换tcp数据成的ip正则
                   tcp_hosts=['.*'],
                   flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, )
        m = DumpMaster(opts)
        m.addons.add(*addons)
        m.run()

def deal_client_message_func(client_message: [bytes, str]):
        if type(client_message) is bytes:
        return client_message.decode('utf-8').encode('gbk')
    elif type(client_message) is str:
        return f"test {client_message}"

def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
    while True:
            if not output_q.empty()
            message = output_q.get()
                print(f"{'客户端' if message['from_client'] else '服务端'} 包内容:{message['data']}")
            if message['from_client']:
                input_q.push(deal_client_message_func(message['data']))

def main():
    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()
    # 使用子进程启动proxy
    multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start()
    simple_handel_message_func(input_queue, output_queue)

四、总结

对于想实现开头文中所提到的功能还需要实现客户端,以及对于 protobuf 协议的编解码,这里限于篇幅不再讨论,后续有机会再更新。

另外,之所以 mitmproxy 选择 socks5 模式,是因为 socks 协议支持代理除了 http、https 以外更多种类的协议,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容

  • 最近和老婆迷上了头脑王者2。赢一局25金币,输一局掉100金币,这设置妥妥的亏本啊,玩下来才发现运营者的考虑是想让...
    木兮家先生阅读 5,663评论 2 5
  • 这个项目可以理解为针对互联网IT人打造的中文版awesome-go。已有的awesome-go项目, 汇总了很多g...
    零一间阅读 2,658评论 0 5
  • 1、顾名思义,mitmproxy 就是用于 MITM 的 proxy,MITM 即[中间人攻击],用于中间人攻击的...
    SkTj阅读 7,008评论 0 4
  • 贤人1 如果从耳朵听这两个字,你一定会认为是闲人。但是今天不讲闲人,而是讲贤人。 ...
    阳小果的春天阅读 155评论 0 0
  • 使用selenium模拟浏览器进行数据抓取无疑是当下最通用的数据采集方案,它通吃各种数据加载方式,能够绕过客户JS...
    warmi_阅读 13,415评论 5 7