业务背景及需求
- 根据业务发展,需要一个进件平台,该平台提供统一的第三方渠道接入接口规范和公司内部业务系统接入规范,第三方渠道和公司内部业务系统根据接入规范接入到进件平台,进件申请由进件平台做相应处理后分发到对应的内部业务系统。
- 前期平台需提供根据不同渠道进行不同进件申请处理流程的功能:平台需分析公司进件相关业务,归纳,整理提炼出业务公共服务,分析业务属性,转换为无业务区分的最小原子属性单元,不同业务只需组合这些原子属性单元即可形成特有的产品。
- 平台实现公共业务服务,业务系统只需实现产品特有功能。
- 由于内部系统的复杂性,平台需提供灵活的接入方式,并保证进件信息的可靠处理。
设计思路
- 基础框架选用: xService微服务快速开发框架
- 基于RabbitMQ 的延迟队列+死信队列实现进件信息的可靠投递与消费,支持根据不同渠道生成不同渠道交换机,路由KEY和队列,以实现不同渠道业务处理的隔离。
- 设计进件平台通知接口规范,支持内部业务系统基于HTTP和RPC两种方式接入。
- 根据进件业务归纳整理进件信息基本要素并分类,将其作为原子处理单元,为后续不同渠道配置不同处理单元做准备(此处可用Map对象作为参数对象,最为灵活,但是可读性较差)
- 平台提供2份接口文档:第三方进件渠道接入接口文档,内部业务系统接入接口文档。
- 基于XXLJob+EventBus实现准实时异步审批流程。
设计实现
一,进件总体流程图
进件总体流程说明
第三方渠道推送进件申请信息
进件系统接收到进件申请信息,进入进件处理器处理。
-
进件处理器根据渠道配置参数配置的处理器,对信息进行相关校验及和转换等处理,未通过则直接返回进件申请失败,否则进入下一步处理流程,进件申请处理设计如下图:
处理器完成处理,已将进件申请转换为业务系统通知接口需要的参数对象,先保存到数据库,再推送到RabbitMQ,这2步都成功则返回进件申请成功,此时进入异步通知业务系统流程。
进件系统Recever 开始消费MQ中的进件申请通知信息,根据渠道配置的接入方式(默认RPC)向业务信息推送进件信息,如果业务系统响应成功,则更新进件申请表为成功,否则判断渠道是否支持重试且小于最大重试次数,如果是进入重试交换机(配置消息延迟消费),并重新进入工作队列,继续推送,否则更新进件申请表为失败。
失败信息进入死信队列,并由失败处理器处理,记录到失败信息表中。
对于失败信息的处理,可用XXLJob定时任务,按照配置将信息推送到进件申请交换机,再次消费,也可人工干预处理。
进件关键代码实现
-
RabbitMQ相关类结构图
-
进件申请服务:是进件申请的入口,集成了进件申请处理器持有者对象,以及工作生产者对象,将进件申请处理与MQ推送进行了整合
/** * 进件申请服务实现类 * <li></li> * * @author: DuanYong */ @Service public class LoanApplyServiceImpl extends BaseService implements LoanApplyService { ... /** * Handler持有者 */ @Autowired private HandlerHolder handlerHolder; /** * 工作生产者对象 */ @Autowired private WorkProdcer workProdcer; .... /** * 进件申请 * <li></li> * * @param loanApplyReq :进件申请申请请求对象 * @author DuanYong * @return: java.util.Optional<com.javacoo.fund.bean.response.LoanApplyRes> */ @Override @Transactional public Optional<LoanApplyRes> loanApply(LoanApplyReq loanApplyReq) { LogUtil.info("进件申请:{}",loanApplyReq); //初始化进件申请上下文 LoanApplyContext loanApplyContext = getLoanApplyContext(loanApplyReq); //处理申请信息 handlerApply(loanApplyContext); //更新申请信息 updateApply(loanApplyContext); //推送到消息队列 push(loanApplyContext); LogUtil.info("进件申请完成"); return Optional.of(LoanApplyRes.builder().applyNo(loanApplyReq.getApplyNo()).build()); } /** * 进件申请信息处理 * <li></li> * @author DuanYong * @param loanApplyContext:进件申请上下文 * @return: void */ private void handlerApply(LoanApplyContext loanApplyContext) { if(!handlerHolder.getChannelHandlersMap().containsKey(loanApplyContext.getLoanApplyReq().getChannelNo())){ LogUtil.warn("未配置渠道:{},进件申请信息处理器",loanApplyContext.getLoanApplyReq().getChannelNo()); return; } //当前渠道处理器集合 List<Handler> handlers = handlerHolder.getChannelHandlersMap().get(loanApplyContext.getLoanApplyReq().getChannelNo //执行处理 handlers.stream().forEach(handler -> handler.accept(loanApplyContext)); } ..... /** * 推送到消息队列 * <li></li> * @author DuanYong * @param loanApplyContext:进件申请上下文 * @return: void */ private void push(LoanApplyContext loanApplyContext) { workProdcer.convertAndSend(loanApplyContext.getLoanApplyNoticeReq()); LogUtil.info("进件申请->推送到消息队列成功"); } }
-
渠道消费者接口:接口继承ChannelAwareMessageListener ,并新增了启动,停止监听功能,方便后续对不同渠道监听器的灵活控制。
/** * 渠道消费者 * <p>说明:</p> * <li></li> * * @author DuanYong */ public interface IChannelConsumer extends ChannelAwareMessageListener { /** * 重启 * <li></li> * @author DuanYong * @return: void */ default void shutdown() {} /** * 启动消费者监听 * <li></li> * @author DuanYong * @return: void */ default void start(){} /** * 停止消费者监听 * <li></li> * @author DuanYong * @return: void */ default void stop(){} }
-
渠道消费者抽象实现类:是IChannelConsumer 抽象实现类,实现了onMessage方法,定义了消息处理的主流程:接收消息->转换消息为MessageWrapper对象->获取message对象->调用抽象方法process执行消息推送->如果成功则更新申请状态为成功->如果失败则调用消息重试方法->如果发送消息过程中出现异常,则调用消息重试方法->最后此条消息要么消费成功,要么从回消息队列。
/** * 渠道消费者抽象实现类 * <p>说明:</p> * <li></li> * * @author DuanYong */ @Slf4j @Data @SuperBuilder @AllArgsConstructor @NoArgsConstructor public abstract class AbstractIChannelConsumer implements IChannelConsumer { /** 渠道号 */ protected String channelNo; /** 渠道参数 */ protected ChannelParams channelParams; /** SimpleMessageListenerContainer */ protected SimpleMessageListenerContainer container; /** 重试生产者对象 */ protected RetryProdcer retryProdcer; /** 进件申请信息MAPPER */ private ImportApplyInfoMapper importApplyInfoMapper; @Override public void onMessage(Message message, Channel channel) throws Exception { //获取消息 String msg = new String(message.getBody(), Charset.defaultCharset()); log.info("渠道:{}消费者,收到消息:{}",channelNo,msg); //执行推送 Long startTime = System.currentTimeMillis(); MessageWrapper messageWrapper = FastJsonUtil.toBean(msg,MessageWrapper.class); try { log.info("执行推送消息->:开始处理..."); String pushMsg = FastJsonUtil.toJSONString(messageWrapper.getMessage()); log.info("执行推送消息到渠道:{}->参数:{},申请数据:{}", channelNo,channelParams,pushMsg); process(messageWrapper); log.info("执行推送消息到渠道:{},是否成功:{}", channelNo, messageWrapper.isSuccess()); if(messageWrapper.isSuccess()){ //更新申请状态 updateApplyInfoStateSuccess(messageWrapper); log.info("执行推送消息到渠道:{},推送成功, 耗时:{}秒", channelNo,(System.currentTimeMillis() - startTime)/1000.0); }else{ retry(messageWrapper); } }catch(Exception e) { e.printStackTrace(); log.error("渠道消费者->消息处理失败进入重试队列, 耗时:{}秒",(System.currentTimeMillis() - startTime)/1000.0,e); retry(messageWrapper); }finally { //如果消费成功或者重试成功,则确认消息已经消费 if(messageWrapper.isSuccess() || messageWrapper.isRetry()){ //确认消息已经消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("渠道消费者->确认消息已经消费:{}, 耗时:{}秒",FastJsonUtil.toJSONString(messageWrapper),(System.currentTimeMillis() - startTime)/1000.0); }else{ //消息消费失败从回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); log.info("渠道消费者->消息消费失败从回队列:{}, 耗时:{}秒",FastJsonUtil.toJSONString(messageWrapper),(System.currentTimeMillis() - startTime)/1000.0); } } } /** * 消息重试 * <li></li> * @author duanyong@javacoo.com * @param messageWrapper: 消息封装对象 * @return: void */ private void retry(MessageWrapper messageWrapper) { try{ log.error("执行推送消息到渠道:{},推送失败,进入重试队列", channelNo); retryProdcer.convertAndSend(messageWrapper); //重试成功 messageWrapper.setRetry(true); }catch (Exception re){ log.error("渠道消费者->消息处理失败进入重试队列失败:{}", channelNo,re); } } /** * 执行操作 * <li></li> * @author DuanYong * @param messageWrapper: mq消息包装对象 */ protected abstract void process(MessageWrapper messageWrapper); ....... }
- http渠道消费者:继承AbstractIChannelConsumer,实现了process抽象方法,并提供了setMessageListener方法,将此类设置到监听器容器中,大致处理流程如下:业务系统实现平台提供的进件申请通知接口->将服务地址配置到渠道配置参数中->当进申请信息到达时调用业务系统接口推送信息。此种接入方式最为灵活,但效率略低。
/** * 渠道消费者 * <p>说明:</p> * <li>http推送</li> * * @author DuanYong */ @Slf4j @Data @SuperBuilder @AllArgsConstructor @NoArgsConstructor public class HttpChannelConsumer extends AbstractIChannelConsumer{ /** 远程服务客户端工厂接口 */ protected RemoteClientFactory remoteClientFactory; /** * 执行操作 * <li>http接入</li> * * @param messageWrapper : mq消息包装对象 * @author DuanYong */ @Override protected void process(MessageWrapper messageWrapper){ log.info("HttpChannelConsumer执行推送消息->:开始处理..."); String pushMsg = FastJsonUtil.toJSONString(messageWrapper.getMessage()); log.info("HttpChannelConsumer执行推送消息到渠道:{}->参数:{},申请数据:{}", channelNo,channelParams,pushMsg); TargetConfig targetConfig = TargetConfig.builder() .url(channelParams.getPushUrl()) .connectionTimeout(channelParams.getConnectionTimeout()) .period(channelParams.getPeriod()) .retryCount(channelParams.getRetryCount()) .socketTimeout(channelParams.getSocketTimeout()) .build(); JSONObject resultJson = remoteClientFactory.getClient(LoanApplyPushClient.class,targetConfig).pushLoanApply(pushMsg); log.info("HttpChannelConsumer执行推送消息到渠道:{},响应:{}", channelNo, resultJson); if(resultJson != null && Constants.LOAN_APPLY_PUSH_RETURN_STATE_CODE.equals(resultJson.getString(Constants.LOAN_APPLY_PUSH_RETURN_STATE))){ messageWrapper.setSuccess(true); } if(resultJson != null){ messageWrapper.setRemark(StringUtils.join(resultJson.getString(Constants.LOAN_APPLY_PUSH_RETURN_CODE),"_",resultJson.getString(Constants.LOAN_APPLY_PUSH_RETURN_DESC))); } } /** * 设置消息监听 * <li></li> * @author DuanYong * @return: com.javacoo.fund.mq.consumer.HttpChannelConsumer */ public HttpChannelConsumer setMessageListener(){ this.container.setMessageListener(this); return this; } }
-
RPC渠道消费者:业务系统依赖平台提供的进件申请通知接口API包,并实现接口,注册为dubbo服务(根据约定,注册服务时,需要指定group=渠道号_NOTICE_SERVICE),平台则根据消息所带渠道信息,动态路由获取对应渠道通知接口,推送消息,此种接入方式受到一定的限制,有失灵活性,但是效率较高。
/** * 渠道消费者 * <p>说明:</p> * <li>RPC推送:基于dubbo接口调用</li> * * @author DuanYong */ @Slf4j @Data @SuperBuilder @AllArgsConstructor @NoArgsConstructor public class RpcChannelConsumer extends AbstractIChannelConsumer{ /** * dubbo服务帮助类 */ @Autowired protected DubboServiceHelper dubboServiceHelper; /** * 执行操作 * <li>RPC推送</li> * * @param messageWrapper : mq消息包装对象 * @author DuanYong * @return: boolean 是否成功 */ @Override protected void process(MessageWrapper messageWrapper){ log.info("RpcChannelConsumer执行推送消息->:开始处理..."); String pushMsg = FastJsonUtil.toJSONString(messageWrapper.getMessage()); log.info("RpcChannelConsumer执行推送消息到渠道:{}->参数:{},申请数据:{}", channelNo,channelParams,pushMsg); if(!dubboServiceHelper.getChannelLoanApplyNoticeServiceMap().containsKey(channelNo)){ log.error("未配置渠道:{},通知服务",channelNo); messageWrapper.setRemark("未配置渠道:"+channelNo+"通知服务"); return; } //获取通知服务 LoanApplyNoticeService loanApplyNoticeService = dubboServiceHelper.getChannelLoanApplyNoticeServiceMap().get(channelNo).get(); if(loanApplyNoticeService == null){ log.error("无法获取渠道:{},配置的通知服务",channelNo); messageWrapper.setRemark("无法获取渠道:"+channelNo+"配置的通知服务"); return; } LoanApplyNoticeReq loanApplyNoticeReq = getLoanApplyNoticeReq(messageWrapper); BaseResp baseResp = loanApplyNoticeService.notice(loanApplyNoticeReq); log.info("RpcChannelConsumer执行推送消息到渠道:{},响应:{}", channelNo, baseResp); if(baseResp != null && baseResp.getState()){ messageWrapper.setSuccess(true); } if(baseResp != null){ messageWrapper.setRemark(StringUtils.join(baseResp.getCode(),"_",baseResp.getDesc())); } } /** * 获取进件申请请求对象 * <li></li> * @author DuanYong * @param messageWrapper: mq消息包装对象 * @return: com.javacoo.common.operate.request.outer.LoanApplyNoticeReq */ private LoanApplyNoticeReq getLoanApplyNoticeReq(MessageWrapper messageWrapper) { return FastJsonUtil.toBean(FastJsonUtil.toJSONString(messageWrapper.getMessage()),LoanApplyNoticeReq.class); } /** * 设置消息监听 * <li></li> * @author DuanYong * @return: com.javacoo.fund.mq.consumer.HttpChannelConsumer */ public RpcChannelConsumer setMessageListener(){ this.container.setMessageListener(this); return this; } }
-
渠道消费工厂:应用启动时,根据接入的渠道信息动态创建渠道消费者对象并启动监听
/** * 渠道消费工厂 * <p>说明:</p> * <li></li> * * @author DuanYong */ @Slf4j @Component public class ChannelConsumerFactory { /** * 重试生产者对象 */ @Autowired private RetryProdcer retryProdcer; /** 远程服务客户端工厂接口 */ @Autowired protected RemoteClientFactory remoteClientFactory; /** * dubbo服务帮助类 */ @Autowired protected DubboServiceHelper dubboServiceHelper; /** 进件申请信息MAPPER */ @Autowired private ImportApplyInfoMapper importApplyInfoMapper; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private RabbitAdmin rabbitAdmin; /** 渠道配置*/ @Autowired private ChannelConfig channelConfig; @PostConstruct private void init(){ channelConfig.getChannels().forEach((key,value)-> doStartChannelConsumer(key,value)); } /** * 执行启动消费者 * <li></li> * @author DuanYong * @param channelNo: 渠道 * @param channelParams: 渠道参数 * @return: void */ private void doStartChannelConsumer(String channelNo , ChannelParams channelParams) { try { log.info("执行启动消费者:{},渠道参数:{}",channelNo,channelParams); DynamicConsumerContainerFactory fac = DynamicConsumerContainerFactory.builder() .channelNo(channelNo) .rabbitAdmin(rabbitAdmin) .connectionFactory(connectionFactory) .build(); //创建消费者 IChannelConsumer consumer = PushType.HTTP.equals(channelParams.getPushType()) ? HttpChannelConsumer.builder() .remoteClientFactory(remoteClientFactory) .channelNo(channelNo) .channelParams(channelParams) .container(fac.getObject()) .retryProdcer(retryProdcer) .importApplyInfoMapper(importApplyInfoMapper) .build() .setMessageListener() : RpcChannelConsumer.builder() .dubboServiceHelper(dubboServiceHelper) .channelNo(channelNo) .channelParams(channelParams) .container(fac.getObject()) .retryProdcer(retryProdcer) .importApplyInfoMapper(importApplyInfoMapper) .build() .setMessageListener(); //启动消费者 consumer.start(); } catch (Exception e) { e.printStackTrace(); log.error("启动消费者异常:{}",channelNo,e); } } }
-
动态消费者容器工厂:实现了FactoryBean接口getObject方法,定义了工作,重试相关交换机,队列,实现了根据接入渠道编号生成对应的交换机和队列,路由key,如:
"渠道号_work_direct_exchange","渠道号_work_queue","渠道号_work_routing_key"
并动态申明到RabbitAdmin 中,最后构建SimpleMessageListenerContainer对象并设置相关参数/** * 动态消费者容器工厂 * <p>说明:</p> * <li></li> * * @author DuanYong */ @Slf4j @Data @Builder public class DynamicConsumerContainerFactory implements FactoryBean<SimpleMessageListenerContainer> { /** 渠道号*/ private String channelNo; /** 连接工厂 */ private ConnectionFactory connectionFactory; /** rabbitAdmin */ private RabbitAdmin rabbitAdmin; /** 消费者数量 */ private Integer concurrentNum; // 消费方 private IChannelConsumer consumer; /** * 构建工作队列 * <li></li> * @author DuanYong * @return: org.springframework.amqp.core.Queue */ private Queue buildWorkQueue() { if (StringUtils.isBlank(channelNo)) { throw new IllegalArgumentException("渠道号不能为空"); } //队列名称 String queue = StringUtils.join(channelNo, Constants.WORK_QUEUE); //交换机 String exchange = StringUtils.join(channelNo, Constants.WORK_ERROR_DIRECT_EXCHANGE); //路由key String routingKey = StringUtils.join(channelNo, Constants.WORK_ROUTING_KEY); log.info("构建工作队列,队列名称:{},交换机:{},路由key:{}",queue,exchange,routingKey); return QueueBuilder.durable(queue) // DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器 .withArgument(Constants.DEAD_LETTER_EXCHANGE_KEY, exchange) // dead letter携带的routing key,配置处理队列的路由key .withArgument(Constants.DEAD_LETTER_ROUTING_KEY, routingKey) .build(); } /** * 构建工作定向队列交换机 * <li></li> * @author DuanYong * @return: org.springframework.amqp.core.Exchange */ private Exchange buildWorkExchange() { //交换机 String exchange = StringUtils.join(channelNo, Constants.WORK_DIRECT_EXCHANGE); log.info("构建工作定向队列交换机,交换机:{}}",exchange); //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.directExchange(exchange).durable(true).build(); } /** * 队列绑定交换机 * <li></li> * @author DuanYong * @param queue: 信息队列 * @param exchange: 定向队列交换机 * @return: org.springframework.amqp.core.Binding */ private Binding bindWork(Queue queue, Exchange exchange) { //路由key String routingKey = StringUtils.join(channelNo, Constants.WORK_ROUTING_KEY); return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); } /** * 重试的队列 * <li>超时,死信队列,实现重试机制</li> * @author DuanYong * @return: org.springframework.amqp.core.Queue */ public Queue buildRetryQueue() { //队列名称 String queue = StringUtils.join(channelNo, Constants.RETRY_QUEUE); //交换机 String exchange = StringUtils.join(channelNo, Constants.WORK_DIRECT_EXCHANGE); //路由key String routingKey = StringUtils.join(channelNo, Constants.WORK_ROUTING_KEY); log.info("构建重试的队列,队列名称:{},交换机:{},路由key:{}",queue,exchange,routingKey); // 设置超时队列 return QueueBuilder.durable(queue) // DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器 .withArgument(Constants.DEAD_LETTER_EXCHANGE_KEY, exchange) // dead letter携带的routing key,配置处理队列的路由key .withArgument(Constants.DEAD_LETTER_ROUTING_KEY, routingKey) // 设置过期时间 //.withArgument(X_MESSAGE_TTL_KEY, QUEUE_EXPIRATION) .build(); } /** * 申请重试的交换器。 * <li></li> * @author DuanYong * @return: org.springframework.amqp.core.Exchange */ public Exchange buildRetryExchange() { //交换机 String exchange = StringUtils.join(channelNo, Constants.RETRY_EXCHANGE); log.info("构建工作定向队列交换机,交换机:{}}",exchange); //durable(true) 持久化,mq重启之后交换机还在 return ExchangeBuilder.directExchange(exchange).durable(true).build(); } /** * 绑定重试队列到重试交换机 * <li></li> * @author DuanYong * @date 2021/6/4 9:27 * @param queue: 重试队列 * @param exchange: 重试交换机 * @return: org.springframework.amqp.core.Binding */ public Binding buildrRetryBinding(Queue queue,Exchange exchange){ //路由key String routingKey = StringUtils.join(channelNo, Constants.RETRY_KEY); return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs(); } /** * 校验 * <li></li> * @author DuanYong * @return: void */ private void check() { if (null == rabbitAdmin || null == connectionFactory) { throw new IllegalArgumentException("rabbitAdmin,connectionFactory 不能为空!"); } } @Override public SimpleMessageListenerContainer getObject() throws Exception { //校验 check(); //构建工作队列 Queue workQueue = buildWorkQueue(); //构建工作定向队列交换机 Exchange workExchange = buildWorkExchange(); //队列绑定交换机 Binding workBinding = bindWork(workQueue, workExchange); //申明队列,交换机,绑定 rabbitAdmin.declareQueue(workQueue); rabbitAdmin.declareExchange(workExchange); rabbitAdmin.declareBinding(workBinding); //构建重试队列 Queue retryQueue = buildRetryQueue(); //构建重试定向队列交换机 Exchange retryExchange = buildRetryExchange(); //队列绑定交换机 Binding retryBinding = buildrRetryBinding(retryQueue, retryExchange); //申明队列,交换机,绑定 rabbitAdmin.declareQueue(retryQueue); rabbitAdmin.declareExchange(retryExchange); rabbitAdmin.declareBinding(retryBinding); //构建SimpleMessageListenerContainer SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setAmqpAdmin(rabbitAdmin); container.setConnectionFactory(connectionFactory); container.setQueues(workQueue,retryQueue); container.setPrefetchCount(20); container.setConcurrentConsumers(concurrentNum == null ? 1 : concurrentNum); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); if (null != consumer) { container.setMessageListener(consumer); } return container; } @Override public Class<?> getObjectType() { return SimpleMessageListenerContainer.class; } }
-
Dubbo服务帮助类:此类启动时根据渠道配置参数初始化 渠道->进件申请通知服务MAP,主要是根据渠道RPC配置参数对dubbo的ReferenceConfig对象进行配置,实现不同渠道绑定不同的ReferenceConfig对象,从而达到根据渠道调用不同实现的目的。
/** * Dubbo服务帮助类 * <li></li> * * @author: DuanYong */ @Data @Component public class DubboServiceHelper { /** * 渠道配置参数 */ @Autowired private ChannelConfig channelConfig; /** * 渠道->进件申请通知服务MAP */ private Map<String, ReferenceConfig<LoanApplyNoticeService>> channelLoanApplyNoticeServiceMap = new HashMap<>(2); @PostConstruct private void init(){ channelConfig.getChannels().forEach((key,value)-> doInitChannelLoanApplyNoticeServiceMap(key,value)); } /** * 初始化渠道->进件申请通知服务MAP * <li></li> * @author DuanYong * @param channelNo: 渠道号 * @param channelParams: 渠道配置 * @return: void */ private void doInitChannelLoanApplyNoticeServiceMap(String channelNo, ChannelParams channelParams) { if(channelParams.getRpc() == null){ return; } RpcConfig rpcConfig = channelParams.getRpc(); ReferenceConfig referenceConfig = new ReferenceConfig<>(); referenceConfig.setRegistry(new RegistryConfig(rpcConfig.getAddress())); referenceConfig.setInterface(LoanApplyNoticeService.class); referenceConfig.setVersion(rpcConfig.getVersion()); referenceConfig.setGroup(rpcConfig.getGroup()); referenceConfig.setRetries(-1); channelLoanApplyNoticeServiceMap.put(channelNo, referenceConfig); } }
-
申请信息生产者对象:根据渠道编号动态组装交换机名称和路由KEY,实现将进件申请通知请求消息发送到渠道对应的交换机。
/** * 申请信息生产者对象 * <li></li> * @author DuanYong */ @Slf4j @Component public class WorkProdcer { @Resource private RabbitTemplate rabbitTemplate; /** * 转换并推送申请信息到MQ服务器 * <li></li> * @author DuanYong * @param loanApplyNoticeReq: 进件申请通知对象 * @return: void */ public void convertAndSend(final LoanApplyNoticeReq loanApplyNoticeReq) { //交换机 String exchange = StringUtils.join(loanApplyNoticeReq.getChannelNo(), Constants.WORK_DIRECT_EXCHANGE); //路由key String routingKey = StringUtils.join(loanApplyNoticeReq.getChannelNo(), Constants.WORK_ROUTING_KEY); rabbitTemplate.convertAndSend(exchange, routingKey, FastJsonUtil.toJSONString(MessageWrapper.builder() .channelNo(loanApplyNoticeReq.getChannelNo()) .message(loanApplyNoticeReq) .retryCount(new AtomicInteger(0)) .createTime(DateUtil.dateToString(new Date())) .messageId(loanApplyNoticeReq.getTransactionSn()) .build())); log.info("[申请信息生产者对象]->消息发送完成:{}",FastJsonUtil.toJSONString(loanApplyNoticeReq)); } }
-
重试生产者对象:首选判断是否支持重试,如果不支持则直接返回,否则根据渠道编号动态组装交换机名称和路由KEY,并设置消息过期时间,实现将进件申请通知请求消息发送到渠道对应的重试交换机,消息过期后,将重新进入绑定的工作队列,重新执行消息对象流程,达到重试的目的。
/** * 重试生产者对象 * <li></li> * * @author: DuanYong */ @Slf4j @Component public class RetryProdcer { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitProperties rabbitProperties; /** * 失败信息生产者对象 */ @Autowired private FailedProdcer failedProdcer; /** * 转换并推送信息到MQ服务器 * <li></li> * @author DuanYong * @param messageWrapper: 在线申请包装对象 * @return: void */ public void convertAndSend(final MessageWrapper messageWrapper) { //不支持重试 if(!rabbitProperties.getListener().getSimple().getRetry().isEnabled()){ failedProdcer.convertAndSend(messageWrapper); log.info("[重试生产者对象]->不支持重试,进入失败队列->{}", FastJsonUtil.toJSONString(messageWrapper)); return; } //重试次数大于最大次数 if(messageWrapper.getRetryCount().get() >= rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts()){ failedProdcer.convertAndSend(messageWrapper); log.info("[重试生产者对象]->重试次数大于最大次数:{},进入失败队列->{}",rabbitProperties.getListener().getSimple().getRetry().getMaxAttempts(),FastJsonUtil.toJSONString(messageWrapper)); return; } //重试次数+1 messageWrapper.getRetryCount().incrementAndGet(); //交换机 String exchange = StringUtils.join(messageWrapper.getChannelNo(), Constants.RETRY_EXCHANGE); //路由key String routingKey = StringUtils.join(messageWrapper.getChannelNo(), Constants.RETRY_KEY); rabbitTemplate.convertAndSend(exchange, routingKey, FastJsonUtil.toJSONString(messageWrapper),message -> { //乘子 int multiplier = messageWrapper.getMultiplier() != 0 ? messageWrapper.getMultiplier() : Double.valueOf(rabbitProperties.getListener().getSimple().getRetry().getMultiplier()).intValue(); //间隔时间 int interval = messageWrapper.getInterval() !=0 ? messageWrapper.getInterval() : Double.valueOf(rabbitProperties.getListener().getSimple().getRetry().getInitialInterval().get( ChronoUnit.SECONDS)).intValue(); // 设置延迟毫秒值: retryCount*multiplier*interval int expir = messageWrapper.getRetryCount().get() * multiplier * interval * 1000; log.info("[重试生产者对象]->消息ID:{},重试次数:{},过期时间:{}秒",messageWrapper.getMessageId(),messageWrapper.getRetryCount().get(),expir / 1000); message.getMessageProperties().setExpiration(String.valueOf(expir)); return message; }); log.info("[重试生产者对象]->消息发送完成:{}", FastJsonUtil.toJSONString(messageWrapper)); } }
-
失败信息生产者对象:消息在经历配置的重试次数后,如果任然失败,则由该类处理,此类直接向失败交换机发送消息,进入消息失败队列,等待异常信息接收器处理。
/** * 失败信息生产者对象 * <li></li> * * @author: DuanYong * @since: 2021/6/4 14:52 */ @Slf4j @Component public class FailedProdcer { @Autowired private RabbitTemplate rabbitTemplate; /** * 转换并推送信息到MQ服务器 * <li></li> * @author DuanYong * @param messageWrapper: 在线申请包装对象 * @return: void */ public void convertAndSend(final MessageWrapper messageWrapper) { rabbitTemplate.convertAndSend(ConsumeFailedConfig.FAILED_EXCHANGE, ConsumeFailedConfig.FAILED_KEY, FastJsonUtil.toJSONString(messageWrapper)); log.info("[失败信息生产者对象]->消息发送完成:{}", FastJsonUtil.toJSONString(messageWrapper)); } }
-
异常信息接收器:首先保存异常进件申请信息,然后更新更新申请状态为失败,最后设置此消息处理成功,如果出现异常,则重回队列。
/** * 异常信息接收器 * <li></li> * @author DuanYong */ @Slf4j @Component public class FailedConsumer { /** 序列号生成器 */ @Resource private Sequence sequence; /** MQ异常信息MAPPER */ @Autowired private ImportMqErrorInfoMapper importMqErrorInfoMapper; /** 进件申请信息MAPPER */ @Autowired private ImportApplyInfoMapper importApplyInfoMapper; @RabbitListener(queues = ConsumeFailedConfig.FAILED_QUEUE) public void process(String info, Channel channel, Message message) throws Exception { try{ Long startTime = System.currentTimeMillis(); log.info("异常信息接收器收到消息->:{},时间:{}", info ,new Date()); MessageWrapper messageWrapper = FastJsonUtil.toBean(info,MessageWrapper.class); //入异常库 save(messageWrapper); //更新申请状态为失败 updateApplyInfoStateFail(messageWrapper); //消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("异常信息接收器->处理成功, 耗时:{}秒",(System.currentTimeMillis() - startTime)/1000.0); }catch (Exception e){ e.printStackTrace(); log.error("异常信息接收器异常",e); //重回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } ....... }
处理器相关类结构图
-
处理器接口:继承Consumer接口,并提供默认排序方法。
/** * 处理器接口 * <li></li> * * @author: DuanYong */ public interface Handler<T> extends Consumer<T> { /** * 排序 * <li>自然排序</li> * @author DuanYong * @return: int */ default int order() {return 0;} }
-
Handler持有者:该对象初始化时,根据渠道配置参数初始 渠道->Handler集合 对照Map,以实现根据不同渠道执行不同的处理流程。
/** * Handler持有者 * <li></li> * * @author: DuanYong */ @Slf4j @Data @Component public class HandlerHolder { /** * 渠道配置参数 */ @Autowired private ChannelConfig channelConfig; /** * Handler 实现类map */ @Autowired private Map<String, Handler> handlerMap; /** * 渠道->CheckHandler集合 对照Map */ private Map<String, List<Handler>> channelHandlersMap = new HashMap<>(5); @PostConstruct private void init(){ channelConfig.getChannels().forEach((key,value)-> doInitChannelHandlersMap(key,value)); } /** * 初始化channelHandlersMap * <li></li> * @author DuanYong * @param channelNo: 渠道号 * @param channelParams: 渠道配置参数 * @return: void */ private void doInitChannelHandlersMap(String channelNo, ChannelParams channelParams) { List<Handler> handlers = new ArrayList<>(10); //文件处理器集合 List<Handler> fileInfoHandlers = getHandlers(channelParams.getFileInfoHandlers()); handlers.addAll(fileInfoHandlers); //认证处理器集合 List<Handler> authInfoHandlers = getHandlers(channelParams.getAuthInfoHandlers()); handlers.addAll(authInfoHandlers); //基本信息处理器集合 List<Handler> baseInfoHandlers = getHandlers(channelParams.getBaseInfoHandlers()); handlers.addAll(baseInfoHandlers); log.info("渠道:{},处理器集合:{}",channelNo,handlers); channelHandlersMap.put(channelNo,handlers); } /** * 获取处理器集合 * <li></li> * @author DuanYong * @param handlerNamesString: 处理器名称字符串集合 * @return: java.util.List<com.javacoo.fund.handler.Handler> */ private List<Handler> getHandlers(String handlerNamesString) { if(StringUtils.isBlank(handlerNamesString)){ return Collections.emptyList(); } String[] checkHandlerNames = handlerNamesString.split(","); List<Handler> handlers = new ArrayList<>(5); for(String checkHandlerName : checkHandlerNames){ if(handlerMap.containsKey(checkHandlerName)){ handlers.add(handlerMap.get(checkHandlerName)); } } //自然排序 List<Handler> sortedHandlers = handlers.stream().sorted(Comparator.comparing(Handler::order)).collect(Collectors.toList()); return sortedHandlers; }
-
处理器抽象实现类:实现了accept方法,并定义了校验,转换2个抽象方法,在accept方法中将依次进行校验和转换。
/** * 处理器抽象实现类 * <li>定义抽象方法:校验,转换</li> * * @author: DuanYong */ @Slf4j public abstract class AbstractHander<T> implements Handler<T> { @Override public void accept(T t) { //检查 check(t); //转换 convert(t); } /** * 校验 * <li></li> * @author DuanYong * @date 2021/6/21 11:49 * @param t: 参数 * @return: void */ protected abstract void check(T t); /** * 转换 * <li></li> * @author DuanYong * @param t: 参数 * @return: void */ protected abstract void convert(T t); }
-
文件信息处理器抽象实现类:处理器之一,专门对文件进行相关处理,实现了文件处理的公共方法,定义了获取文件类型的抽象方法getFileTypeEnum,供具体子类实现。
/** * 文件信息处理器抽象实现类 * <li></li> * * @author: DuanYong */ @Slf4j public abstract class AbstractFileInfoHander extends AbstractHander<LoanApplyContext> { /** * 文件Mapper */ @Autowired private ImportFileInfoMapper importFileInfoMapper; /** * 校验 * <li></li> * * @param loanApplyContext : 参数 * @author DuanYong * @return: void */ @Override public void check(LoanApplyContext loanApplyContext) { //检查是否存在该文件 exist(loanApplyContext); } /** * 转换 * <li></li> * * @param loanApplyContext : 参数 * @author DuanYong * @return: void */ @Override protected void convert(LoanApplyContext loanApplyContext) { FileInfo fileInfo = FileInfo.builder() .fileDesc(loanApplyContext.getImportFileInfo().getFileDesc()) .fileName(loanApplyContext.getImportFileInfo().getFileName()) .fileType(loanApplyContext.getImportFileInfo().getFileType()) .fileUrl(loanApplyContext.getImportFileInfo().getFileUrl()) .level(getFileTypeEnum().getLevel()) .build(); loanApplyContext.getLoanApplyNoticeReq().getFileInfos().add(fileInfo); } /** * 获取文件类型 * <li></li> * @author DuanYong * @return: com.javacoo.fund.enums.FileTypeEnum */ protected abstract FileTypeEnum getFileTypeEnum(); /** * 检查是否存在该文件 * <li></li> * @author DuanYong * @param loanApplyContext: 进件申请上下文 * @return: void */ private void exist(LoanApplyContext loanApplyContext){ //查询条件:申请号+渠道号+文件类型 ImportFileInfoExample example = new ImportFileInfoExample(); example.createCriteria() .andApplyNoEqualTo(loanApplyContext.getLoanApplyReq().getApplyNo()) .andChannelNoEqualTo(loanApplyContext.getLoanApplyReq().getChannelNo()) .andFileTypeEqualTo(getFileTypeEnum().getCode()); example.setOrderByClause("CREATE_TIME DESC"); List<ImportFileInfo> importFileInfos = importFileInfoMapper.selectByExample(example); if(CollectionUtils.isEmpty(importFileInfos)){ throw new BusinessException("申请号:{},下对应的:{}文件不存在",new String[]{loanApplyContext.getLoanApplyReq().getApplyNo(),getFileTypeEnum().getValue()}); } log.info("检查是否存在该文件->条件,ApplyNo:{},ChannelNo:{},FileType:{},总数:{}",loanApplyContext.getLoanApplyReq().getApplyNo(),loanApplyContext.getLoanApplyReq().getChannelNo(),getFileTypeEnum().getCode(),importFileInfos.size()); loanApplyContext.setImportFileInfo(importFileInfos.get(0)); } }
-
身份证正面照处理器:文件处理器之一,专门处理身份证正面照。
/** * 身份证正面照处理器 * <li></li> * * @author: DuanYong * @since: 2021/6/21 16:55 */ @Service(value = Handler.FILE_INFO_SERVICE_IDCARD_KEY) public class IdCardHander extends AbstractFileInfoHander { /** * 获取文件类型 * <li></li> * * @author DuanYong * @return:FileTypeEnum */ @Override protected FileTypeEnum getFileTypeEnum() { return FileTypeEnum.IDCARD_FRONT; } }
-
渠道配置参数:配置信息对象
/** * 渠道参数 * <li></li> * * @author: DuanYong */ @Data @ToString public class ChannelParams { /** * 渠道连接 */ private String url; /** * token 过期时间,单位秒 */ private int tokenExpireSeconds; /** * 推送类型:默认rpc */ private PushType pushType = PushType.RPC; /** * rpc配置 */ private RpcConfig rpc; /** * 贷款申请推送地址 */ private String pushUrl; /** * 重试次数 */ private int retryCount = RemoteSetting.DEFAULT_RETRY_COUNT; /** * 重试周期(单位:毫秒) */ private long period = RemoteSetting.DEFAULT_PERIOD; /** * 数据传输处理时间(单位:毫秒) */ private int socketTimeout = RemoteSetting.DEFAULT_SOCKET_TIMEOUT; /** * 建立连接的timeout时间(单位:毫秒) */ private int connectionTimeout = RemoteSetting.DEFAULT_CONN_TIMEOUT; /** * 基本信息处理器集合:对应处理器spring bean名称,逗号分隔 */ private String baseInfoHandlers; /** * 文件信息处理器集合:对应处理器spring bean名称,逗号分隔 */ private String fileInfoHandlers; /** * 认证信息处理器集合:对应处理器spring bean名称,逗号分隔 */ private String authInfoHandlers; }
-
配置示例
#XXX渠道配置 #推送地址 #app.config.channels.XXX.pushUrl = https://loan-web-dev.corp.javacoo.com #RPC配置-版本号 app.config.channels.XXX.rpc.version = ${api.dubbo.version} #RPC配置-dubbo地址 app.config.channels.XXX.rpc.address = ${dubbo.registry.address} #RPC配置-分组 格式:渠道号_NOTICE_SERVICE app.config.channels.XXX.rpc.group = XXX_NOTICE_SERVICE #基本信息校验处理器集合 app.config.channels.XXX.baseInfoHandlers = customerInfoHander,homeInfoHander,workInfoHander,contactInfoHander,loanInfoHander #文件信息校验处理器集合 app.config.channels.XXX.fileInfoHandlers = idCardHander,idCardBackHander,faceHander,creditAuthHander,dataAuthHander #认证信息校验处理器集合 app.config.channels.XXX.authInfoHandlers = policeAuthHander,faceAuthHander,idCardOcrAuthHander,idCardBackOcrAuthHander
二,异步审批流程设计
流程说明
-
业务系统收到进件系统推送过来的进件信息,首先执行相关进件申请数据保存任务(根据具体业务类型而异),如:建立用户体系,保存认证信息,保存附件信息,保存扩展信息,新建定时任务(后续异步流程)等,异步通知延迟执行,这一系列任务完成后将返回进件系统推送接收成功消息,推送完成,实现类图如下:
此时定时任务表已经建立定时任务,将由XXLJob定时执行,不过由于XXLJob有一定的时间延迟,所以这里利用EventBus通知XXLJob任务实现类立即执行后续异步任务。
XXLJob任务实现类执行任务,可由XXLJob定时任务调度执行,也可以由EventBus通知执行,考虑到XXLJob定时任务调度和EventBus通知可能存在的并发,在执行方法中,采用了分布式锁机+数据库乐观锁机制保证了任务只能被执行一次,实现类图如下:
- TaskJobHandler实现类,实现了任务的处理,分为以下处理器:征信报告查询,公积金信息查询,计算客户分类,额度试算,订单创建。每个处理器执行完成后,将在数据库创建下一个任务,如果都成功,则由EventBus通知XXLJob任务实现类立即执行下一个处理器,如果失败,则更新当前任务为失败,且执行次数加1。如果失败次数达到设定的最大次数,则不再执行此任务,系统将创建失败订单,结束此时进件申请,实现类图如下:
关键代码实现
-
进件申请通知服务接口实现:
/** * 进件申请通知服务 * <li></li> * * @author: DuanYong */ public interface LoanApplyNoticeService { /** * 贷款申请通知 * <li></li> * @author DuanYong * @param loanApplyNoticeReq: 贷款申请请求对象 * @return: com.javacoo.common.operate.response.BaseResponse */ BaseResp notice(LoanApplyNoticeReq loanApplyNoticeReq); }
-
进件申请通知服务接口实现:
... /** * 进件申请服务实现 * <li></li> * * @author: DuanYong */ @Slf4j @Service("loanApplyNoticeService") public class LoanApplyNoticeServiceImpl extends BaseService implements LoanApplyNoticeService { ... /** * 进件申请通知 * <li></li> * * @param loanApplyNoticeReq : 进件申请请求对象 * @author DuanYong * @return: com.javacoo.common.operate.response.BaseResponse */ @Override public BaseResp notice(LoanApplyNoticeReq loanApplyNoticeReq) { log.info("贷款申请通知:{}", FastJsonUtil.toJSONString(loanApplyNoticeReq)); try{ //初始化贷款申请上下文对象 LoanApplyContext loanApplyContext = getLoanApplyContext(loanApplyNoticeReq); //建立用户体系 register(loanApplyContext); //保存认证信息 saveAuthInfo(loanApplyContext); //保存附件信息 saveFileInfo(loanApplyContext); //保存扩展信息(公司信息,家庭信息,联系人信息) saveExtendInfo(loanApplyContext); //插入定时任务 ImportTask importTask = insertTask(loanApplyContext); //异步通知延迟执行 EventBusUtil.getInstance().post(importTask,1000); log.info("贷款申请通知 成功, 耗时:{}秒",(System.currentTimeMillis() - loanApplyContext.getStartTime())/1000.0); return BaseResp.ok(); }catch (Exception e){ log.error("贷款申请失败:",e); return BaseResp.fail("贷款申请失败"); } } ... }
-
定时服务服务基类实现:
/** * job服务基类 * <li></li> * @author DuanYong */ @Slf4j public abstract class BaseJobService<T> { /**业务时间key*/ protected static final String BIZ_DATE = "bizDate"; /**事务管理器*/ @Autowired private PlatformTransactionManager transactionManager; /**业务时间*/ protected String bizDate; /** * 执行 * <li></li> * @author DuanYong * @param param: 参数 * @return: void */ public final void execute(String param) { getJobTask(param).ifPresent(this::accept); } /** * 获取JOB任务 * <li></li> * @author DuanYong * @param param:控制台参数 * @return: java.util.Optional<java.util.List<T>> */ protected abstract Optional<List<T>> getJobTask(String param); /** * 执行单个任务 * <li></li> * @author DuanYong * @param task:任务 * @return: void */ protected void doExecute(T task){}; /** * 批量执行任务 * <li></li> * @author DuanYong * @param taskList: 任务集合 * @return: void */ protected void doExecute(List<T> taskList){}; /** * 是否支持并行执行 * <li>默认false</li> * @author DuanYong * @return: boolean */ protected boolean parallel(){ return false; } /** * 是否支持批量执行 * <li>默认false</li> * @author DuanYong * @return: boolean */ protected boolean batch(){ return false; } /** * 将xxljob控制台传递的参数封装到map中 * <li>控制台参数输入格式:a:xxx;b:xxx</li> * @author DuanYong * @param param: 参数 * @return: java.util.Map<java.lang.String,java.lang.String> */ protected Map<String,String> getParamMap(String param){ return initParam(parse(param,";").stream().map(s->s.split(":")).collect( Collectors.toMap(a->getValue(a,0),a->getValue(a,1),(existing, replacement) -> existing))); } /** * 公共参数初始化 * <li></li> * @author DuanYong * @param paramMap: * @return: java.util.Map<java.lang.String,java.lang.String> */ private Map<String,String> initParam(Map<String,String> paramMap){ //初始化业务时间 //bizDate = StringUtils.isNotBlank(paramMap.get(BIZ_DATE)) ? paramMap.get(BIZ_DATE) : serviceUtils.getBizDate(); return paramMap; } /** * 获取数组中指定索引的值 * <li></li> * @author DuanYong * @param array: 数组 * @param index: 索引 * @return: java.lang.String */ protected String getValue(String[] array,int index){ return (array == null || array.length == 0) ? "" : array.length == 1 ? array[0] : index <= array.length ? array[index] : array[array.length - 1]; } /** * 参数解析 * <li></li> * @author DuanYong * @param param: 参数 * @param regex: 分隔符 * @return: java.util.List<java.lang.String> */ private List<String> parse(String param,String regex){ return StringUtils.isBlank(param) ? Collections.emptyList() : Lists.newArrayList(param.trim().split(regex)); } /** * 接收参数并执行 * <li></li> * @author DuanYong * @param taskList: * @return: void */ private void accept(List<T> taskList) { if(batch()){ doExecute(taskList); }else{ if(parallel()){ taskList.parallelStream().forEach(task -> doExecute(task)); }else{ taskList.stream().forEach(task -> doExecute(task)); } } } /** * 执行事务处理 * <li>开启事务,执行校验</li> * @author DuanYong * @param t: 需要事务处理对象 * @param transactionHandler: 事务处理器 * @return: void */ public <T> void doTransactionHandler(T t, Consumer<T> transactionHandler){ // 开启声明式事务 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = transactionManager.getTransaction(def); try{ transactionHandler.accept(t); //提交事务 transactionManager.commit(status); }catch (Exception e){ e.printStackTrace(); log.error("执行事务处理任务失败:{}", JSON.toJSONString(t),e); //事务回滚 transactionManager.rollback(status); } } /** * 加锁 * <li></li> * @author DuanYong * @param cacheKey: 锁key * @return: boolean */ protected boolean tryLock(String cacheKey){ int timeout = Lock.TIMEOUT_SECOND; if(!LockHolder.getLock().isPresent()){ log.info("不支持加锁"); return true; } boolean isLocked = LockHolder.getLock().get().tryLock(cacheKey, TimeUnit.SECONDS,0,timeout); if(isLocked){ log.info("加锁成功,KEY:{},自动失效时间:{}秒",cacheKey,timeout); } return isLocked; } /** * 解锁 * <li></li> * @author DuanYong * @param key: 缓存key * @return: void */ protected void unlock(String key){ if(!LockHolder.getLock().isPresent()){ return; } LockHolder.getLock().get().unlock(key); } }
-
进件申请JOB帮助类:
/** * 进件申请JOB帮助类 * <li></li> * * @author: DuanYong */ @Slf4j @Component public class LoanJobServiceHelper extends BaseJobService<ImportTask> { /** 任务 */ private static final String CACHE_KEY_PREFIX = "task:applyNo:"; /** 查询到的征信报告有效期(天) */ @Value("${credit.report.term.of.validity}") private Integer termOfValidity; /** 默认渠道集合 */ @Value("${channelNos:1111}") private String defaultChannelNos; /**渠道编号key*/ private static final String CHANNEL_NO_KEY = "channelNos"; /** 默认定时任务最大执行次数 */ @Value("${task.maxExecNum:10}") private Integer defaultMaxExecNum; /**定时任务最大执行次数key*/ private static final String MAX_EXEC_NUM_KEY = "maxExecNum"; /** * 定时任务 */ @Resource private CustomImportTaskMapper customImportTaskMapper; /** * TaskJobHandler 实现类map */ @Autowired private Map<String, TaskJobHandler> taskJobHandlerMap; @PostConstruct private void init(){ EventBusUtil.getInstance().register(this); } /** * 获取JOB任务 * <li></li> * * @param param :控制台参数 * @author DuanYong * @return: java.util.Optional<java.util.List < T>> */ @Override protected Optional<List<ImportTask>> getJobTask(String param) { //参数解析 Map<String, String> paramMap = getParamMap(param); //获取渠道集合 String channelNoString= StringUtils.hasText(paramMap.get(CHANNEL_NO_KEY)) ? paramMap.get(CHANNEL_NO_KEY) : defaultChannelNos; String[] channelNos = channelNoString.split(","); //获取最大执行次数 Integer maxExecNum = StringUtils.hasText(paramMap.get(MAX_EXEC_NUM_KEY)) ? Integer.valueOf(paramMap.get(MAX_EXEC_NUM_KEY)) : defaultMaxExecNum; List<ImportTask> importTasks = customImportTaskMapper.selectWaittingImportTasks(channelNos,maxExecNum); //如果为空直接返回 if(CollectionUtils.isEmpty(importTasks)){ return Optional.empty(); } log.info("[定时任务]执行处理,总数:{}", importTasks.size()); //过滤无处理器的任务 List<ImportTask> filterImportTasks = importTasks.stream() .filter(importTask -> taskJobHandlerMap.containsKey(importTask.getHandlerName())) .collect(Collectors.toList()); log.info("[定时任务]执行处理,过滤无处理器的任务后总数:{}", importTasks.size()); return Optional.of(filterImportTasks); } /** * 是否支持并行执行 * <li>默认false</li> * @author DuanYong * @return: boolean */ @Override protected boolean parallel(){ return true; } /** * 执行处理 * <li>1:锁定当前订单号任务,并更新状态为执行中</li> * <li>2:执行处理</li> * @author DuanYong * @param importTask: 定时任务 * @return: void */ @Override protected void doExecute(ImportTask importTask) { log.info("[定时任务]执行处理,订单号:{}", importTask.getApplyNo()); try { if(lockAndUpdate(importTask)){ //执行处理 doHandle(importTask); } }catch (Exception e){ log.info("[定时任务]->执行处理失败",e); } } /** * 锁定当前订单号任务,并更新状态为执行中 * <li></li> * @author DuanYong * @param importTask: 定时任务 * @return: boolean */ private boolean lockAndUpdate(ImportTask importTask){ String lockKey = new StringBuilder(CACHE_KEY_PREFIX).append(importTask.getApplyNo()).toString(); try{ if(tryLock(lockKey)){ updateImportTaskToRun(importTask); return true; } }finally { unlock(lockKey); } return false; } /** * 更新当前任务状态为执行中 * <li></li> * @author DuanYong * @param importTask:定时任务 * @return: void */ private void updateImportTaskToRun(ImportTask importTask){ int count = customImportTaskMapper.updateStateToRun(importTask.getId()); log.info("[定时任务]->更新当前任务状态为执行中->任务ID:{}->影响记录数:{}",importTask.getId(),count); if(count <= 0){ throw new LoanException(ResponseStateCode.ERROR.getCode(), "更新当前任务状态为执行中失败!"); } } /** * 执行处理 * <li></li> * @author DuanYong * @param importTask:定时任务 * @return: void */ private void doHandle(ImportTask importTask) { Long startTile = System.currentTimeMillis(); log.info("[定时任务],执行定时任务:{}",importTask.getHandlerName()); taskJobHandlerMap.get(importTask.getHandlerName()).accept(importTask); log.info("[定时任务],执行定时任务:{}完成, 耗时:{}秒",importTask.getHandlerName(),(System.currentTimeMillis() - startTile)/1000.0); } /** * 任务事件处理 * <li></li> * @author DuanYong * @param importTask: 定时任务 * @return: void */ @Subscribe private void taskJobEventHandle(ImportTask importTask){ log.info("[任务事件处理],执行任务:{}", FastJsonUtil.toJSONString(importTask)); doExecute(importTask); } }
-
定时任务处理器:
/** * 定时任务处理器 * <li></li> * * @author: DuanYong */ public interface TaskJobHandler<T> extends Consumer<T> { }
-
定时任务处理器抽象实现类:
... /** * 定时任务处理器抽象实现类 * <li></li> * * @author: DuanYong */ @Slf4j public abstract class AbstractTaskJobHandler implements TaskJobHandler<ImportTask>{ /** 默认定时任务最大执行次数 */ @Value("${gjj.online.api.task.maxExecNum:2}") private Integer defaultMaxExecNum; /**事务管理器*/ @Autowired private PlatformTransactionManager transactionManager; /** * 定时任务 */ @Resource protected CustomImportTaskMapper customImportTaskMapper; /** * 渠道参数配置 */ @Autowired private ChannelConfig channelConfig; @Resource(name = "writeBaseDAO") private BaseDAO writeBaseDAO; @Override public final void accept(ImportTask importTask) { //初始定时任务执行上下文 TaskJobContext taskJobContext = initLoanJobContext(importTask); String errorMsg = ""; try{ //执行带事务的处理 errorMsg = doTransactionHandler(taskJobContext, this::handle); }catch (Exception e){ log.info("[执行带事务的处理]->执行处理失败",e); errorMsg = "执行带事务的处理异常:"+e.getMessage(); } //如果没有异常信息 if(StringUtils.isBlank(errorMsg)){ //异步通知立即执行下一个处理器 noticeToRun(taskJobContext); }else{ log.info("[定时任务]->执行处理失败:{}",errorMsg); //更新当前任务状态为失败 updateImportTaskToFail(taskJobContext); //创建失败订单 createFailOrder(importTask,errorMsg); } } /** * 处理任务 * <li></li> * @author DuanYong * @param taskJobContext:定时任务JOB执行上下文 * @return: void */ private void handle(TaskJobContext taskJobContext){ //执行处理 doHandle(taskJobContext); //更新当前任务状态为成功 updateImportTaskToSuccess(taskJobContext); //设置下个处理器 setNextHandler(taskJobContext); } /** * 异步通知立即执行下一个处理器 * <li></li> * @author DuanYong * @param taskJobContext: 定时任务JOB执行上下文 * @return: void */ private void noticeToRun(TaskJobContext taskJobContext) { if(taskJobContext.getNextImportTask() == null){ return; } //异步通知立即执行 EventBusUtil.getInstance().post(taskJobContext.getNextImportTask()); } /** * 创建失败订单 * <li></li> * @author DuanYong * @param importTask: * @param errorMsg: * @return: void */ protected void createFailOrder(ImportTask importTask, String errorMsg) { //如果当前任务执行次数小于最大执行次数 if(importTask.getExecNum().intValue() + 1 < defaultMaxExecNum.intValue() & StringUtils.isNotBlank(getNextHandler())){ return; } ... } /** * 执行处理 * <li></li> * @author DuanYong * @param taskJobContext:定时任务JOB执行上下文 * @return: void */ protected abstract void doHandle(TaskJobContext taskJobContext); /** * 获取下一个处理器 * <li></li> * @author DuanYong * @return: java.lang.String */ protected abstract String getNextHandler(); /** * 设置下一个处理器 * <li></li> * * @param taskJobContext : 定时任务JOB执行上下文 * @author DuanYong * @return: void */ private void setNextHandler(TaskJobContext taskJobContext) { if(StringUtils.isBlank(getNextHandler())){ return; } //组装定时任务 ImportTask record = new ImportTask(); record.setApplyNo(taskJobContext.getImportTask().getApplyNo()); record.setChannelNo(taskJobContext.getImportTask().getChannelNo()); record.setHandlerName(getNextHandler()); record.setState(JobTaskStateEnum.JOB_TASK_STATE_ENUM_0.getCode()); record.setParam(taskJobContext.getImportTask().getParam()); record.setCreated(OnlineApiConstant.DEFAULT_SYSTEM_USER); record.setModified(OnlineApiConstant.DEFAULT_SYSTEM_USER); record.setCreatedDate(new Date()); record.setModifiedDate(record.getCreatedDate()); record.setStatus(Boolean.valueOf(StateEnum.EFFECTIVE.getCode())); record.setExecNum(0); int count = customImportTaskMapper.insertTask(record); log.info("设置下一个处理器:{}->影响记录数:{}", FastJsonUtil.toJSONString(record),count); if(count <= 0){ throw new LoanException(ResponseStateCode.ERROR.getCode(), "设置下一个处理器失败!"); } taskJobContext.setNextImportTask(record); } /** * 初始化贷款申请JOB上下文 * <li></li> * @author DuanYong * @param importTask: 定时任务 * @return: com.javacoo.common.operate.service.outer.context.LoanJobContext */ protected TaskJobContext initLoanJobContext(ImportTask importTask){ return TaskJobContext.builder().importTask(importTask).taskParam(JSONObject.parseObject(importTask.getParam())).build(); } /** * 更新当前任务状态为失败 * <li></li> * @author DuanYong * @param taskJobContext:定时任务JOB执行上下文 * @return: void */ protected void updateImportTaskToFail(TaskJobContext taskJobContext){ int count = customImportTaskMapper.updateStateToFail(taskJobContext.getImportTask().getId()); log.info("执行定时任务->更新当前任务状态为失败->任务ID:{}->影响记录数:{}",taskJobContext.getImportTask().getId(),count); } /** * 更新当前任务状态为成功 * <li></li> * @author DuanYong * @param taskJobContext:定时任务JOB执行上下文 * @return: void */ protected void updateImportTaskToSuccess(TaskJobContext taskJobContext){ int count = customImportTaskMapper.updateStateToSuccess(taskJobContext.getImportTask().getId()); log.info("执行定时任务->更新当前任务状态为成功->任务ID:{}->影响记录数:{}",taskJobContext.getImportTask().getId(),count); if(count <= 0){ throw new LoanException(ResponseStateCode.ERROR.getCode(), "更新当前任务状态为成功失败!"); } } /** * 执行事务处理 * <li>开启事务,执行校验</li> * @author DuanYong * @param t: 需要事务处理对象 * @param transactionHandler: 事务处理器 * @return: String */ private <T> String doTransactionHandler(T t, Consumer<T> transactionHandler){ // 开启声明式事务 DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = transactionManager.getTransaction(def); try{ transactionHandler.accept(t); //提交事务 transactionManager.commit(status); }catch (Exception e){ e.printStackTrace(); log.error("执行事务处理任务失败:{}", JSON.toJSONString(t),e); //事务回滚 transactionManager.rollback(status); return e.getMessage(); } return ""; } @Subscribe private void handleTaskJob(LoanApplyContext loanApplyContext){ TaskJobContext taskJobContext = TaskJobContext.builder().build(); } }
-
征信报告查询定时任务(具体定时任务实现举例):
... /** * 征信报告查询定时任务 * <li></li> * * @author: DuanYong */ @Slf4j @Component(value = OnlineApiConstant.CREDIT_REPORT_PREFIX_KEY) public class CreditReportTaskJobHandler extends AbstractTaskJobHandler { /** * 执行处理 * <li></li> * @author DuanYong * @param taskJobContext:定时任务JOB执行上下文 * @return: void */ @Override protected void doHandle(TaskJobContext taskJobContext) { //执行征信查询 doCreditQuery(taskJobContext); //创建征信报告 creditReportInfo(taskJobContext); //更新贷款申请关联信息 updateLoanApplLinks(taskJobContext); log.info("[执行征信查询JOB]订单号:{}成功",taskJobContext.getImportTask().getApplyNo()); } /** * 获取下一个处理器 * <li></li> * * @author DuanYong * @return: java.lang.String */ @Override protected String getNextHandler() { return OnlineApiJobTaskTypeEnum.ONLINR_API_JOB_TASK_TYPE_ENUM_1.getValue(); } ... }
-
事件处理工具类:
/** * 事件处理器 * <p>说明:</p> * <li>基于google eventbus</li> * @author DuanYong */ @Slf4j public class EventBusUtil { /** * AsyncEventBus */ private static AsyncEventBus eventBus = null; /** * DelayQueue */ private static DelayQueue<EventItem> delayQueue = null; private static class EventBusUtilHolder { /** * 静态初始化器,由JVM来保证线程安全 */ private static EventBusUtil instance = new EventBusUtil(); } private EventBusUtil() { TaskExecutor taskExecutor = new TaskExecutor() { ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),Executors.defaultThreadFactory()); @Override public void execute(Runnable task) { executorService.execute(task); } }; eventBus = new AsyncEventBus(taskExecutor); delayQueue = new DelayQueue<>(); } public static EventBusUtil getInstance() { return EventBusUtilHolder.instance; } public void register(Object object){ eventBus.register(object); } /** * 执行事件 * <li></li> * @author duanyong@javacoo.com * @param object: 事件对象 * @return: void */ public void post(Object object){ eventBus.post(object); } /** * 延迟执行事件 * <li></li> * @author DuanYong * @param object: 事件对象 * @param time: 延迟时间,单位:毫秒 * @return: void */ public void post(Object object, long time) { log.info("延迟执行事件->入延迟队列:{}",object); //入延迟队列 delayQueue.put(new EventItem(object, time)); //开启线程 new Thread(()->execute()).start(); } /** * * <li></li> * @author DuanYong * @return: void */ private void execute(){ try { // 使用DelayQueue的take方法获取当前队列里的元素(take方法是阻塞方法,如果队列里有值则取出,否则一直阻塞) eventBus.post(delayQueue.take().getEventObject()); log.info("延迟执行事件"); }catch (InterruptedException interruptedException){ log.info("延迟执行事件异常:",interruptedException); } } /** * 卸载事件 * @param object */ public void unRegister(Object object){ eventBus.unregister(object); } /** * 事件项 * <li></li> * @author DuanYong */ class EventItem<T> implements Delayed { /** 触发时间:单位 毫秒 */ private long time; /** 事件对象 */ private T eventObject; public EventItem(T eventObject, long time) { super(); // 将传入的时间转换为超时的时刻 this.time = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS) + System.nanoTime(); this.eventObject = eventObject; } public long getTime() { return time; } public T getEventObject() { return eventObject; } @Override public long getDelay(TimeUnit unit) { // 剩余时间= 到期时间-当前系统时间,系统一般是纳秒级的,所以这里做一次转换 return unit.convert(time-System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { // 剩余时间-当前传入的时间= 实际剩余时间(单位纳秒) long d = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); // 根据剩余时间判断等于0 返回1 不等于0 // 有可能大于0 有可能小于0 大于0返回1 小于返回-1 return (d == 0) ? 0 : ((d > 0) ? 1 : -1); } @Override public String toString() { return "EventItem{" + "time=" + time + ", eventObject='" + eventObject + '\'' + '}'; } } }
三,数据库设计(mysql)
-
进件信息表:IMPORT_APPLY_INFO
字段名称 类型 描述 是否必填 ID BIGINT(20) 主键 是 BIZ_SERIAL_ID VARCHAR(64) 业务流水号 是 CHANNEL_NO VARCHAR(10) 渠道号 是 APPLY_NO VARCHAR(64) 申请编号 是 CONTENT MEDIUMTEXT 申请内容 是 STATE VARCHAR(2) 状态:<br />1->初始状态<br />2->推送到MQ成功<br />3->业务处理成功<br />4->业务处理失败 是 REMARKS VARCHAR(200) 描述 否 STATUS TINYINT(1) 数据有效性: <br />0->无效 <br />1->有效 是 CREATED VARCHAR(20) 数据创建者 是 CREATED_DATE DATETIME 数据创建时间 是 MODIFIED VARCHAR(20) 数据修改者 是 MODIFIED_DATE TIMESTAMP 更新时间 是 索引信息
类型 名称 字段 方法 主键索引 PK_ID ID btree 唯一索引 UNQ_BC BIZ_SERIAL_ID,CHANNEL_NO btree -
MQ异常信息表:IMPORT_MQ_ERROR_INFO
字段名称 类型 描述 是否必填 ID BIGINT(20) 主键 是 BIZ_SERIAL_ID VARCHAR(64) 业务流水号 是 CHANNEL_NO VARCHAR(10) 渠道号 是 CONTENT MEDIUMTEXT 内容 是 STATE VARCHAR(2) 状态:<br />1->初始状态<br />2->推送到MQ成功 是 STATUS TINYINT(1) 数据有效性: <br />0->无效 <br />1->有效 是 CREATED VARCHAR(20) 数据创建者 是 CREATED_DATE DATETIME 数据创建时间 是 MODIFIED VARCHAR(20) 数据修改者 是 MODIFIED_DATE TIMESTAMP 更新时间 是 索引信息
类型 名称 字段 方法 主键索引 PK_ID ID btree 普通索引 IDX_ST STATE btree -
任务表:IMPORT_TASK
字段名称 类型 描述 是否必填 ID BIGINT(20) 主键 是 APPLY_NO VARCHAR(64) 申请号 是 CHANNEL_NO VARCHAR(10) 渠道号 是 HANDLER_NAME VARCHAR(100) 任务处理器名称 是 EXEC_NUM INT(11) 任务执行次数 否 PARAM MEDIUMTEXT 任务参数 否 STATE VARCHAR(1) 任务状态:0->初始1->执行2->失败3->成功 是 STATUS TINYINT(1) 数据有效性 0:无效 1:有效 是 CREATED VARCHAR(20) 数据创建者 是 CREATED_DATE DATETIME 数据创建时间 是 MODIFIED VARCHAR(20) 数据修改者 是 MODIFIED_DATE TIMESTAMP 更新时间 是 索引信息
类型 名称 字段 方法 主键索引 PK_ID ID btree 普通索引 IDX_CSS CHANNEL_NO,STATE,STATUS btree
四,业务系统接入
- Rpc方式接入(dubbo实现)
业务系统依赖平台提供的进件申请通知接口API包,并实现接口(进件通知接口文档v1.0.0.docx的接口)。
-
注册为dubbo服务(根据约定,注册服务时,需要指定group=渠道号NOTICE_SERVICE)如:
<dubbo:service interface="com.javacoo.laip.api.LoanApplyNoticeService" ref="loanApplyNoticeService" group="渠道号_NOTICE_SERVICE"/>
平台则根据消息所带渠道信息,动态路由获取对应渠道通知接口,推送消息。
此种接入方式受到一定的限制,有失灵活性,但是效率较高。
-
Http方式接入
实现进件通知接口文档v1.0.0.docx的接口。
-
将服务地址注册到进件平台(约定推送路径:/notice/loanApply)
#XXX渠道配置 #推送地址 #app.config.channels.XXX.pushUrl = https://www.javacoo.com
当进申请信息到达时调用业务系统接口推送信息
此种接入方式最为灵活,但效率略低。
五,渠道接入
- 渠道按照文档:进件平台渠道接入接口文档v1.0.0.docx接入平台,并按照约定流程进行接口调用。
后续规划
- 本文所述思路及代码为进件平台核心功能的一个雏形,仅供学习交流,存在诸多不足之处,欢迎指正,共同探讨。
一些信息
路漫漫其修远兮,吾将上下而求索
码云:https://gitee.com/javacoo
QQ:164863067
作者/微信:javacoo
邮箱:xihuady@126.com