定时任务扫描订单数据
任务描述:
设计为主任务和子任务,主任务只处理指定时间片,子任务会拆分主任务的时间片。其目的为拆分后的子任务可以并行执行,不会干扰,提供吞吐量。
主任务状态:新建、执行中、执行完毕
多次触发时,没有主任务则新建,执行中不处理,当前任务执行完毕,则需新建下一个时间段的主任务。
主任务初始化:
触发 job,主任务领域通过【加载最近执行的任务】或【新建主任务】方式,初始化主任务信息。
//获取最近同步的主任务
OrderSyncMainTask lastSyncMainTask = MainTaskFactory.getOrInitLastMainTask();
public static OrderSyncMainTask getOrInitLastMainTask() {
// 任务查询并启动,暂时未考虑连续触发的情况(并发)
OrderSyncMainTask orderSyncMainTask = omsOrderSyncTaskRepo.getLastMainTask();
if (orderSyncMainTask == null) {
orderSyncMainTask = initMainTask(null);
}
return orderSyncMainTask;
}
问题描述:
这里【主任务领域】初始化的方式,采用工厂+静态方法,对于外部传入的入参,是很容易加工生成目标类的,但如果需要再次读取 db 库的数据,工厂类就需要引入(注入)相关 service、repository。
@Slf4j
@Component
public class MainTaskFactory {
static OmsOrderSyncTaskRepo omsOrderSyncTaskRepo;
//从spring容器找到要用的bean
@Autowired
public void setOmsOrderSyncTaskRepo(OmsOrderSyncTaskRepo omsOrderSyncTaskRepo) {
MainTaskFactory.omsOrderSyncTaskRepo = omsOrderSyncTaskRepo;
}
//...
}
现在出现一个我们不熟悉的依赖方式,静态工厂的静态属性,需要引入 spring 的 bean 实例。
实际上,让 MainTaskFactory 做了初始化,从持久化数据库读数据,就必然依赖 spring 中的 bean:
整理下 MainTaskFactory 的工作职责:
- 职责 1: 为了通过静态方法使用其它 bean,将自己作为 bean 组件,并通过 static set 方式引入要用的 bean。
- 职责 2:获取、初始化 MainTask。
- create MainTask,实际上 new MainTask
- 职责 3:持久化主任务,为此需要职责 1。
总结为:factory 做了查询、构造、保存工作,这里分工混乱。
谁来读取持久化的数据(分工);
如何读取持久化的数据(实现方式);
谁来保存 domain 信息;
repo.save(domain)
domain 转换出 dto1,dto2,dto3
=======以下为同一事务=======
dto1Repo.save(dto1);
dto2Repo.save(dto2);
dto3Repo.save(dto3);
写到这里,发现写和读应该属于同一逻辑:
//repo 加载 domain 的方法:
public domain load(cmd.getId);
按以上想法,重新设计:
repo 层,从持久化服务中,加载需要的 dto,交给 factory 构造 domain。
public SyncOmsOrderTaskDomain loadLast() {
MainTaskDTO mainTaskDTO = null;//omsOrderSyncTaskRepo.getLastMainTask();
List<SubTaskDTO> subTaskDTOList = null;//omsOrderSyncTaskRepo.getSubTaskList(mainTaskDTO.getId());
List<SubTaskRecordDTO> subTaskRecordDTOList = new ArrayList<>();//获取子任务执行记录,存在多个记录,取最后一次,或者设计上,保证每个子任务只有一个有效的记录
return MainTaskFactory.create(mainTaskDTO, subTaskDTOList, subTaskRecordDTOList);
}
factory 构造 domain 对象。
public static SyncOmsOrderTaskDomain create(MainTaskDTO mainTaskDTO, List<SubTaskDTO> subTaskDTOList, List<SubTaskRecordDTO> subTaskRecordDTOList) {
String taskId = UUIDHexGenerator.generate();
//对查询的值做一些基本校验
return new SyncOmsOrderTaskDomain(mainTaskDTO, subTaskDTOList, subTaskRecordDTOList);
}
domainService 中,操作 domain 的各种方法,包括必要时,会通过入参、配置等信息,通过 factory 构造 domain。
@Service
public class MainTaskDomainService {
@Resource
MainTaskRepo mainTaskRepo;
@Resource
MainTaskConfig mainTaskConfig;
@Resource
SubTaskConfig subTaskConfig;
@Resource
ConcurrentReadOrderService concurrentReadOrderService;
//触发任务调度
public String triggerTask() {
SyncOmsOrderTaskDomain syncOmsOrderTaskDomain = mainTaskRepo.loadLast();
if (syncOmsOrderTaskDomain.ifNotExsit()) {
syncOmsOrderTaskDomain.initTaskByConfig(mainTaskConfig);
syncOmsOrderTaskDomain.initSubTaskByConfig(subTaskConfig);
mainTaskRepo.save(syncOmsOrderTaskDomain);
//细节1:实时更新子任务发送进度,如何实现?
// 1。子任务查询并发送成功后,可使用内部通知机制,异步通知、更新。
// 2。子任务查询发送、直接调用repo更新(有点耦合)
concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
return "任务首次运行";
} else {
if (syncOmsOrderTaskDomain.isRunning()) {
return "执行中";
}
if (syncOmsOrderTaskDomain.needReRun()){
syncOmsOrderTaskDomain.updateReRunTask();
mainTaskRepo.updateReRunTask(syncOmsOrderTaskDomain);
concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
return "重新运行";
}
if (syncOmsOrderTaskDomain.isFinished()) {
syncOmsOrderTaskDomain.createNextTask();
mainTaskRepo.save(syncOmsOrderTaskDomain);
concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
return "运行下一周期";
}
}
return "未知状态";
}
}
更新子任务发送状态:
执行子任务时,会组装成 runable,交给线程池执行查询,每次查询到数据,同时发送内部事件,更新子任务的发送数据数以及更新是否发送完毕状态。
发送消息结构:
class SubTaskSendCommand {
String taskId;
String subTaskId;
int sendCount;
boolean sendFinished;
}
因为发送的状态数据仅需保存,所以通过 factory 直接构造 domain,通过 repo save。
SyncOrderTaskDomain domain = MainTaskFactory.generate(subTaskSendCommand);
repo.updateSendInfo(domain);
更新子任务消费状态:
扫描的数据被消费处理成功后,需要更新任务记录状态,处理流程和上步类似:
class SubTaskConsumerCommand {
String taskId;
String subTaskId;
int receivedCount;
}
SyncOrderTaskDomain domain = MainTaskFactory.generate(subTaskConsumerCommand);
repo.updateReceiveInfo(domain);
整理流程都收敛在 domainService 层,factory 主要职责是构造 domain 对象,repo 负责加载、保存、更新 domain 信息。
domain 处理各类 command 指定,包括任务触发、更新任务发送记录,更新任务消费记录。
关于 domain 内部模型,如何管理主任务、子任务、任务记录,后序更新。