微信企业号_智能机器人_python3

前提,不使用网络上提供的第三方机器人接口(类似微软小冰、图灵机器人),而是部署自己训练的智能机器人。

TODO

部署机器人

选择明文模式

在后面会作说明为何不选安全模式。
具体配置以及要注意的问题请参见参考

选择安全模式

主要步骤:

  1. 在服务器上部署项目,并运行;
  2. 设置微信企业号回调模式,并配置;
  3. 接收用户消息并解密;
  4. 处理消息并回复。

自己使用安全模式失败了,一直提示“token验证失败”。失败原因:选择了安全模式,但微信却没有对echostr进行加密,调试代码发现echostr是明文传输的,getSHA1生成安全签名肯定是错的了。所以只能使用明文模式。(现在发现三个模式的echostr都是明文传输,使用request.args.get方法,有大神能告知为什么得到的request中的echostr没有加密吗?)

在此附上排查“token验证失败”要关注的点,以及相关的解决方法链接。

token验证失败问题排查

  1. 编码问题,针对php
  2. 时间戳问题
  3. 微信样例尝试
  4. 通过了SHA算法,却错误(尝试直接返回echostr)
  5. 获取url内容方式
  6. 服务器地址的问题
  7. 内网映射外网的原因

Token验证失败的解决方法
微信公众号开发者中心配置 Token验证失败 终极解决方案
【微信公众号】修改配置的时候提示token验证失败

以下是Python3环境下的加密解密脚本,感谢python3 WXBizMsgCrypt.py 支持中文
WXBizMsgCrypt_py3.py

    #!/usr/bin/env python
# -*- encoding:utf-8 -*-

"""
    python3对公众平台发送给公众账号的消息加解密代码.支持中文.
"""
# ------------------------------------------------------------------------

import base64
import string
import random
import hashlib
import time
import struct

import binascii
from Crypto.Cipher import AES
import xml.etree.cElementTree as ET
import socket

import ierror

""" AES加解密用 pycrypto """


class FormatException(Exception):
    pass


def throw_exception(message, exception_class=FormatException):
    """my define raise exception function"""
    raise exception_class(message)


class SHA1:
    """计算公众平台的消息签名接口"""

    def getSHA1(self, token, timestamp, nonce, encrypt):
        """用SHA1算法生成安全签名
        @param token:  票据
        @param timestamp: 时间戳
        @param encrypt: 密文
        @param nonce: 随机字符串
        @return: 安全签名
        """
        try:
            token = token.decode()
            sortlist = [token, timestamp, nonce, encrypt]
            sortlist.sort()
            sha = hashlib.sha1()
            sha.update("".join(sortlist).encode("utf8"))
            return ierror.WXBizMsgCrypt_OK, sha.hexdigest()
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_ComputeSignature_Error, None


class XMLParse(object):
    """提供提取消息格式中的密文及生成回复消息格式的接口"""

    # xml消息模板
    AES_TEXT_RESPONSE_TEMPLATE = """<xml>
        <Encrypt><![CDATA[%(msg_encrypt)s]]></Encrypt>
        <MsgSignature><![CDATA[%(msg_signaturet)s]]></MsgSignature>
        <TimeStamp>%(timestamp)s</TimeStamp>
        <Nonce><![CDATA[%(nonce)s]]></Nonce>
        </xml>"""

    def extract(self, xmltext):
        """提取出xml数据包中的加密消息
        @param xmltext: 待提取的xml字符串
        @return: 提取出的加密消息字符串
        """
        try:
            xml_tree = ET.fromstring(xmltext)
            encrypt = xml_tree.find("Encrypt")
            touser_name = xml_tree.find("ToUserName")
            return ierror.WXBizMsgCrypt_OK, encrypt.text, touser_name.text
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_ParseXml_Error, None, None

    def generate(self, encrypt, signature, timestamp, nonce):
        """生成xml消息
        @param encrypt: 加密后的消息密文
        @param signature: 安全签名
        @param timestamp: 时间戳
        @param nonce: 随机字符串
        @return: 生成的xml字符串
        """
        resp_dict = {
            'msg_encrypt': encrypt,
            'msg_signaturet': signature,
            'timestamp': timestamp,
            'nonce': nonce,
        }
        resp_xml = self.AES_TEXT_RESPONSE_TEMPLATE % resp_dict
        return resp_xml


class PKCS7Encoder(object):
    """提供基于PKCS7算法的加解密接口"""

    block_size = 32

    def encode(self, text):
        """ 对需要加密的明文进行填充补位
        @param text: 需要进行填充补位操作的明文
        @return: 补齐明文字符串
        """
        text_length = len(text)
        # 计算需要填充的位数
        amount_to_pad = self.block_size - (text_length % self.block_size)
        if amount_to_pad == 0:
            amount_to_pad = self.block_size
        # 获得补位所用的字符
        pad = chr(amount_to_pad).encode()
        return text + pad * amount_to_pad

    def decode(self, decrypted):
        """删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文
        """
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]


class Prpcrypt(object):
    """提供接收和推送给公众平台消息的加解密接口"""

    def __init__(self, key):
        # self.key = base64.b64decode(key+"=")
        self.key = key
        # 设置加解密模式为AES的CBC模式
        self.mode = AES.MODE_CBC

    def encrypt(self, text, appid):
        """对明文进行加密
        @param text: 需要加密的明文
        @return: 加密得到的字符串
        """
        # 16位随机字符串添加到明文开头
        len_str = struct.pack("I", socket.htonl(len(text.encode())))
        # text = self.get_random_str() + binascii.b2a_hex(len_str).decode() + text + appid
        text = self.get_random_str() + len_str + text.encode() + appid
        # 使用自定义的填充方式对明文进行补位填充
        pkcs7 = PKCS7Encoder()
        text = pkcs7.encode(text)
        # 加密
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        try:
            ciphertext = cryptor.encrypt(text)
            # 使用BASE64对加密后的字符串进行编码
            return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext).decode('utf8')
        except Exception as e:
            return ierror.WXBizMsgCrypt_EncryptAES_Error, None

    def decrypt(self, text, appid):
        """对解密后的明文进行补位删除
        @param text: 密文
        @return: 删除填充补位后的明文
        """
        try:
            cryptor = AES.new(self.key, self.mode, self.key[:16])
            # 使用BASE64对密文进行解码,然后AES-CBC解密
            plain_text = cryptor.decrypt(base64.b64decode(text))
        except Exception as e:
            print(e)
            return ierror.WXBizMsgCrypt_DecryptAES_Error, None
        try:
            # pad = ord(plain_text[-1])
            pad = plain_text[-1]
            # 去掉补位字符串
            # pkcs7 = PKCS7Encoder()
            # plain_text = pkcs7.encode(plain_text)
            # 去除16位随机字符串
            content = plain_text[16:-pad]
            xml_len = socket.ntohl(struct.unpack("I", content[: 4])[0])
            xml_content = content[4: xml_len + 4]
            from_appid = content[xml_len + 4:]
        except Exception as e:
            return ierror.WXBizMsgCrypt_IllegalBuffer, None
        if from_appid != appid:
            return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
        return 0, xml_content.decode()

    def get_random_str(self):
        """ 随机生成16位字符串
        @return: 16位字符串
        """
        rule = string.ascii_letters + string.digits
        str = random.sample(rule, 16)
        return "".join(str).encode()


class WXBizMsgCrypt(object):
    # 构造函数
    # @param sToken: 公众平台上,开发者设置的Token
    # @param sEncodingAESKey: 公众平台上,开发者设置的EncodingAESKey
    # @param sAppId: 企业号的AppId
    def __init__(self, sToken, sEncodingAESKey, sAppId):
        try:
            self.key = base64.b64decode(sEncodingAESKey + "=")
            assert len(self.key) == 32
        except Exception:
            throw_exception("[error]: EncodingAESKey unvalid !", FormatException)
            # return ierror.WXBizMsgCrypt_IllegalAesKey)
        self.token = sToken.encode()
        self.appid = sAppId.encode()

    def EncryptMsg(self, sReplyMsg, sNonce, timestamp=None):
        # 将公众号回复用户的消息加密打包
        # @param sReplyMsg: 企业号待回复用户的消息,xml格式的字符串
        # @param sTimeStamp: 时间戳,可以自己生成,也可以用URL参数的timestamp,如为None则自动用当前时间
        # @param sNonce: 随机串,可以自己生成,也可以用URL参数的nonce
        # sEncryptMsg: 加密后的可以直接回复用户的密文,包括msg_signature, timestamp, nonce, encrypt的xml格式的字符串,
        # return:成功0,sEncryptMsg,失败返回对应的错误码None
        pc = Prpcrypt(self.key)
        ret, encrypt = pc.encrypt(sReplyMsg, self.appid)
        if ret != 0:
            return ret, None
        if timestamp is None:
            timestamp = str(int(time.time()))
        # 生成安全签名
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.token, timestamp, sNonce, encrypt)

        if ret != 0:
            return ret, None
        xmlParse = XMLParse()
        return ret, xmlParse.generate(encrypt, signature, timestamp, sNonce)
    def VerifyURL(self, sMsgSignature, sTimeStamp, sNonce, sEchoStr):
        sha1 = SHA1()
        ret,signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, sEchoStr)
        if ret  != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret,sReplyEchoStr = pc.decrypt(sEchoStr,self.appid)
        return ret,sReplyEchoStr

    def DecryptMsg(self, sPostData, sMsgSignature, sTimeStamp, sNonce):
        # 检验消息的真实性,并且获取解密后的明文
        # @param sMsgSignature: 签名串,对应URL参数的msg_signature
        # @param sTimeStamp: 时间戳,对应URL参数的timestamp
        # @param sNonce: 随机串,对应URL参数的nonce
        # @param sPostData: 密文,对应POST请求的数据
        #  xml_content: 解密后的原文,当return返回0时有效
        # @return: 成功0,失败返回对应的错误码
        # 验证安全签名
        xmlParse = XMLParse()
        ret, encrypt, touser_name = xmlParse.extract(sPostData)
        if ret != 0:
            return ret, None
        sha1 = SHA1()
        ret, signature = sha1.getSHA1(self.token, sTimeStamp, sNonce, encrypt)
        if ret != 0:
            return ret, None
        if not signature == sMsgSignature:
            return ierror.WXBizMsgCrypt_ValidateSignature_Error, None
        pc = Prpcrypt(self.key)
        ret, xml_content = pc.decrypt(encrypt, self.appid)
        return ret, xml_content

以下是微信明文模式的脚本文件
wechat_plaintext .py

#-*- coding:utf-8 -*-
#
from flask import Flask,request
# from WXBizMsgCrypt import WXBizMsgCrypt
from WXBizMsgCrypt_py3 import WXBizMsgCrypt
import xml.etree.cElementTree as ET
import sys
app = Flask(__name__)
app.route('/',methods=['GET','POST'])
def index():
    #以下为开发者在服务器配置中填写的相应信息
    sToken = 'weixin'#根据自己设置的信息填写
    sEncodingAESKey = 'E7zXIxxxxxxxxxxxxxxx'#根据自己设置的信息填写
    sCorpID = 'wxxxxxxxxx'#根据自己设置的信息填写
    wxcpt=WXBizMsgCrypt(sToken,sEncodingAESKey,sCorpID)
    #获取url验证时微信发送的相关参数
    #get方法
    sVerifyMsgSig=request.args.get("signature")
    sVerifyTimeStamp=request.args.get('timestamp')
    sVerifyNonce=request.args.get('nonce')
    sVerifyEchoStr=request.args.get('echostr')
    print(request.args)
    print("sVerifyMsgSig",sVerifyMsgSig)
    print("sVerifyTimeStamp",sVerifyTimeStamp)
    print("sVerifyNonce",sVerifyNonce )
    print("sVerifyEchoStr",sVerifyEchoStr )
    #验证url
    if request.method == 'GET':
        return sVerifyEchoStr
    #接收客户端消息
    if request.method == 'POST':
        sReqData = request.data
        print(request.args)
        print("request.data:",request.data)
        #解析接收的内容,XML格式
        xml_tree = ET.fromstring(sReqData)
        content = xml_tree.find("Content").text
        ToUserName=xml_tree.find("ToUserName").text
        FromUserName=xml_tree.find("FromUserName").text
        CreateTime=xml_tree.find("CreateTime").text
        MsgType=xml_tree.find("MsgType").text
        MsgId=xml_tree.find("MsgId").text
    #被动响应消息,将微信端发送的消息返回给微信端
    response='''<xml><ToUserName><![CDATA['''+FromUserName+''']]></ToUserName>\n
            <FromUserName><![CDATA['''+ToUserName+''']]></FromUserName>\n
            <CreateTime>'''+CreateTime+'''</CreateTime>\n
            <MsgType><![CDATA['''+MsgType+''']]></MsgType>\n
            <Content><![CDATA['''+content+''']]></Content>\n
            <MsgId>'''+MsgId+'''</MsgId>\n</xml>'''
    return response
if __name__ == '__main__':
    app.run(host='128.24.6.122', port=80, debug=False)#根据自己设置的信息填写

加解密是独立的,但EncryptMsg和DecryptMsg具备功能性定义,严格操作了"将公众号回复用户的消息加密打包""检验消息的真实性,并且获取解密后的明文"两个功能。

参考&感谢<span id="jump"></span>

微信企业号开发2--消息回复
微信企业号接收消息并自动响应
官方文档 回调模式
微信开发 企业号(二)-- 回调模式之Tooken验证 .net/python
Python微信企业号开发之回调模式接收微信端客户端发送消息及被动返回消息示例
加解密方案的详细说明
python3 WXBizMsgCrypt.py 支持中文

转载请注明如下内容:

知乎: @Forfreedom
CSDN:Freedom_anytime的博客 - CSDN博客
简书:For_freedom - 简书

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 微信服务号开发 整体流程 域名报备,服务器搭建 Python开发环境和项目的初始化搭建; 微信公众号注册及开发模式...
    飞行员suke阅读 4,470评论 0 14
  • 1、开启公众号开发者模式 公众平台的技术文档目的为了简明扼要的交代接口的使用,语句难免苦涩难懂,甚至对于不同的读者...
    good7758阅读 1,504评论 0 1
  • 经历了漫长的飞行之旅,我看着窗外迷人的晚霞,从辽阔的祖国大地去到了这片充满音乐细胞的维也纳土地上,我仿佛听...
    钟政文阅读 476评论 0 0