微信小程序电子发票中控服务实现方案

微信小程序电子发票中控服务实现方案

最近在开发微信小程序的时候,需要从微信服务定时获取access_token。花了半天时间实现了一下这个功能。

默认大家已经熟练掌握以下技术:

dubbo、SpringBoot、Redis、Redisson、分布式锁

微信官方文档中对access_token的说明:

1、建议开发者使用中控服务器统一获取和刷新access_token,其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则容易造成冲突,导致access_token覆盖而影响业务;

2、中控服务器需要根据这个有效时间提前去刷新access_token。在刷新过程中,中控服务器可对外继续输出的老access_token,此时公众平台后台会保证在5分钟内,新老access_token都可用,这保证了第三方业务的平滑过渡;

3、Access_token的有效时间可能会在未来有调整,所以中控服务器不仅需要内部定时主动刷新,还需要提供被动刷新access_token的接口,这样便于业务服务器在API调用获知access_token已超时的情况下,可以触发access_token的刷新流程。。

微信文档地址:https://mp.weixin.qq.com/wiki?t=resource/res_main&id=mp1421140183

方案要点设计:

  • access_token可以保存在缓存中,其定时过期可以使用redis来实现。
  • 使用Redisson来连接redis,redis配置采用哨兵配置,配置1主2从3哨兵。
  • 生产环境肯定不能用单节点去做中控服务器,需要用多台服务来保证高可用,需要保证不冲突不覆盖access_token。
  • 基于分布式的dubbo框架的provider服务,为服务消费者consumer提供服务。部署多个provider服务实例,每一个provider服务实例都要配置定时任务从微信服务获取access_token,每个provider服务实例都要与redis进行交互。关键是需要使用分布式锁来保证每两个小时(实际用的是1个小时)的定时任务在触发执行时,只能有一台服务成功运行定时任务从微信服务获取access_token并保存到redis中

具体实现方案:

写在前面

省略了在服务器搭建redis哨兵的过程。
该项目使用springboot框架。省略了springboot与duboo的集成过程。
注意Redisson的版本问题,如果使用jdk1.8则可以使用Redisson的3.0以上的版本,比如3.7.1。如果jdk是1.7则可以使用Redisson的2.9.1。
因为微信小程序后台服务是在一台老服务器上部署,jre只有1.7。所以这里选用Redisson的版本为2.9.1。
SpringBoot对缓存的自动配置是其默认的RedisTemplate,没有对Redisson的自动配置,需要自己实现。

手动实现SpringBoot的自动配置来配置Redisson

手动实现SpringBoot对Redisson的自动配置主要是分为两步:

  • 1.将SpringBoot的配置文件对Redisson的配置项映射到配置Redisson的Properties类中,主要是用注解@ConfigurationProperties(prefix = "redisson")
  • 2.将配置Redisson的Properties类的配置绑定到Redisson的自动配置类中,使其生效,主要是用注解@EnableConfigurationProperties(RedissonSentinelProperties.class)

application.properties中配置项

配置项中的ip是公司的内网ip,在用的时候改成自己的ip,密码同理。

redisson.masterName=mymaster
redisson.addSentinelAddress=192.168.10.112:26279,192.168.10.112:26280,192.168.10.114:26281
redisson.password=thirdservice******
redisson.readMode=slave
redisson.subscription-mode=slave
redisson.connectTimeout=3000
redisson.timeout=3000
redisson.idleConnectionTimeout=10000
redisson.retryAttempts=3
redisson.retryInterval=1500
redisson.reconnectionTimeout=3000
redisson.failedAttempts=3
redisson.slaveConnectionPoolSize=256
redisson.slaveConnectionMinimumIdleSize=64
redisson.masterConnectionPoolSize=256
redisson.masterConnectionMinimumIdleSize=64
redisson.subscriptionsPerConnection=5
redisson.pingTimeout=1000

配置Redisson的Properties类RedissonSentinelProperties

RedissonSentinelProperties 中使用@ConfigurationProperties(prefix = "redisson")绑定application.properties中以redisson开头的配置项,并将自身添加到Spring容器中。

import lombok.Data;
import org.redisson.config.ReadMode;
import org.redisson.config.SubscriptionMode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @Copyright (C), 2017-2018, 钢的郭
 * @FileName: RedissonSentinelProperties
 * @Author: gangdeguo
 * @Date: 2018/11/26 10:50
 * @Description: Redisson哨兵配置资源项类
 * @Version:1.0
 */
@Data
@Component
@ConfigurationProperties(prefix = "redisson")
public class RedissonSentinelProperties {

    /**
     * (主服务器的名称)
     * 主服务器的名称是哨兵进程中用来监测主从服务切换情况的。
     */
    private String masterName;

    /**
     * (添加哨兵节点地址)
     * 可以通过host:port的格式来指定哨兵节点的地址。多个节点可以一次性批量添加。
     */
    private String addSentinelAddress;

    /**
     * (密码)
     *      默认值:null
     *      用于节点身份验证的密码。
     */
    private String password;

    /**
     * (DNS监控间隔)
     *      单位:毫秒 默认值:5000
     *      用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用-1来禁用该功能。
     */
    //private int dnsMonitoringInterval;

    /**
     * (读取操作的负载均衡模式)
     *      默认值: SLAVE(只在从服务节点里读取)
     *      注:在从服务节点里读取的数据说明已经至少有两个节点保存了该数据,确保了数据的高可用性。
     *      设置读取操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里读取。 MASTER - 只在主服务节点里读取。 MASTER_SLAVE - 在主从服务节点里都可以读取。
     */
    private ReadMode readMode;


    /**
     * (订阅操作的负载均衡模式)
     *     默认值:SLAVE(只在从服务节点里订阅)
     *
     *     设置订阅操作选择节点的模式。 可用值为: SLAVE - 只在从服务节点里订阅。 MASTER - 只在主服务节点里订阅。
     */
    private SubscriptionMode subscriptionMode;


    /**
     * (连接超时,单位:毫秒)
     *     默认值:10000
     *
     *     同任何节点建立连接时的等待超时。时间单位是毫秒。
     */
    private int connectTimeout;

    /**
     * (命令等待超时,单位:毫秒)
     *     默认值:3000
     *
     *     等待节点回复命令的时间。该时间从命令发送成功时开始计时。
     */
    private int timeout;
    /**
     * (连接空闲超时,单位:毫秒)
     *     默认值:10000
     *
     *     如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。
     */
    private int idleConnectionTimeout;

    /**
     * (命令失败重试次数)
     *     默认值:3
     *
     *     如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
     */
    private int retryAttempts;

    /**
     * (命令重试发送时间间隔,单位:毫秒)
     *     默认值:1500
     *
     *     在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。
     */
    private int retryInterval;

    /**
     * (重新连接时间间隔,单位:毫秒)
     *     默认值:3000
     *
     *     当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。
     */
    private int reconnectionTimeout;

    /**
     * (执行失败最大次数)
     *     默认值:3
     *
     *     在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。
     */
    private int failedAttempts;

    /**
     * (从节点连接池大小)
     *     默认值:64
     *
     *     多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)连接的连接池最大容量。连接池的连接数量自动弹性伸缩。
     */
    private int slaveConnectionPoolSize;

    /**
     * (从节点最小空闲连接数)
     *     默认值:32
     *
     *     多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。
     */
    private int slaveConnectionMinimumIdleSize;

    /**
     * (主节点连接池大小)
     *     默认值:64
     *
     *     主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。
     */
    private int masterConnectionPoolSize;

    /**
     * (主节点最小空闲连接数)
     *     默认值:32
     *
     *     多从节点的环境里,每个 主节点的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。
     */
    private int masterConnectionMinimumIdleSize;

    /**
     * ping超时时间
     */
    private int pingTimeout;
}

Redisson的自动配置类RedissonSentinelConfiguration

RedissonSentinelConfiguration是一个SpringBoot的配置类。该配置类中使用注解@EnableConfigurationProperties(RedissonSentinelProperties.class)来绑定和启用上节的RedissonSentinelProperties类。
配置类RedissonSentinelConfiguration 是没有无参构造器的,在项目启动过程中,Spring会自动将其容器中的RedissonSentinelProperties类注入到RedissonSentinelConfiguration 中。
配置类RedissonSentinelConfiguration 创建了RedissonClient 并将其注册到Spring容器中,我么在使用的时候使用@Autowire注进来就直接使用就可以了。

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.codec.FstCodec;
import org.redisson.config.Config;
import org.redisson.config.SentinelServersConfig;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Copyright (C), 2017-2018, 钢的郭
 * @FileName: RedissonSentinelConfiguration
 * @Author: guoyfiang
 * @Date: 2018/11/26 10:50
 * @Description: Redisson哨兵自动配置类
 * @Version:1.0
 */
@Configuration
@EnableConfigurationProperties(RedissonSentinelProperties.class)
public class RedissonSentinelConfiguration {

    private RedissonSentinelProperties redissonSentinelProperties;

    public RedissonSentinelConfiguration(RedissonSentinelProperties redissonSentinelProperties) {
        this.redissonSentinelProperties = redissonSentinelProperties;
    }


    @Bean
    RedissonClient redissonClient() {
        Config config = new Config();
        //Codec codec = new FstCodec();
        //config.setCodec(codec);
        SentinelServersConfig sentinelConfig = config.useSentinelServers();

        System.out.println(redissonSentinelProperties.getAddSentinelAddress() + "++++++++++++++");

        //设置哨兵主服务器的名称
        sentinelConfig.setMasterName(redissonSentinelProperties.getMasterName());
        //添加哨兵节点地址
        sentinelConfig.addSentinelAddress(redissonSentinelProperties.getAddSentinelAddress().split(","));
        //设置redis哨兵连接密码
        sentinelConfig.setPassword(redissonSentinelProperties.getPassword());
        //读取操作的负载均衡模式
        sentinelConfig.setReadMode(redissonSentinelProperties.getReadMode());
        // 订阅操作的负载均衡模式
        sentinelConfig.setSubscriptionMode(redissonSentinelProperties.getSubscriptionMode());
        // 设置redis连接超时时间
        sentinelConfig.setConnectTimeout(redissonSentinelProperties.getConnectTimeout());
        // 设置命令等待超时
        sentinelConfig.setTimeout(redissonSentinelProperties.getTimeout());
        // 设置连接空闲超时
        sentinelConfig.setIdleConnectionTimeout(redissonSentinelProperties.getIdleConnectionTimeout());
        // 设置命令失败重试次数
        sentinelConfig.setRetryAttempts(redissonSentinelProperties.getRetryAttempts());
        // 设置命令重试发送时间间隔
        sentinelConfig.setRetryInterval(redissonSentinelProperties.getRetryInterval());
        // 设置重新连接时间间隔
        sentinelConfig.setReconnectionTimeout(redissonSentinelProperties.getReconnectionTimeout());
        // 设置执行失败最大次数
        sentinelConfig.setFailedAttempts(redissonSentinelProperties.getFailedAttempts());
        // 设置从节点连接池大小
        sentinelConfig.setSlaveConnectionPoolSize(redissonSentinelProperties.getSlaveConnectionPoolSize());
        // 设置从节点最小空闲连接数
        sentinelConfig.setSlaveConnectionMinimumIdleSize(redissonSentinelProperties.getSlaveConnectionMinimumIdleSize());
        // 设置主节点连接池大小
        sentinelConfig.setMasterConnectionPoolSize(redissonSentinelProperties.getMasterConnectionPoolSize());
        // 主节点最小空闲连接数
        sentinelConfig.setMasterConnectionMinimumIdleSize(redissonSentinelProperties.getMasterConnectionMinimumIdleSize());
        // 单个连接最大订阅数量
        sentinelConfig.setSubscriptionsPerConnection(redissonSentinelProperties.getSubscriptionsPerConnection());
        // ping超时时间
        sentinelConfig.setPingTimeout(redissonSentinelProperties.getPingTimeout());
        return Redisson.create(config);
    }
}

与redis进行交互的service接口

定义接口主要用于定义api,dubbo项目的consumer 和provider同时依赖它。
此接口主要是用于使用Redisson与缓存进行交互。

import cn.leadeon.thirdparty.base.ResultData;

/**
 * @Copyright (C), 2017-2018, 钢的郭
 * @FileName: ThirdCacheService
 * @Author: Leadeon
 * @Date: 2018/11/27 14:39
 * @Description: 第三方缓存交互service
 * @Version:1.0
 */
public interface ThirdCacheService {
    
    /**
     * 保存数据至缓存
     * @param trace 流水号
     * @param key 键
     * @param value 值
     * @param expireTime 过期时间
     * @return ResultData
     */
     ResultData<?> set(String trace, String key, String value, int expireTime);
    /**
     * 
     * @param trace 流水号
     * @param key 键
     * @param value 值
     * @return ResultData
     */
     ResultData<?> set(String trace, String key, String value);
    
    
    /**
     * 获取缓存的的value
     * @param trace 流水号
     * @param key 键
     * @return ResultData
     */
     ResultData<?> get(String trace, String key);
    
    /**
     * INCR key 
     * @param trace 流水号
     * @param key 键
     * @param value 值
     * @return ResultData
     */
     ResultData<?> incrBy(String trace, String key, Double value);
    
    /**
     * INCR key 
     * @param trace 流水号
     * @param key 键
     * @param value 值
     * @return ResultData
     */
     ResultData<?> incrBy(String trace, String key, Double value, long expireTime);
    
    /**
     * DECR key 
     * @param trace 流水号
     * @param key 键
     * @param value 值
     * @return ResultData
     */
     ResultData<?> decrBy(String trace, String key, Double value);

    
    /**
     * 向sortset中插入元素
     * @param trace 流水号
     * @param key 键
     * @param member 成员
     * @param score 得分
     * @return ResultData
     */
     ResultData<?> zadd(String trace, String key, String member, Long score);


    /**
     * 获取介于最大值和最小值之间的固定条数据
     * @param trace 流水号
     * @param key 键
     * @param startScore  开始得分
     * @param endScore  结束得分
     * @param offset
     * @param count
     * @return ResultData
     */
     ResultData<?> zRangeByScore(String trace, String key, long startScore, long endScore, int offset, int count);

    /**
     * 获取介于最大值和最小值之间的总条数
     * @param trace 流水号
     * @param key 键
     * @param startScore  开始
     * @param endScore  结束
     * @return ResultData
     */
     ResultData<?>  zCount(String trace, String key, long startScore, long endScore);
    
    /**
     * 入消息队列
     * @param trace 流水号
     * @param key 键
     * @param value 值
     * @return ResultData
     */
     ResultData<?> lPush(String trace, String key, String value);
    
    /**
     * 出消息队列
     * @param trace 流水号
     * @param key 键
     * @return ResultData
     */
     ResultData<?> rPop(String trace, String key);
    
    /**
     * 获取分布式锁
     * @param trace 流水号
     * @param key 键
     * @param expireTime 过期时间
     * @return ResultData
     */
     ResultData<?> getLock(String trace, String key, long expireTime);
    
    /**
     * 释放分布式锁
     * @param trace 流水号
     * @param key 键
     * @return ResultData
     */
     ResultData<?> unLock(String trace, String key);
    
}

与redis进行交互的service实现类

实现了上一节的接口,与缓存进行交互

import cn.leadeon.thirdparty.base.ResultData;
import cn.leadeon.thirdparty.constant.ThridPartyResCode;
import cn.leadeon.thirdparty.log.Log;
import org.redisson.api.RAtomicDouble;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
 * @Copyright (C), 2017-2018, 钢的郭
 * @FileName: RedissonSentinelConfiguration
 * @Author: guoyfiang
 * @Date: 2018/11/26 10:50
 * @Description: Redisson哨兵自动配置类
 * @Version:1.0
 */
@Component
public class ThirdCacheServiceImpl implements ThirdCacheService {


    public final static Log log = new Log(ThirdCacheServiceImpl.class);

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public ResultData<?> set(String trace, String key, String val, int expireTime) {
        ResultData<Boolean> rd = new ResultData<>();
        try {
            redissonClient.getBucket(key).set(val, expireTime, TimeUnit.SECONDS);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(true);
        } catch (Exception e) {
            rd.setResultData(false);
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setDesc("get data fail..");
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "set [key:" + key + "val:" + val + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> set(String trace, String key, String val) {
        ResultData<Boolean> rd = new ResultData<>();
        try {

            redissonClient.getBucket(key).set(val);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(true);
        } catch (Exception e) {
            rd.setResultData(false);
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setDesc("get data fail..");
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "set [key:" + key + "val:" + val + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> get(String trace, String key) {
        ResultData<Object> rd = new ResultData<>();
        try {
            RBucket<Object> bucket = redissonClient.getBucket(key);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(bucket.get());
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setDesc("get data fail..");
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "get [key:" + key + "] fail", e);
        }
        return rd;
    }


    @Override
    public ResultData<?> incrBy(String trace, String key, Double value) {
        ResultData<Double> rd = new ResultData<Double>();
        try {
            double bucket = redissonClient.getAtomicDouble(key).addAndGet(value);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(bucket);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "incrBy [key:" + key + "score:" + value + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> incrBy(String trace, String key, Double value, long expireTime) {
        ResultData<Double> rd = new ResultData<Double>();
        try {
            double bucket = redissonClient.getAtomicDouble(key).addAndGet(value);
            redissonClient.getAtomicDouble(key).expire(expireTime, TimeUnit.MILLISECONDS);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(bucket);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "incrBy [key:" + key + "score:" + value + "] fail", e);
        }
        return rd;
    }


    @Override
    public ResultData<?> decrBy(String trace, String key, Double value) {
        ResultData<Double> rd = new ResultData<Double>();
        try {
            RAtomicDouble atomicDouble = redissonClient.getAtomicDouble(key);
            if (value <= atomicDouble.get()) {
                //可扣减
                double bucket = redissonClient.getAtomicDouble(key).addAndGet(-value);
                rd.setResultData(bucket);
                rd.setResultCode(ThridPartyResCode._0000);
            } else {
                //余额不够扣减
                rd.setResultCode(ThridPartyResCode._0005);
                rd.setResultData(atomicDouble.get());
            }
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "decrBy [key:" + key + "score:" + value + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> zadd(String trace, String key, String member, Long score) {

        ResultData<Boolean> rd = new ResultData<Boolean>();
        try {
            redissonClient.getScoredSortedSet(key).add(score, member);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(true);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setResultData(false);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "zadd [key:" + key + "member:" + member + "score:" + score + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> zRangeByScore(String trace, String key, long startScore, long endScore, int offset, int count) {
        ResultData<ArrayList<String>> rd = new ResultData<ArrayList<String>>();
        ArrayList<String> resList = new ArrayList<String>();
        try {
            Collection<Object> resData = redissonClient.getScoredSortedSet(key).valueRangeReversed(startScore, true, endScore, true, offset, count);
            Iterator<Object> iterator = resData.iterator();
            while (iterator.hasNext()) {
                resList.add(String.valueOf(iterator.next()));
            }
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(resList);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "zRangeByScore [key:" + key + "startScore:" + startScore + "endScore:" + endScore + "offset:" + offset + "count:" + count + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> zCount(String trace, String key, long startScore, long endScore) {
        ResultData<Long> rd = new ResultData<Long>();
        try {
            Long count = redissonClient.getScoredSortedSet(key).count(startScore, true, endScore, true);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(count);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "zCount [key:" + key + "startScore:" + startScore + "endScore:" + endScore + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> lPush(String trace, String key, String value) {
        ResultData<Boolean> rd = new ResultData<Boolean>();
        try {
            redissonClient.getDeque(key).add(value);
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(true);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setResultData(false);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "zadd [key:" + key + "value:" + value + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> rPop(String trace, String key) {
        ResultData<Object> rd = new ResultData<Object>();
        try {
            Object value = redissonClient.getDeque(key).pollLast();
            rd.setResultCode(ThridPartyResCode._0000);
            rd.setResultData(value);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setResultData(null);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "zadd [key:" + key + "] fail", e);
        }
        return rd;
    }

    @Override
    public ResultData<?> getLock(String trace, String key, long expireTime) {
        ResultData<Boolean> rd = new ResultData<Boolean>();
        Boolean result = false;
        try {
            RLock lock = redissonClient.getReadWriteLock(key).writeLock();
            result = lock.tryLock(1000, expireTime, TimeUnit.MILLISECONDS);
            rd.setResultCode(ThridPartyResCode._0000);
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "getLock [key:" + key + "] fail", e);
        }
        rd.setResultData(result);
        return rd;
    }

    @Override
    public ResultData<?> unLock(String trace, String key) {
        ResultData<Boolean> rd = new ResultData<Boolean>();
        Boolean result = false;
        try {
            redissonClient.getReadWriteLock(key).writeLock().unlock();
            rd.setResultCode(ThridPartyResCode._0000);
            result = true;
        } catch (Exception e) {
            rd.setResultCode(ThridPartyResCode._0001);
            rd.setException(e);
            // 打印错误日志
            log.error("trace:" + trace + "unLock [key:" + key + "] fail", e);
        }
        rd.setResultData(result);
        return rd;
    }
}

获取微信AccessToken的service的接口GetAccessTokenService

该接口定义了主动与被动获取access_token方法的api,duboo项目的consumer与provider都依赖它。

package cn.leadeon.thirdparty.service;

import cn.leadeon.thirdparty.base.ResultData;

/**
 * @Copyright (C), 2015-2018, 钢的郭
 * @FileName: GetAccessTokenService
 * @Author: gangdeguo
 * @Date: 2018-11-27 11:45
 * @Description: 获取微信电子发票access_token
 * @Version: 1.0
 */
public interface GetAccessTokenService {
    // 从缓存获取access_token,如果缓存中不存在则从微信服务获取access_token
    ResultData<?> getAccessToken(String trace , String busCode);
    // 从微信服务获取access_token,存到缓存中
    void setAccessToken(String trace , String busCode);
}

获取微信电子发票 access_token service实现类GetAccessTokenServiceImpl

该接口实现类,实现了上节接口定义的方法。具备主动与被动获取access_token的能力。
这个类是最核心的代码。使用分布式的锁来保证所有provider实例的定时任务触发后只能有一个provider实例的定时方法能从微信服务获取access_token并保存到缓存中。并且一旦有一个provider实例成功从微信服务获取access_token并保存到缓存中,那么接下来的半小时所有的provider实例都不用去请求微信服务来再次获取access_token。避免频繁请求微信服务。

该接口实现类中使用SpringBoot的定时任务需要在项目的主启动类上标注解@EnableScheduling开启定任务

package cn.leadeon.thirdparty.service;

import cn.leadeon.thirdparty.base.ResultData;
import cn.leadeon.thirdparty.common.http.LocalHttpClient;
import cn.leadeon.thirdparty.constant.ThridPartyResCode;
import cn.leadeon.thirdparty.log.Log;
import cn.leadeon.thirdparty.pojo.AccessToken;
import com.alibaba.fastjson.JSON;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

/**
 * @Copyright (C), 2015-2018, 钢的郭
 * @FileName: GetAccessTokenServiceImpl
 * @Author: gangdeguo
 * @Date: 2018-11-27 11:47
 * @Description: 获取微信电子发票 access_token  service实现类
 * @Version: 1.0
 */
@Service
public class GetAccessTokenServiceImpl implements GetAccessTokenService {

    /**
     * 分布式锁的时间长度
     */
    private static final int LOCK_TIME = 10;
    /**
     * access_token 的过期时间
     */
    private static final int LOCK_ACCESS_TOKEN_TIME = 60;
    /**
     * access_token_flag 的过期时间
     */
    private static final int LOCK_ACCESS_TOKEN_FLAG_TIME = 30;
    /**
     * 分布式锁的时间单位
     */
    private static final TimeUnit LOCK_TIME_SECONDS = TimeUnit.SECONDS;
    private static final TimeUnit LOCK_TIME_MINUTES = TimeUnit.MINUTES;
    /**
     * 日志
     */
    private static final Log log = new Log(GetAccessTokenServiceImpl.class);

    /**
     * 分布式锁的key
     */
    private static final String LOCK_KEY = "LOCK_KEY";
    /**
     * 缓存中 access_token  的 key
     */
    private static final String ACCESS_TOKEN = "access_token";

    /**
     * 缓存中 access_token_flag 的 key
     */
    private static final String ACCESS_TOKEN_FLAG = "access_token_flag";

    @Autowired
    ThirdCacheServiceImpl thirdCacheServiceImpl;

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 从缓存获取access_token,如果缓存中不存在则从微信服务获取access_token
     *
     * @param trace   流水号
     * @param busCode 业务码
     * @return ResultData
     */
    @Override
    public ResultData<?> getAccessToken(String trace, String busCode) {
        Long startTime = System.currentTimeMillis();
        log.reqPrint(Log.THIRDPARTY_SIGN, Log.THIRDPARTY_REQUEST, trace, busCode, "");
        ResultData<?> resultData;
        try {
            // 从缓存获取 ACCESS_TOKEN_FLAG
            resultData = thirdCacheServiceImpl.get(trace, ACCESS_TOKEN);
            // 缓存响应失败的情况
            if (null == resultData || null == resultData.getResultCode()
                    || !ThridPartyResCode._0000.equals(resultData.getResultCode())) {
                log.respPrint(Log.THIRDPARTY_SIGN, Log.THIRDPARTY_RESPONSE, trace,
                        busCode, System.currentTimeMillis() - startTime, JSON.toJSONString(resultData));
                return resultData;
            }
            // 缓存响应正常,但access_token过期失效的情况
            if (ThridPartyResCode._0000.equals(resultData.getResultCode()) && null == resultData.getResultData()) {
                //主动调用,从微信服务获取access_token,存到缓存中
                setAccessToken(trace, busCode);
                resultData = thirdCacheServiceImpl.get(trace, ACCESS_TOKEN);
            }
        } catch (Exception e) {
            resultData = new ResultData<>();
            resultData.setResultCode(ThridPartyResCode._0001);
            resultData.setDesc("Get accessToken fail...");
            resultData.setException(e);
            log.error("trace:" + trace + ",busCode:" + busCode + "get [key:" + ACCESS_TOKEN_FLAG + "] fail", e);
        }
        log.respPrint(Log.THIRDPARTY_SIGN, Log.THIRDPARTY_RESPONSE, trace,
                busCode, System.currentTimeMillis() - startTime, JSON.toJSONString(resultData));
        return resultData;
    }

    /**
     * 主动调用,从微信服务获取access_token,存到缓存中
     *
     * @param trace   流水号
     * @param busCode 接口号
     */
    @Override
    public void setAccessToken(String trace, String busCode) {
        Long startTime = System.currentTimeMillis();
        log.reqPrint(Log.THIRDPARTY_SIGN, Log.THIRDPARTY_REQUEST, trace, busCode, "");
        //从微信服务获取access_token
        setAccessToken();
        log.respPrint(Log.THIRDPARTY_SIGN, Log.THIRDPARTY_RESPONSE, trace,
                busCode, System.currentTimeMillis() - startTime, "setAccessToken");
    }

    /**
     * 被动调用,定时从微信服务获取access_token
     */
    @Scheduled(cron = "0 0 0/1 * * * ")
    public void setAccessToken() {
        Long startTime = System.currentTimeMillis();
        RLock lock = null;
        boolean res = false;
        try {
            log.info("update access_token begin...");
            // 获取分布式锁
            lock = redissonClient.getFairLock(LOCK_KEY);
            // 尝试加锁,最多等待1秒,加锁成功后10秒钟自动释放锁
            res = lock.tryLock(5,LOCK_TIME, LOCK_TIME_SECONDS);
            if (false==res){
                log.info("get lock failure...");
                return;
            }
            // 如果ACCESS_TOKEN_FLAG不为空,说明30分钟内已经有线程更新过ACCESS_TOKEN,此次不用更新ACCESS_TOKEN
            //if (null != redissonClient.getBucket(ACCESS_TOKEN_FLAG).get()
            //        && null != redissonClient.getBucket(ACCESS_TOKEN).get()) {
            if(null != thirdCacheServiceImpl.get("", ACCESS_TOKEN_FLAG).getResultData()
                    && null !=thirdCacheServiceImpl.get("", ACCESS_TOKEN)){
                log.info("access_token already exits. spend time" + (System.currentTimeMillis() - startTime));
                return;
            }
            //从微信服务获取accessToken
            String accessToken = GetAccessTokenFromWechatServer();
            // 未成功获取到accessToken
            if (null == accessToken) {
                return;
            }
            // 成功获取到accessToken
            // 向缓存中保存access_token,60分钟有效
            redissonClient.getBucket(ACCESS_TOKEN).set(accessToken, LOCK_ACCESS_TOKEN_TIME, LOCK_TIME_MINUTES);
            log.info("update access_token suecss. access_token:"+ accessToken);
            // 设置ACCESS_TOKEN_FLAG为一个不为null的值,30分钟内有效
            redissonClient.getBucket(ACCESS_TOKEN_FLAG).set("something not null", LOCK_ACCESS_TOKEN_FLAG_TIME, LOCK_TIME_MINUTES);
            log.info("update access_token finish,spend time" + (System.currentTimeMillis() - startTime));
        } catch (Exception e) {
            log.info("set access_token error. " + e.toString());
        } finally {
            if (lock != null) {
                //释放锁
                lock.unlock();
            }
        }
    }

    /**
     * 从微信服务获取 access_token
     * @return String access_token
     */
    private static String GetAccessTokenFromWechatServer() {
        HttpUriRequest httpUriRequest = RequestBuilder.get()
                .setUri("https://api.weixin.qq.com" + "/cgi-bin/token")
                .addParameter("grant_type", "client_credential")
                .addParameter("appid", "*******************")      //用自己的appid
                .addParameter("secret", "**********************") //用自己的secret
                .build();
        AccessToken accessToken = LocalHttpClient.executeJsonResult(httpUriRequest, AccessToken.class);
        return accessToken.getAccess_token();
    }
}

写代码遇到的坑

1 注意 分布式锁的key 不要与 缓存中 access_token的key 共用。否则会报错 WRONGTYPE Operation against a key holding the wrong kind of value,查了一下是redis类型冲突,比如redis中保存的为String,如果用hash去获取就会报这个错。但是在反复检查代码后确认没有用错类型。后来才意识到Redisson分布式锁Rlock的底层原理其实是用的redis的SetNx,会在redis创建一个key,Redisson中的Rlock默认是用的hash。而如果我们与access_token共用key的话其实就是String与hash的类型冲突了。关于这一点在Redisson的github中有国外的网友指出了这点,详见以下这篇帖子:
https://github.com/redisson/redisson/issues/480

该帖其中提到了:
Name conflict with different object could cause this bug too. RLock uses Redis map structure in internals, so if you're trying to use lock with name matches with different object you'll get the same error. Here is the code snippet causes this problem:

        String key = String.valueOf("425011000000151");
        redisson.getBucket(key).set("123");

        RLock lc = redisson.getLock(key); 
        lc.lock(1000,TimeUnit.MILLISECONDS);

以下是贴心的英翻中:
不同object的name冲突也可以导致这个bug。Redisson的分布式锁Rlock底层使用的Redis的map类型来保存它的key,所以如果你使用的锁的名字与你的object冲突的话,就会导致相同的错误。上面就是导致这个问题的代码片段。在代码中String 的key与Rlock的key相同,就会导致WRONGTYPE Operation against a key holding the wrong kind of value。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 微信服务号开发 整体流程 域名报备,服务器搭建 Python开发环境和项目的初始化搭建; 微信公众号注册及开发模式...
    飞行员suke阅读 4,470评论 0 14
  • 紧合双目却比睁眼现实,炯炯圆睁又如蒙闭愚昧,或许,就是文人和其他人的区别。 人们活在当下,就好比一场恢宏的即兴表演...
    吾欲阅读 250评论 0 1
  • 常用操作ctrl +z退出当前代码命令#表示单行注释R语言中没有多行注释,多行注释可通过if(FALSE){}包含...
    周一ing阅读 1,361评论 0 1