- There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn.
- Step 1: BrokerPendingTask is created in the unprotectedExecuteJob method.
- Step 2: The LoadLoadingTasks are created by the onTaskFinished method once BrokerPendingTask has finished.
- Step 3: CommitAndPublishTxn is invoked by the onTaskFinished method once all LoadLoadingTasks have finished.
BrokerLoadJob.onTaskFinished
if (attachment instanceof BrokerPendingTaskAttachment) {
onPendingTaskFinished((BrokerPendingTaskAttachment) attachment);
} else if (attachment instanceof BrokerLoadingTaskAttachment) {
onLoadingTaskFinished((BrokerLoadingTaskAttachment) attachment);
}
BrokerLoadJob.cancelJobWithoutCheck
Job 状态转换为 CANCELLED
#LoadJob.unprotectedExecuteCancel
// change state
state = JobState.CANCELLED;
BrokerLoadJob.onLoadingTaskFinished
Catalog.getCurrentGlobalTransactionMgr().commitTransaction(
dbId, transactionId, commitInfos,
new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
finishTimestamp, state, failMsg));
DatabaseTransactionMgr.commitTransaction
transactionState.afterStateTransform
/**
 * Notifies the callback registered for this transaction that the transaction
 * has moved to a new status.
 *
 * <p>The callback is looked up by {@code callbackId} in the global transaction
 * manager's callback factory. If none is registered, nothing happens. Only
 * ABORTED, COMMITTED and VISIBLE are forwarded; any other status is ignored.
 *
 * @param transactionStatus     the status the transaction has just reached
 * @param txnOperated           passed through to the callback unchanged
 * @param txnStatusChangeReason passed to the callback on ABORTED only
 * @throws UserException declared by this method; presumably raised by the
 *                       callback hooks (confirm against the callback interface)
 */
public void afterStateTransform(TransactionStatus transactionStatus, boolean txnOperated,
        String txnStatusChangeReason) throws UserException {
    final TxnStateChangeCallback listener = Catalog.getCurrentGlobalTransactionMgr()
            .getCallbackFactory()
            .getCallback(callbackId);

    // No callback registered for this transaction: nothing to notify.
    if (listener == null) {
        return;
    }

    switch (transactionStatus) {
        case ABORTED:
            // Per the original notes: on ABORTED the load job's
            // unprotectedExecuteCancel is invoked, which cancels the job and
            // sets its JobState to CANCELLED.
            listener.afterAborted(this, txnOperated, txnStatusChangeReason);
            break;
        case COMMITTED:
            // Per the original notes: on COMMITTED the JobState becomes COMMITTED.
            listener.afterCommitted(this, txnOperated);
            break;
        case VISIBLE:
            // Per the original notes: on VISIBLE the JobState becomes FINISHED.
            listener.afterVisible(this, txnOperated);
            break;
        default:
            // Other statuses trigger no callback.
            break;
    }
}