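The previous posts introduced Oozie's components, which hopefully gave you a rough idea of how they fit together: the business logic a user needs to run is packaged as a wf (workflow), a coord (coordinator), or a bundle. What does Oozie, as a scheduling system, have to do for the user? It has to act on the user's wf, coord, and bundle jobs at the right time, according to the user's settings. The possible operations include start, stop, kill, rerun, suspend, resume, end, and so on. This post introduces Oozie's command system.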
/**
 * Extends Callable adding the concept of priority. <p/> The priority is useful when queuing callables for later
 * execution via the {@link org.apache.oozie.service.CallableQueueService}. <p/> A higher number means a higher
 * priority. <p/>
 */
public interface XCallable<T> extends Callable<T> {
/**
 * Base class for synchronous and asynchronous commands.
 * <p/>
 * It enables by API the following pattern:
 * <p/>
 * <ul>
 * <li>single execution: a command instance can be executed only once</li>
 * <li>eager data loading: loads data for eager precondition check</li>
 * <li>eager precondition check: verify precondition before obtaining lock</li>
 * <li>data loading: loads data for precondition check and execution</li>
 * <li>precondition check: verifies precondition for execution is still met</li>
 * <li>locking: obtains exclusive lock on key before executing the command</li>
 * <li>execution: command logic</li>
 * </ul>
 * <p/>
 * It has built in instrumentation and logging.
 */
public abstract class XCommand<T> implements XCallable<T> {
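Oozie abstracts all commands behind a common layer, XCommand. Depending on the scenario, a command runs either synchronously or asynchronously. For asynchronous execution, Oozie also introduces a priority that orders the queued commands.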
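To make the priority idea concrete, here is a minimal sketch of priority-ordered execution. It is not Oozie's CallableQueueService: `PriorityCallable`, `PriorityQueueSketch`, `command` and `runInPriorityOrder` are hypothetical names made up for this illustration, and a plain `java.util.concurrent.PriorityBlockingQueue` stands in for the real queue. The only thing taken from the source is the rule in the XCallable Javadoc that a higher number means a higher priority.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical, simplified stand-in for XCallable: a Callable that carries a priority.
interface PriorityCallable<T> extends Callable<T> {
    int getPriority();
}

class PriorityQueueSketch {
    // Hypothetical command that simply returns its own name when executed.
    static PriorityCallable<String> command(String name, int priority) {
        return new PriorityCallable<String>() {
            @Override public int getPriority() { return priority; }
            @Override public String call() { return name; }
        };
    }

    // Queues all commands, then executes them one by one, highest priority first.
    static List<String> runInPriorityOrder(List<PriorityCallable<String>> commands) throws Exception {
        PriorityBlockingQueue<PriorityCallable<String>> queue = new PriorityBlockingQueue<>(
                11,
                (PriorityCallable<String> a, PriorityCallable<String> b) ->
                        Integer.compare(b.getPriority(), a.getPriority())); // reversed: larger number first
        queue.addAll(commands);
        List<String> executed = new ArrayList<>();
        PriorityCallable<String> next;
        while ((next = queue.poll()) != null) {
            executed.add(next.call());
        }
        return executed;
    }

    public static void main(String[] args) throws Exception {
        List<String> order = runInPriorityOrder(Arrays.asList(
                command("low", 0), command("high", 2), command("normal", 1)));
        System.out.println(order); // prints [high, normal, low]
    }
}
```

The sketch uses distinct priorities on purpose: a priority queue of this kind does not promise any particular order among commands with equal priority.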
/**
 * Implements the XCommand life-cycle.
 *
 * @return the {@link #execute} return value.
 * @throws Exception thrown if the command could not be executed.
 */
@Override
public final T call() throws CommandException {
    setLogInfo();
    if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) {
        LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString());
        return null;
    }
    commandQueue = null;
    instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".executions", 1);
    Instrumentation.Cron callCron = new Instrumentation.Cron();
    try {
        callCron.start();
        if (!isSynchronous) {
            eagerLoadState();
            eagerVerifyPrecondition();
        }
        try {
            T ret = null;
            if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
                Instrumentation.Cron acquireLockCron = new Instrumentation.Cron();
                acquireLockCron.start();
                acquireLock();
                acquireLockCron.stop();
                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".acquireLock", acquireLockCron);
            }
            // executing interrupts only in case of the lock required commands
            if (lock != null) {
                this.executeInterrupts();
            }
            if (isSynchronous || !isLockRequired() || (lock != null) || this.inInterruptMode()) {
                if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType())
                        && !used.compareAndSet(false, true)) {
                    LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString());
                    return null;
                }
                LOG.trace("Load state for [{0}]", getEntityKey());
                loadState();
                LOG.trace("Precondition check for command [{0}] key [{1}]", getName(), getEntityKey());
                verifyPrecondition();
                LOG.debug("Execute command [{0}] key [{1}]", getName(), getEntityKey());
                Instrumentation.Cron executeCron = new Instrumentation.Cron();
                executeCron.start();
                ret = execute();
                executeCron.stop();
                instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron);
            }
            if (commandQueue != null) {
                CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
                for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) {
                    LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey());
                    if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) {
                        LOG.warn("Could not queue [{0}] commands with delay [{1}]ms, queue full", entry.getValue()
                                .size(), entry.getKey());
                    }
                }
            }
            return ret;
        }
        finally {
            if (!isSynchronous && isLockRequired() && !this.inInterruptMode()) {
                releaseLock();
            }
        }
    }
    catch(PreconditionException pex){
        LOG.warn(pex.getMessage().toString() + ", Error Code: " + pex.getErrorCode().toString());
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".preconditionfailed", 1);
        return null;
    }
    catch (XException ex) {
        LOG.error("XException, ", ex);
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".xexceptions", 1);
        if (ex instanceof CommandException) {
            throw (CommandException) ex;
        }
        else {
            throw new CommandException(ex);
        }
    }
    catch (Exception ex) {
        LOG.error("Exception, ", ex);
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".exceptions", 1);
        throw new CommandException(ErrorCode.E0607, getName(), ex.getMessage(), ex);
    }
    catch (Error er) {
        LOG.error("Error, ", er);
        instrumentation.incr(INSTRUMENTATION_GROUP, getName() + ".errors", 1);
        throw er;
    }
    finally {
        FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
        callCron.stop();
        instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".call", callCron);
    }
}
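Above is the call() method shared by all commands. When a command runs asynchronously, it first checks at execution time whether the current state still warrants running it. Before call() invokes the concrete execute(), it loads the information the command operates on, which is then used to update the database. If operations on the same instance must not run concurrently, the command also acquires a lock before executing, so that only one operation acts on that instance at any moment. In many cases a command spawns child commands. Killing a bundle, for example, means not only setting the bundle's status to killed but also issuing kill commands for the coords inside the bundle. In that situation the child commands are added to the asynchronous command execution pool as well.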
What distinguishes one command's logic from another's lives mainly in the execute() implementation of each subclass.
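The shape of this design, a fixed life-cycle in the base class with the command-specific logic in execute(), can be sketched in a few lines. This is a heavily simplified illustration and not Oozie's code: `SketchCommand`, `SketchJob`, `KillSketch` and `LifecycleSketch` are hypothetical names, one global lock stands in for Oozie's per-entity lock, job state is kept in memory rather than in the database, and a failed precondition simply skips execute() instead of raising a PreconditionException.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, heavily simplified analogue of XCommand (not Oozie's real class).
abstract class SketchCommand {
    // Assumption: a single global lock replaces Oozie's lock per entity key.
    private static final ReentrantLock LOCK = new ReentrantLock();
    // Child commands produced during execute(); the caller queues them afterwards.
    private final List<SketchCommand> children = new ArrayList<>();

    protected abstract boolean verifyPrecondition(); // is the command still worth running?
    protected abstract void execute();               // command-specific logic

    protected void queueChild(SketchCommand child) { children.add(child); }

    // Fixed life-cycle: lock, check the precondition, execute, hand back child commands.
    public final List<SketchCommand> call() {
        LOCK.lock();
        try {
            if (verifyPrecondition()) {
                execute();
            }
            return children;
        } finally {
            LOCK.unlock();
        }
    }
}

// Hypothetical in-memory job record; a bundle lists its coords.
class SketchJob {
    final String id;
    String status = "RUNNING";
    final List<SketchJob> coords = new ArrayList<>();
    SketchJob(String id) { this.id = id; }
}

// Kill command: marks the job killed and spawns one kill command per child coord.
class KillSketch extends SketchCommand {
    private final SketchJob job;
    KillSketch(SketchJob job) { this.job = job; }

    @Override protected boolean verifyPrecondition() { return !"KILLED".equals(job.status); }

    @Override protected void execute() {
        job.status = "KILLED";
        for (SketchJob coord : job.coords) {
            queueChild(new KillSketch(coord));
        }
    }
}

class LifecycleSketch {
    // Runs a command plus every command it spawns, in FIFO order.
    // Returns the number of call() invocations made.
    static int runAll(SketchCommand root) {
        Deque<SketchCommand> pending = new ArrayDeque<>();
        pending.add(root);
        int calls = 0;
        while (!pending.isEmpty()) {
            SketchCommand next = pending.poll();
            pending.addAll(next.call());
            calls++;
        }
        return calls;
    }
}
```

Killing a bundle with two coords through `LifecycleSketch.runAll` results in three calls: one for the bundle and one for each coord. Running a second kill on the same bundle results in a single call whose precondition fails, so nothing is executed and no child commands are spawned.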
The figure shows Oozie's entire command system, with the commands for each kind of business operation.