对账流程
对账流程节点:数据抓取、数据清洗、对账、结果汇总4个操作节点;
对账结果:上游单边(订单上游存在,下游不存在)、下游单边、金额不一致(上下游订单都存在,但是金额不同);
对账操作节点功能简介
- 数据抓取:拉取待对账业务数据,生成原始凭证数据;
- 数据清洗:原始凭证数据清洗,生成对账凭证数据;
- 对账:对账凭证对账,记录对账结果;
- 汇总:对账结果汇总;
系统设计需要处理的问题
- 数据量较大时,要分组处理;
- 数据分组时,避免相同单号的分到不同的组,导致数据单边;
- 应用多实例部署,避免多实例并发冲突,避免数据重复处理或数据遗漏;
- 对账流程,可以个性化配置,不同业务需要的对账流程可以不同;
- 对账链路中,任意节点的数据,都可以沿着对账链路往下执行;
系统实现方案
- 数据分表存储,避免后期数据量大了之后再分表时的数据迁移问题,并对数据进行了分组;
- 使用task处理方式,结合分布式定时任务组件xxljob,避免多实例冲突及数据重复处理问题(按表维度创建task任务);
- task处理流程进行模板化,模板方法抽象task处理的公共流程;
使用到的设计模式
- 责任链模式;
- 模板模式;
责任链模式
- 对于对账流程,目前的对账分为抓取、清洗、对账、汇总4个功能节点;
- 责任链模式灵活组装、增减功能节点的流转顺序和个数;
- 任意节点开始,进入链路,都可以继续链路的流转(常规代码实现的责任链不支持该需求);
模板模式
- 依赖task触发数据处理;
- task控制任务的并发处理;
代码结构
模板模式代码继承关系
- BaseTask<T>:基础模板抽象类,描述了Task任务处理的流程(execute方法),及流程处理过程中的抽象方法;
- task解锁方法:unlockTask();
- task获取方法:getTasks();
- task分片方法:currentPartition();
- task执行方法:process();
- task执行前处理方法:beforeProcess();
- task执行后处理方法:afterProcess();
- BaseExeTask:继承了基础抽象类BaseTask,并具体实现了task流程处理过程中的抽象方法(除process()之外的方法);
- BaseReconcileTask:对账操作基础抽象类,其中放置对账的基础操作(目前的对账只一种,所以该类为空);
- ReconcileDefaultTask:具体对账实现类,实现process(),对账逻辑在这里实现;
图中是对账模块的继承关系
责任链各节点核心处理流程介绍
- 数据抓取
- 提供目标业务数据分组功能,确定数据抓取的范围,并创建对应数量的task来分组抓取业务数据;
- 提供目标业务数据按组抓取功能,每个task可以按照上面的分组信息抓取指定范围的业务数据;
- 针对多个对账目标字段的情况,组装目标字段为json,便于数据清洗时拆分;
- 数据清洗
- 按照物理表维度创建清洗task,每个task只处理一个物理分表中的数据;
- 完成字段调整映射,统一同一对账字段名称,统一方式、渠道、状态等属性值;
- 对于多个对账目标字段的情况,拆分一条原始凭证数据为多条对账凭证;
- 对账
- 按照物理表维度创建对账task,每个task只处理一个物理分表中的数据;
- 借助Redis中Set结构的命令:Sdiffstore、Sinterstore函数实现对账操作;
- 对账汇总
- 按照物理表维度创建汇总task,每个task只处理一个物理分表中的数据;
- 每个task统计完该物理表中的汇总数据后,获取redis锁,获取成功后将单个表的汇总结果插入或更新到对账汇总表中;
责任链task节点创建及链路流转过程介绍
- 链路节点及节点的顺序信息存储在数据库中,可以针对每个对账类型创建一个链路,实现对账流程的高度配置话和自由度;
- 每次对账执行,创建批次信息,批次信息中记录当前对账流转到的节点及该节点下创建了多少个task;
- 任务开始执行,创建批次信息,task读取到批次信息后,读取对应的链路配置信息,开始创建第一个节点的task数据,并在批次中记录节点类型及对应task数量;
- 各个节点对应的task定时扫描表中对应的task,执行各自的任务(按照目标方法中规定的流程执行);
- 批次对应的task任务出发批次执行,校验该批次当前节点下所有的任务是否都已经执行完成,完成后创建下个节点的任务,未完成则根据规则重试;
对账流程图