azkaban是没有创建作业流,创建任务,建立任务之间关系的操作的.
它本身的工作模式,在本地进行编辑.创建任务,创建作业流,建立依赖关系,然后打成zip上传.
DirectoryFlowLoader解压zip包,然后解析解压出来的zip目录.
首先需要两个基础类Node,Edge.
Node表示的是作业流中的节点信息,包含job文件名,properties属性文件名,作业类型,
Edge表示的是节点之间边信息.
loadProjectFlow
将project目录解析成flow,properties json串,将一些无效的job,flow预先剔除掉.
无效的job:
- type属性没写的
- 重复job(多次出现将会被剔除掉)
无效依赖:
- 只身依赖(自己依赖自己)
- 无效依赖(所依赖的job不存在)
- 依赖于job是个重复job
代码
public void loadProjectFlow(Project project, File baseDirectory) {
propsList = new ArrayList<Props>();//整个工程的.properties配置列表
flowPropsList = new ArrayList<FlowProps>();//作业流.properties配置列表
jobPropsMap = new HashMap<String, Props>();//jobName->.job配置
nodeMap = new HashMap<String, Node>();//jobName->Node
flowMap = new HashMap<String, Flow>();//flowName->Flow
errors = new HashSet<String>();
duplicateJobs = new HashSet<String>();//重复任务名(jobName)
nodeDependencies = new HashMap<String, Map<String, Edge>>();//jobname->sourceJobName->依赖的边
rootNodes = new HashSet<String>();//根节点,解释一下,这里的根节点是flow中最后的节点
flowDependencies = new HashMap<String, Set<String>>();//flow于flow之间的依赖关系,解决内嵌之间依赖关系
// Load all the props files and create the Node objects
loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
jobPropertiesCheck(project);
// Create edges and find missing dependencies
resolveDependencies();
// Create the flows.
buildFlowsFromDependencies();
// Resolve embedded flows
resolveEmbeddedFlows();
}
loadProjectFlow函数用于解析工作目录,解析job,构建工作流.
第一步loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null);
目的有两个,一将配置文件解析成类,二排除文件重复job
加载.properties,.job文件,将加载的配置放入
flowPropsList,propsList.(.properties)
jobPropsMap,duplicateJobs,nodeMap.(.job)
nodeMap存储的是所有的节点信息
jobPropertiesCheck函数,检查job任务属性是否合格.azkaban中会限制每个任务最大内存和最小内存.如果超过job的社会超过限制,就会放入error中.
private void jobPropertiesCheck(Project project) {
// if project is in the memory check whitelist, then we don't need to check
// its memory settings
if (ProjectWhitelist.isProjectWhitelisted(project.getId(),
ProjectWhitelist.WhitelistType.MemoryCheck)) {
return;
}
String maxXms = props.getString(JOB_MAX_XMS, MAX_XMS_DEFAULT);
String maxXmx = props.getString(JOB_MAX_XMX, MAX_XMX_DEFAULT);
long sizeMaxXms = Utils.parseMemString(maxXms);
long sizeMaxXmx = Utils.parseMemString(maxXmx);
for (String jobName : jobPropsMap.keySet()) {
Props jobProps = jobPropsMap.get(jobName);
String xms = jobProps.getString(XMS, null);
if (xms != null && !PropsUtils.isVarialbeReplacementPattern(xms)
&& Utils.parseMemString(xms) > sizeMaxXms) {
errors.add(String.format(
"%s: Xms value has exceeded the allowed limit (max Xms = %s)",
jobName, maxXms));
}
String xmx = jobProps.getString(XMX, null);
if (xmx != null && !PropsUtils.isVarialbeReplacementPattern(xmx)
&& Utils.parseMemString(xmx) > sizeMaxXmx) {
errors.add(String.format(
"%s: Xmx value has exceeded the allowed limit (max Xmx = %s)",
jobName, maxXmx));
}
// job callback properties check
JobCallbackValidator.validate(jobName, props, jobProps, errors);
}
}
resolveDependencies回溯依赖关系,得到所有Edge,将无效依赖排除(依赖的job不存在,依赖的job是重复job)
buildFlowsFromDependencies利用之前解析好Nodes,Edges,回溯形成flow.
//这里所谓的根节点是末节点
private void buildFlowsFromDependencies() {
//找出所有的依赖节点
// Find all root nodes by finding ones without dependents.
HashSet<String> nonRootNodes = new HashSet<String>();
for (Map<String, Edge> edges : nodeDependencies.values()) {
for (String sourceId : edges.keySet()) {
nonRootNodes.add(sourceId);
}
}
// Now create flows. Bad flows are marked invalid
Set<String> visitedNodes = new HashSet<String>();
for (Node base : nodeMap.values()) {
// Root nodes can be discovered when parsing jobs
if (rootNodes.contains(base.getId())
|| !nonRootNodes.contains(base.getId())) {
rootNodes.add(base.getId());
Flow flow = new Flow(base.getId());
Props jobProp = jobPropsMap.get(base.getId());
// Dedup with sets
@SuppressWarnings("unchecked")
List<String> successEmailList =
jobProp.getStringList(CommonJobProperties.SUCCESS_EMAILS,
Collections.EMPTY_LIST);
Set<String> successEmail = new HashSet<String>();
for (String email : successEmailList) {
successEmail.add(email.toLowerCase());
}
@SuppressWarnings("unchecked")
List<String> failureEmailList =
jobProp.getStringList(CommonJobProperties.FAILURE_EMAILS,
Collections.EMPTY_LIST);
Set<String> failureEmail = new HashSet<String>();
for (String email : failureEmailList) {
failureEmail.add(email.toLowerCase());
}
@SuppressWarnings("unchecked")
List<String> notifyEmailList =
jobProp.getStringList(CommonJobProperties.NOTIFY_EMAILS,
Collections.EMPTY_LIST);
for (String email : notifyEmailList) {
email = email.toLowerCase();
successEmail.add(email);
failureEmail.add(email);
}
flow.addFailureEmails(failureEmail);
flow.addSuccessEmails(successEmail);
flow.addAllFlowProperties(flowPropsList);
constructFlow(flow, base, visitedNodes);//不断的递归,直到依赖为null为止
flow.initialize();
flowMap.put(base.getId(), flow);
}
}
}