序
本文主要研究一下powerjob的maxResultLength
maxResultLength
powerjob-worker/src/main/java/tech/powerjob/worker/common/PowerJobWorkerConfig.java
@Getter
@Setter
public class PowerJobWorkerConfig {
//......
/**
* Max length of response result. Result that is longer than the value will be truncated.
* {@link ProcessResult} max length for #msg
*/
private int maxResultLength = 8096;
//......
}
maxResultLength默认为8096,它定义了返回的result的最大长度
innerRun
powerjob-worker/src/main/java/tech/powerjob/worker/core/processor/runnable/HeavyProcessorRunnable.java
public void innerRun() throws InterruptedException {
final BasicProcessor processor = processorBean.getProcessor();
String taskId = task.getTaskId();
Long instanceId = task.getInstanceId();
log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, task.getTaskName());
ThreadLocalStore.setTask(task);
ThreadLocalStore.setRuntimeMeta(workerRuntime);
// 0. 构造任务上下文
WorkflowContext workflowContext = constructWorkflowContext();
TaskContext taskContext = constructTaskContext();
taskContext.setWorkflowContext(workflowContext);
// 1. 上报执行信息
reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);
ProcessResult processResult;
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
// 2. 根任务 & 广播执行 特殊处理
if (TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
// 广播执行:先选本机执行 preProcess,完成后 TaskTracker 再为所有 Worker 生成子 Task
handleBroadcastRootTask(instanceId, taskContext);
return;
}
// 3. 最终任务特殊处理(一定和 TaskTracker 处于相同的机器)
if (TaskConstant.LAST_TASK_NAME.equals(task.getTaskName())) {
handleLastTask(taskId, instanceId, taskContext, executeType);
return;
}
// 4. 正式提交运行
try {
processResult = processor.process(taskContext);
if (processResult == null) {
processResult = new ProcessResult(false, "ProcessResult can't be null");
}
} catch (Throwable e) {
log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), e);
processResult = new ProcessResult(false, e.toString());
}
reportStatus(processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED, suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
}
innerRun方法最后一步执行reportStatus,它会对processResult.getMsg()进行suit
suit
private String suit(String result) {
if (StringUtils.isEmpty(result)) {
return "";
}
final int maxLength = workerRuntime.getWorkerConfig().getMaxResultLength();
if (result.length() <= maxLength) {
return result;
}
log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.",
task.getInstanceId(), task.getTaskId(), result.length(), maxLength);
return result.substring(0, maxLength).concat("...");
}
suit方法就是根据workerRuntime.getWorkerConfig().getMaxResultLength()来判断,超过maxLength的则取maxLength长度,后面加
...
onReceiveProcessorReportTaskStatusReq
powerjob-worker/src/main/java/tech/powerjob/worker/actors/TaskTrackerActor.java
@Handler(path = WTT_HANDLER_REPORT_TASK_STATUS)
public AskResponse onReceiveProcessorReportTaskStatusReq(ProcessorReportTaskStatusReq req) {
int taskStatus = req.getStatus();
// 只有重量级任务才会有两级任务状态上报的机制
HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
// 手动停止 TaskTracker 的情况下会出现这种情况
if (taskTracker == null) {
log.warn("[TaskTrackerActor] receive ProcessorReportTaskStatusReq({}) but system can't find TaskTracker.", req);
return null;
}
if (ProcessorReportTaskStatusReq.BROADCAST.equals(req.getCmd())) {
taskTracker.broadcast(taskStatus == TaskStatus.WORKER_PROCESS_SUCCESS.getValue(), req.getSubInstanceId(), req.getTaskId(), req.getResult());
}
taskTracker.updateTaskStatus(req.getSubInstanceId(), req.getTaskId(), taskStatus, req.getReportTime(), req.getResult());
// 更新工作流上下文
taskTracker.updateAppendedWfContext(req.getAppendedWfContext());
// 结束状态需要回复接受成功
if (TaskStatus.FINISHED_STATUS.contains(taskStatus)) {
return AskResponse.succeed(null);
}
return null;
}
onReceiveProcessorReportTaskStatusReq方法接收到ProcessorReportTaskStatusReq则执行taskTracker.updateTaskStatus
updateTaskStatus
powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {
if (finished.get()) {
return;
}
TaskStatus nTaskStatus = TaskStatus.of(newStatus);
int lockId = taskId.hashCode();
try {
// 阻塞获取锁
segmentLock.lockInterruptible(lockId);
TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);
// 缓存中不存在,从数据库查
if (taskBriefInfo == null) {
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
if (taskOpt.isPresent()) {
TaskDO taskDO = taskOpt.get();
taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
} else {
// 理论上不存在这种情况,除非数据库异常
log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
}
// 写入缓存
taskId2BriefInfo.put(taskId, taskBriefInfo);
}
// 过滤过期的请求(潜在的集群时间一致性需求,重试跨 Worker 时,时间不一致可能导致问题)
if (taskBriefInfo.getLastReportTime() > reportTime) {
log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
return;
}
// 检查状态转移是否合法,fix issue 404
if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
return;
}
// 此时本次请求已经有效,先更新相关信息
taskBriefInfo.setLastReportTime(reportTime);
taskBriefInfo.setStatus(nTaskStatus);
// 处理失败的情况
int configTaskRetryNum = instanceInfo.getTaskRetryNum();
if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {
// 失败不是主要的情况,多查一次数据库也问题不大(况且前面有缓存顶着,大部分情况之前不会去查DB)
Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
// 查询DB再失败的话,就不重试了...
if (taskOpt.isPresent()) {
int failedCnt = taskOpt.get().getFailedCnt();
if (failedCnt < configTaskRetryNum) {
TaskDO updateEntity = new TaskDO();
updateEntity.setFailedCnt(failedCnt + 1);
/*
地址规则:
1. 当前存储的地址为任务派发的目的地(ProcessorTracker地址)
2. 根任务、最终任务必须由TaskTracker所在机器执行(如果是根任务和最终任务,不应当修改地址)
3. 广播任务每台机器都需要执行,因此不应该重新分配worker(广播任务不应当修改地址)
*/
String taskName = taskOpt.get().getTaskName();
ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
}
updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
updateEntity.setLastReportTime(reportTime);
boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
if (retryTask) {
log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
return;
}
}
}
}
// 更新状态(失败重试写入DB失败的,也就不重试了...谁让你那么倒霉呢...)
result = result == null ? "" : result;
boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);
if (!updateResult) {
log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
}
} catch (InterruptedException ignore) {
// ignore
} catch (Exception e) {
log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
} finally {
segmentLock.unlock(lockId);
}
}
updateTaskStatus则执行taskPersistenceService.updateTaskStatus
TaskPersistenceService.updateTaskStatus
powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) {
try {
return execute(() -> taskDAO.updateTaskStatus(instanceId, taskId, status, lastReportTime, result));
}catch (Exception e) {
log.error("[TaskPersistenceService] updateTaskStatus failed.", e);
}
return false;
}
执行的是taskDAO.updateTaskStatus
TaskDAOImpl.updateTaskStatus
powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskDAOImpl.java
public boolean updateTaskStatus(Long instanceId, String taskId, int status, long lastReportTime, String result) throws SQLException {
String sql = "update task_info set status = ?, last_report_time = ?, result = ?, last_modified_time = ? where instance_id = ? and task_id = ?";
try (Connection conn = connectionFactory.getConnection(); PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setInt(1, status);
ps.setLong(2, lastReportTime);
ps.setString(3, result);
ps.setLong(4, lastReportTime);
ps.setLong(5, instanceId);
ps.setString(6, taskId);
ps.executeUpdate();
return true;
}
}
updateTaskStatus最后更新到task_info表
task_info
public void initTable() throws Exception {
String delTableSQL = "drop table if exists task_info";
// 感谢 Gitee 用户 @Linfly 反馈的 BUG
// bigint(20) 与 Java Long 取值范围完全一致
String createTableSQL = "create table task_info (task_id varchar(255), instance_id bigint, sub_instance_id bigint, task_name varchar(255), task_content blob, address varchar(255), status int, result text, failed_cnt int, created_time bigint, last_modified_time bigint, last_report_time bigint, constraint pkey unique (instance_id, task_id))";
try (Connection conn = connectionFactory.getConnection(); Statement stat = conn.createStatement()) {
stat.execute(delTableSQL);
stat.execute(createTableSQL);
}
}
task_info定义了result字段,其类型为text
小结
- PowerJobWorkerConfig的maxResultLength默认为8096,它定义了返回的result的最大长度
- HeavyProcessorRunnable的innerRun方法最后一步执行reportStatus,它会对processResult.getMsg()进行suit,suit方法就是根据workerRuntime.getWorkerConfig().getMaxResultLength()来判断,超过maxLength的则取maxLength长度,后面加
...
- onReceiveProcessorReportTaskStatusReq方法接收到ProcessorReportTaskStatusReq则执行taskTracker.updateTaskStatus,最后更新到task_info表,该表定义了result字段,其类型为text