Job日志
mapred-site.xml配置任务执行日志路径(hdfs)
<property>
<name>yarn.app.mapreduce.am.staging-dir</name>
<value>/data/yarn/stage</value>
</property>
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/data/yarn/intermediate_done</value>
</property>
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/data/yarn/done</value>
</property>
- 作业启动时,hadoop会将作业信息放在${yarn.app.mapreduce.am.staging-dir}/${user}/.staging/${job_id}目录
- 作业完成后,作业数据会被移到${mapreduce.jobhistory.intermediate-done-dir}/${user}目录
- intermediate-done-dir只是临时中转站,hadoop会定时将此目录的数据移到done地址: ${mapreduce.jobhistory.done-dir}/${year}/${month}/${day}/${serialPart}
.jhist文件保存job的执行信息,对应JobInfo类;_conf.xml文件保存job的配置信息。这两类文件只针对MapReduce类型的任务。
org.apache.hadoop.mapreduce.v2.hs.JobHistory的serviceStart()方法会启动定时任务,默认每3分钟(mapreduce.jobhistory.move.interval-ms)将intermediate-done-dir中的数据移到done-dir:
/**
 * Starts the history manager and (when applicable) the storage service, then
 * schedules the periodic intermediate-to-done move task and the history cleaner
 * before delegating to the superclass.
 *
 * @throws Exception if any of the started components fails to start
 */
protected void serviceStart() throws Exception {
hsManager.start();
// The storage backend is started only when it is itself a Service.
if (storage instanceof Service) {
((Service) storage).start();
}
// Scheduler with a core pool of 2 threads named "Log Scanner/Cleaner #<n>".
scheduledExecutor = new ScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
.build());
// Annotator's note: moveThreadInterval is read elsewhere as
// conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS),
// i.e. "mapreduce.jobhistory.move.interval-ms" with a default of 3*60*1000L ms.
// The same interval is used both as the initial delay and as the period.
scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
// Start historyCleaner
scheduleHistoryCleaner();
super.serviceStart();
}
Global
Global是入口,继承play框架的GlobalSettings类,并重写onStart()、onStop()方法。也就是调用DrElephant对象的start()、kill()方法
/**
 * Application entry point built on the Play framework's GlobalSettings: it
 * starts the DrElephant thread when the application starts and kills it when
 * the application stops.
 */
public class Global extends GlobalSettings {
// Assigned in onStart(); stays null when the DrElephant constructor throws IOException.
DrElephant _drElephant;
/**
 * Creates and starts the DrElephant thread. An IOException raised while
 * constructing it is logged and not rethrown.
 *
 * @param app the application being started
 */
public void onStart(Application app) {
Logger.info("Starting Application...");
fixJavaKerberos();
try {
_drElephant = new DrElephant();
// DrElephant is a Thread subclass, so start() runs its run() method on a new thread.
_drElephant.start();
} catch (IOException e) {
Logger.error("Application start failed...", e);
}
}
/**
 * Stops the DrElephant thread if one was created in onStart().
 *
 * @param app the application being stopped
 */
public void onStop(Application app) {
Logger.info("Stopping application...");
if (_drElephant != null) {
_drElephant.kill();
}
}
... // fixJavaKerberos
}
DrElephant
DrElephant类继承Thread线程类,重写run()方法。主要用来加载app-conf目录下的配置文件,并调用ElephantRunner
/**
 * Thread that boots Dr. Elephant: the constructor loads the HDFS context and the
 * auto-tuning configuration, and {@link #run()} drives the {@link ElephantRunner}
 * and, when auto tuning is enabled, a separate {@link AutoTuner} thread.
 */
public class DrElephant extends Thread {
  public static final String AUTO_TUNING_ENABLED = "autotuning.enabled";

  private static final Logger logger = Logger.getLogger(DrElephant.class);

  private ElephantRunner _elephant;
  private AutoTuner _autoTuner;
  private Thread _autoTunerThread;
  private Boolean autoTuningEnabled;

  /**
   * Loads configuration and prepares the runner (and the auto tuner thread when
   * "autotuning.enabled" is true; it defaults to false).
   *
   * @throws IOException declared by the constructor; presumably raised while loading the HDFS context
   */
  public DrElephant() throws IOException {
    // Annotator's note: HDFSContext.load() reads the default block size, i.e.
    // FileSystem.get(new Configuration()).getDefaultBlockSize(new Path("/")), which falls back to
    // getConf().getLong("fs.local.block.size", 32 * 1024 * 1024).
    HDFSContext.load();

    // Annotator's note: ElephantContext.instance() triggers loadConfiguration(); the value returned
    // here is the content of AutoTuningConf.xml.
    Configuration autoTuningConf = ElephantContext.instance().getAutoTuningConf();
    autoTuningEnabled = autoTuningConf.getBoolean(AUTO_TUNING_ENABLED, false);
    logger.debug("Auto Tuning Configuration: " + autoTuningConf.toString());

    // ElephantRunner implements Runnable; it is invoked directly from run() below.
    _elephant = new ElephantRunner();

    if (!autoTuningEnabled) {
      return;
    }
    // Job tuner, executed on its own thread.
    _autoTuner = new AutoTuner();
    _autoTunerThread = new Thread(_autoTuner, "Auto Tuner Thread");
  }

  /**
   * Starts the auto tuner thread (if one was created) and then runs the
   * ElephantRunner on the current thread.
   */
  @Override
  public void run() {
    boolean tunerConfigured = _autoTunerThread != null;
    if (tunerConfigured) {
      logger.debug("Starting auto tuner thread ");
      _autoTunerThread.start();
    }
    _elephant.run();
  }

  /**
   * Stops the runner and interrupts the auto tuner thread, skipping whichever
   * of the two was never created.
   */
  public void kill() {
    if (_elephant != null) {
      // ElephantRunner.kill() clears its running flag and calls shutdownNow() on its thread pool.
      _elephant.kill();
    }
    if (_autoTunerThread != null) {
      // Cooperative stop: interrupt() here, paired with a loop in the target that checks
      // Thread.currentThread().isInterrupted().
      _autoTunerThread.interrupt();
    }
  }
}
ElephantContext
DrElephant对象的构造函数执行ElephantContext.instance()时,会加载系统各组件配置文件。对应configurations目录下的类
/**
 * Loads every component configuration in a fixed order and, as the last step,
 * works out which application types are supported.
 */
private void loadConfiguration() {
// AggregatorConf.xml
loadAggregators();
// FetcherConf.xml
loadFetchers();
// HeuristicConf.xml
loadHeuristics();
// JobTypeConf.xml
loadJobTypes();
// GeneralConf.xml
loadGeneralConf();
// AutoTuningConf.xml
loadAutoTuningConf();
// It is important to configure supported types in the LAST step so that we could have information from all
// configurable components.
/**
 * Decides what application types can be supported.
 *
 * An application type is supported if all the below are true.
 * 1. A Fetcher is defined in FetcherConf.xml for the application type.
 * 2. At least one Heuristic is configured in HeuristicConf.xml for the application type.
 * 3. At least one job type is configured in JobTypeConf.xml for the application type.
 */
// Annotator's note: this takes the intersection of the ApplicationType keys of _typeToFetcher,
// _typeToHeuristics, _appTypeToJobTypes and _typeToAggregator, and calls retainAll on the keys of all
// four maps so that only the common application types remain.
configureSupportedApplicationTypes();
}
解析xml文件,再执行XXXConfiguration对象的parseXXXConfiguration()方法,将xml数据映射成XXXConfigurationData对象,并存入ElephantContext对象的Map中
// Open the configuration file as a resource looked up through the system class loader.
// NOTE(review): no null check or close() for instream is visible in this excerpt — confirm the
// enclosing method handles a missing resource and releases the stream.
InputStream instream = ClassLoader.getSystemClassLoader().getResourceAsStream(filePath);
// Build a DOM parser with the factory's default settings.
// NOTE(review): the factory is used as-is; confirm whether DTD/external-entity processing should be disabled.
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
// Parse the whole stream into an in-memory DOM document.
Document document = builder.parse(instream);
ElephantRunner
此类是系统的核心类,包含AnalyticJobGeneratorHadoop2.fetchAnalyticJobs()获取yarn jobs、分析每个job的AnalyticJob.getAnalysis()、保存分析数据AppResult.save()、Metrics统计MetricsController.init()、获取RM的Active URL:AnalyticJobGeneratorHadoop2.updateResourceManagerAddresses()
public class ElephantRunner implements Runnable {
private static final Logger logger = Logger.getLogger(ElephantRunner.class);
private static final long FETCH_INTERVAL = 60 * 1000; // Interval between fetches
private static final long RETRY_INTERVAL = 60 * 1000; // Interval between retries
private static final int EXECUTOR_NUM = 5; // The number of executor threads to analyse the jobs
private static final String FETCH_INTERVAL_KEY = "drelephant.analysis.fetch.interval";
private static final String RETRY_INTERVAL_KEY = "drelephant.analysis.retry.interval";
private static final String EXECUTOR_NUM_KEY = "drelephant.analysis.thread.count";
private AtomicBoolean _running = new AtomicBoolean(true);
private long lastRun;
private long _fetchInterval;
private long _retryInterval;
private int _executorNum;
private HadoopSecurity _hadoopSecurity;
private ThreadPoolExecutor _threadPoolExecutor;
private AnalyticJobGenerator _analyticJobGenerator;
/**
 * Reads the executor count and the fetch/retry intervals from the general
 * configuration, falling back to the class defaults.
 */
private void loadGeneralConfiguration() {
  final Configuration generalConf = ElephantContext.instance().getGeneralConf();

  // "drelephant.analysis.thread.count", default 5
  _executorNum = Utils.getNonNegativeInt(generalConf, EXECUTOR_NUM_KEY, EXECUTOR_NUM);

  // "drelephant.analysis.fetch.interval", default 60 * 1000 ms
  _fetchInterval = Utils.getNonNegativeLong(generalConf, FETCH_INTERVAL_KEY, FETCH_INTERVAL);

  // "drelephant.analysis.retry.interval", default 60 * 1000 ms
  _retryInterval = Utils.getNonNegativeLong(generalConf, RETRY_INTERVAL_KEY, RETRY_INTERVAL);
}
/**
 * Creates and configures the generator that supplies the jobs to analyse.
 *
 * @throws RuntimeException if the environment is not Hadoop 2.x, or wrapping any failure raised
 *     while configuring the generator
 */
private void loadAnalyticJobGenerator() {
  // Annotator's note: isHadoop2Env() boils down to conf.get("mapreduce.framework.name").equals("yarn").
  if (!HadoopSystemContext.isHadoop2Env()) {
    throw new RuntimeException("Unsupported Hadoop major version detected. It is not 2.x.");
  }

  // The Hadoop 2 generator fetches the jobs waiting to be analysed.
  _analyticJobGenerator = new AnalyticJobGeneratorHadoop2();

  try {
    // configure() initialises the fetch start time / last fetch time and resolves the resource
    // manager address (the ACTIVE one when RM HA is enabled).
    _analyticJobGenerator.configure(ElephantContext.instance().getGeneralConf());
  } catch (Exception configureFailure) {
    logger.error("Error occurred when configuring the analysis provider.", configureFailure);
    throw new RuntimeException(configureFailure);
  }
}
/**
 * Main loop of Dr. Elephant. Inside a HadoopSecurity.doAs() action it loads the
 * configuration, initialises metrics, creates the executor pool and then, until
 * kill() is called or the thread is interrupted, repeatedly fetches finished
 * jobs and submits each of them to the pool for analysis. Any exception that
 * escapes the action is logged and ends the method.
 */
@Override
public void run() {
logger.info("Dr.elephant has started");
try {
_hadoopSecurity = HadoopSecurity.getInstance();
// Run everything below as a privileged action through HadoopSecurity.doAs().
_hadoopSecurity.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
HDFSContext.load();
loadGeneralConfiguration();
loadAnalyticJobGenerator();
// NOTE(review): the annotator considers this redundant because the DrElephant constructor has
// already called ElephantContext.instance() — confirm before removing.
ElephantContext.init();
// Initialize the metrics registries.
// Annotator's notes: registers job, queue, GC, memory and health-check metrics.
// CustomGarbageCollectorMetricSet adds jvmUptime and gc2UptimeRatio on top of GarbageCollectorMetricSet;
// MemoryUsageGaugeSet comes straight from metrics-jvm;
// ThreadDeadlockHealthCheck derives health from the deadlocked threads found by ThreadDeadlockDetector.
MetricsController.init();
logger.info("executor num is " + _executorNum);
if (_executorNum < 1) {
throw new RuntimeException("Must have at least 1 worker thread.");
}
// Fixed-size pool (core == max == _executorNum) with named threads and an unbounded work queue.
ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("dr-el-executor-thread-%d").build();
_threadPoolExecutor = new ThreadPoolExecutor(_executorNum, _executorNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), factory);
// Loop over the whole fetch/submit cycle, guarded by the running flag and the interrupt status.
// Annotator's suggestion: ScheduledExecutorService.scheduleAtFixedRate() would be a better fit.
while (_running.get() && !Thread.currentThread().isInterrupted()) {
// Refresh the resource manager address on every iteration.
_analyticJobGenerator.updateResourceManagerAddresses();
lastRun = System.currentTimeMillis();
logger.info("Fetching analytic job list...");
try {
// The login is re-checked on every iteration (annotator's note: HadoopSecurity.getInstance()
// has already performed one check).
_hadoopSecurity.checkLogin();
} catch (IOException e) {
logger.info("Error with hadoop kerberos login", e);
//Wait for a while before retry
// Waits until _retryInterval (default 60s) has elapsed since lastRun.
waitInterval(_retryInterval);
continue;
}
List<AnalyticJob> todos;
try {
// Pull the succeeded and failed jobs from the resource manager, e.g.
// http://rmhost/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s
// http://rmhost/ws/v1/cluster/apps?finalStatus=FAILED&finishedTimeBegin=%s&finishedTimeEnd=%s
todos = _analyticJobGenerator.fetchAnalyticJobs();
} catch (Exception e) {
logger.error("Error fetching job list. Try again later...", e);
// Wait for a while before retry
// Reached for any exception raised by the fetch, e.g. an I/O failure.
waitInterval(_retryInterval);
continue;
}
for (AnalyticJob analyticJob : todos) {
// Each job is analysed concurrently by ExecutorJob.run() on the pool:
// it fetches the job data, applies the heuristics and stores the result.
_threadPoolExecutor.submit(new ExecutorJob(analyticJob));
}
int queueSize = _threadPoolExecutor.getQueue().size();
MetricsController.setQueueSize(queueSize);
logger.info("Job queue size is " + queueSize);
// Wait for a while before next fetch
waitInterval(_fetchInterval);
}
logger.info("Main thread is terminated.");
return null;
}
});
} catch (Exception e) {
logger.error(e.getMessage());
// ExceptionUtils.getStackTrace() renders the throwable's stack trace as a String for logging.
logger.error(ExceptionUtils.getStackTrace(e));
}
}
/**
 * Task executed on the worker pool: analyses a single job, saves the result and
 * updates the processing metrics. Failures other than an interrupt are handed
 * to {@link #jobFate()} to decide between retrying and dropping the job.
 */
private class ExecutorJob implements Runnable {

  private AnalyticJob _analyticJob;

  ExecutorJob(AnalyticJob analyticJob) {
    _analyticJob = analyticJob;
  }

  @Override
  public void run() {
    try {
      final String analysisName =
          String.format("%s %s", _analyticJob.getAppType().getName(), _analyticJob.getAppId());
      final long startedAtMillis = System.currentTimeMillis();
      logger.info(String.format("Analyzing %s", analysisName));

      // Run the heuristic analysis and persist its result.
      _analyticJob.getAnalysis().save();

      final long processingTime = System.currentTimeMillis() - startedAtMillis;
      logger.info(String.format("Analysis of %s took %sms", analysisName, processingTime));
      MetricsController.setJobProcessingTime(processingTime);
      MetricsController.markProcessedJobs();
    } catch (InterruptedException e) {
      logger.info("Thread interrupted");
      logger.info(e.getMessage());
      logger.info(ExceptionUtils.getStackTrace(e));
      // Restore the interrupt status so the pool can observe it.
      Thread.currentThread().interrupt();
    } catch (TimeoutException e) {
      logger.warn("Timed out while fetching data. Exception message is: " + e.getMessage());
      jobFate();
    } catch (Exception e) {
      logger.error(e.getMessage());
      logger.error(ExceptionUtils.getStackTrace(e));
      jobFate();
    }
  }

  /**
   * Routes a failed job: back to the first retry queue while retry() allows it, then to the
   * second retry queue while isSecondPhaseRetry() allows it, otherwise the job is counted as
   * skipped and dropped. Does nothing when there is no job.
   * Annotator's note: this amounts to 3 retries in the first queue and 5 in the second.
   */
  private void jobFate() {
    if (_analyticJob == null) {
      return;
    }

    if (_analyticJob.retry()) {
      logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the retry list.");
      _analyticJobGenerator.addIntoRetries(_analyticJob);
      return;
    }

    if (_analyticJob.isSecondPhaseRetry()) {
      // The second retry queue hands jobs back only after some interval. Some spark jobs may need
      // more time than usual to process, hence the queue.
      logger.warn("Add analytic job id [" + _analyticJob.getAppId() + "] into the second retry list.");
      _analyticJobGenerator.addIntoSecondRetryQueue(_analyticJob);
      return;
    }

    MetricsController.markSkippedJob();
    logger.error("Drop the analytic job. Reason: reached the max retries for application id = [" + _analyticJob.getAppId() + "].");
  }
}
/**
 * Sleeps until {@code interval} milliseconds have passed since {@code lastRun}. The sleep is
 * therefore not a fixed duration: time already spent in the current iteration is subtracted,
 * and nothing happens when the deadline has already passed (compare scheduleAtFixedRate).
 *
 * @param interval desired distance in milliseconds between the start of two iterations
 */
private void waitInterval(long interval) {
  long remainingMillis = (lastRun + interval) - System.currentTimeMillis();
  if (remainingMillis > 0) {
    try {
      Thread.sleep(remainingMillis);
    } catch (InterruptedException interrupted) {
      // Re-set the interrupt flag so the caller's loop condition can see it; it must not be lost.
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Asks the main loop to stop by clearing the running flag and, when the executor pool has
 * already been created, shuts it down immediately.
 */
public void kill() {
  _running.set(false);
  ThreadPoolExecutor executor = _threadPoolExecutor;
  if (executor != null) {
    executor.shutdownNow();
  }
}
}
AnalyticJobGeneratorHadoop2
该类负责:更新Active RM的URL: updateResourceManagerAddresses()、读取succeeded、failed任务: readApps()、fetchAnalyticJobs()
返回的AnalyticJob对象包含: appId、type、user、name、queueName、trackingUrl、startTime、finishTime字段
public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final Logger logger = Logger.getLogger(AnalyticJobGeneratorHadoop2.class);
private static final String RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.webapp.address";
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";
private static Configuration configuration;
// We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS
private static final long FETCH_DELAY = 60000;
// Generate a token update interval with a random deviation so that it does not update the token exactly at the same
// time with other token updaters (e.g. ElephantFetchers).
private static final long TOKEN_UPDATE_INTERVAL =
Statistics.MINUTE_IN_MS * 30 + new Random().nextLong() % (3 * Statistics.MINUTE_IN_MS);
private String _resourceManagerAddress;
private long _lastTime = 0;
private long _fetchStartTime = 0;
private long _currentTime = 0;
private long _tokenUpdatedTime = 0;
private AuthenticatedURL.Token _token;
private AuthenticatedURL _authenticatedURL;
private final ObjectMapper _objectMapper = new ObjectMapper();
private final Queue<AnalyticJob> _firstRetryQueue = new ConcurrentLinkedQueue<AnalyticJob>();
// bug!应该使用ConcurrentLinkedQueue代替
private final ArrayList<AnalyticJob> _secondRetryQueue = new ArrayList<AnalyticJob>();
/**
 * Resolves the resource manager web address used for fetching applications.
 *
 * <p>When "yarn.resourcemanager.ha.enabled" is true, every id listed in
 * "yarn.resourcemanager.ha.rm-ids" is probed through
 * {@code http://<address>/ws/v1/cluster/info} and the first one whose {@code haState} is
 * {@code ACTIVE} is selected; when no id reports ACTIVE the previously stored address is kept.
 * Otherwise "yarn.resourcemanager.webapp.address" is used directly.
 *
 * <p>A response without a {@code haState} value is treated as "not active" instead of raising a
 * NullPointerException, and surrounding whitespace in the id list is ignored.
 *
 * @throws RuntimeException if no resource manager address is known after the update
 */
public void updateResourceManagerAddresses() {
  // "yarn.resourcemanager.ha.enabled"
  if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) {
    // "yarn.resourcemanager.ha.rm-ids"
    String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS);
    if (resourceManagers != null) {
      logger.info("The list of RM IDs are " + resourceManagers);
      _currentTime = System.currentTimeMillis();
      updateAuthToken();
      for (String rawId : resourceManagers.split(",")) {
        // Tolerate "rm1, rm2" style lists: an untrimmed id would never match a configured key.
        String id = rawId.trim();
        try {
          // "yarn.resourcemanager.webapp.address.<id>"
          String resourceManager = configuration.get(RESOURCE_MANAGER_ADDRESS + "." + id);
          // "http://<address>/ws/v1/cluster/info"
          String resourceManagerURL = String.format(RM_NODE_STATE_URL, resourceManager);
          logger.info("Checking RM URL: " + resourceManagerURL);
          JsonNode rootNode = readJsonNode(new URL(resourceManagerURL));
          // haState lives under clusterInfo; the text value is null when the field is missing.
          String status = rootNode.path("clusterInfo").path("haState").getValueAsText();
          // Constant-first comparison: a null status must not escape as an NPE, because the
          // catch clauses below would not handle it and the caller's fetch loop would die.
          if ("ACTIVE".equals(status)) {
            logger.info(resourceManager + " is ACTIVE");
            _resourceManagerAddress = resourceManager;
            break;
          } else {
            logger.info(resourceManager + " is STANDBY");
          }
        } catch (AuthenticationException e) {
          logger.info("Error fetching resource manager " + id + " state " + e.getMessage());
        } catch (IOException e) {
          logger.info("Error fetching Json for resource manager "+ id + " status " + e.getMessage());
        }
      }
    }
  } else {
    // Without RM HA the address comes straight from "yarn.resourcemanager.webapp.address".
    _resourceManagerAddress = configuration.get(RESOURCE_MANAGER_ADDRESS);
  }
  if (_resourceManagerAddress == null) {
    throw new RuntimeException(
        "Cannot get YARN resource manager address from Hadoop Configuration property: [" + RESOURCE_MANAGER_ADDRESS + "].");
  }
}
/**
 * Stores the configuration, initialises the fetch window when
 * "drelephant.analysis.fetch.initial.windowMillis" is set, and resolves the resource manager
 * address.
 *
 * @param configuration the general configuration to read from
 * @throws IOException declared by the interface method
 */
@Override
public void configure(Configuration configuration) throws IOException {
  // The field is static, so it is assigned through the class name rather than "this".
  AnalyticJobGeneratorHadoop2.configuration = configuration;

  String windowSetting = configuration.get(FETCH_INITIAL_WINDOW_MS);
  if (windowSetting != null) {
    long windowMillis = Long.parseLong(windowSetting);
    // Start the first fetch one window (plus the fetch delay) in the past.
    long windowStart = System.currentTimeMillis() - FETCH_DELAY - windowMillis;
    _lastTime = windowStart;
    _fetchStartTime = windowStart;
  }

  updateResourceManagerAddresses();
}
/**
 * Fetch all the succeeded and failed applications/analytic jobs from the resource manager, then
 * append every job waiting in the first retry queue and those jobs of the second retry queue that
 * report being ready.
 *
 * <p>The second retry queue is a plain list that executor threads add to through
 * {@link #addIntoSecondRetryQueue(AnalyticJob)} while this method iterates over it, so both sides
 * synchronize on the list.
 *
 * @return the jobs to analyse in this round
 * @throws IOException if a resource manager request fails
 * @throws AuthenticationException if authenticating to the resource manager fails
 */
@Override
public List<AnalyticJob> fetchAnalyticJobs()
    throws IOException, AuthenticationException {
  List<AnalyticJob> appList = new ArrayList<AnalyticJob>();

  // There is a lag of job data from AM/NM to JobHistoryServer HDFS, we shouldn't use the current time, since there
  // might be new jobs arriving after we fetch jobs. We provide one minute delay to address this lag.
  _currentTime = System.currentTimeMillis() - FETCH_DELAY;
  updateAuthToken();

  logger.info("Fetching recent finished application runs between last time: " + (_lastTime + 1) + ", and current time: " + _currentTime);

  // Fetch all succeeded apps
  URL succeededAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format(
      "/ws/v1/cluster/apps?finalStatus=SUCCEEDED&finishedTimeBegin=%s&finishedTimeEnd=%s",
      String.valueOf(_lastTime + 1), String.valueOf(_currentTime)));
  logger.info("The succeeded apps URL is " + succeededAppsURL);
  appList.addAll(readApps(succeededAppsURL));

  // Fetch all failed apps
  // state: Application Master State
  // finalStatus: Status of the Application as reported by the Application Master
  URL failedAppsURL = new URL(new URL("http://" + _resourceManagerAddress), String.format(
      "/ws/v1/cluster/apps?finalStatus=FAILED&state=FINISHED&finishedTimeBegin=%s&finishedTimeEnd=%s",
      String.valueOf(_lastTime + 1), String.valueOf(_currentTime)));
  // Logged before the request so the URL is visible even when the request throws.
  logger.info("The failed apps URL is " + failedAppsURL);
  appList.addAll(readApps(failedAppsURL));

  // Append promises from the retry queue at the end of the list.
  // Annotator's note: jobs in the first queue are retried once per fetch round, three times in total.
  while (!_firstRetryQueue.isEmpty()) {
    appList.add(_firstRetryQueue.poll());
  }

  // Annotator's notes on the second queue (based on AnalyticJob, not visible here): every fetch
  // round counts a job's remaining wait down, and every second-phase retry lengthens the next wait
  // to (retries * 5) rounds, giving waits of roughly 5/10/15/20/25 minutes over five retries.
  // A single queue with precomputed due times, or a bounded exponential backoff, would be simpler.
  synchronized (_secondRetryQueue) {
    Iterator<AnalyticJob> secondRetries = _secondRetryQueue.iterator();
    while (secondRetries.hasNext()) {
      AnalyticJob job = secondRetries.next();
      if (job.readyForSecondRetry()) {
        appList.add(job);
        secondRetries.remove();
      }
    }
  }

  _lastTime = _currentTime;
  return appList;
}

/**
 * Puts a failed job into the first retry queue and publishes the new queue size as a metric.
 *
 * @param promise the job to retry on the next fetch
 */
@Override
public void addIntoRetries(AnalyticJob promise) {
  _firstRetryQueue.add(promise);
  int retryQueueSize = _firstRetryQueue.size();
  MetricsController.setRetryQueueSize(retryQueueSize);
  logger.info("Retry queue size is " + retryQueueSize);
}

/**
 * Puts a failed job into the second retry queue and publishes the new queue size as a metric.
 * Called from executor threads, hence the synchronization on the queue.
 *
 * @param promise the job to retry after its second-phase wait
 */
@Override
public void addIntoSecondRetryQueue(AnalyticJob promise) {
  // setTimeToSecondRetry() resets the job's wait before the next second-phase retry
  // (annotator's note: to _secondRetries * 5).
  AnalyticJob scheduled = promise.setTimeToSecondRetry();
  int secondRetryQueueSize;
  synchronized (_secondRetryQueue) {
    _secondRetryQueue.add(scheduled);
    secondRetryQueueSize = _secondRetryQueue.size();
  }
  MetricsController.setSecondRetryQueueSize(secondRetryQueueSize);
  logger.info("Second Retry queue size is " + secondRetryQueueSize);
}
/**
 * Re-creates the authentication token and the authenticated URL helper once more than
 * TOKEN_UPDATE_INTERVAL has elapsed between the last update and {@code _currentTime}.
 */
// Annotator's note: _token and _authenticatedURL are not used anywhere else in the project.
private void updateAuthToken() {
  long sinceLastUpdate = _currentTime - _tokenUpdatedTime;
  if (sinceLastUpdate <= TOKEN_UPDATE_INTERVAL) {
    return;
  }
  logger.info("AnalysisProvider updating its Authenticate Token...");
  _token = new AuthenticatedURL.Token();
  _authenticatedURL = new AuthenticatedURL();
  _tokenUpdatedTime = _currentTime;
}
/**
 * Opens a stream to the given url and parses its content into a JsonNode tree.
 * The stream is opened with a plain url.openStream(); neither _token nor
 * _authenticatedURL is used here.
 *
 * @param url The url to connect to
 * @return the root node of the parsed JSON document
 * @throws IOException Unable to get the stream
 * @throws AuthenticationException Authentication problem (declared; nothing in this body raises it directly)
 */
// NOTE(review): url.openStream() applies no explicit connect/read timeout here — confirm that is acceptable.
private JsonNode readJsonNode(URL url)
throws IOException, AuthenticationException {
return _objectMapper.readTree(url.openStream());
}
/**
 * Calls the resource manager and converts each returned application of a supported type into an
 * AnalyticJob.
 *
 * @param url The REST call
 * @return the analytic jobs built from the response; applications that were already analysed
 *     (first fetch only) or whose type is not supported are left out
 * @throws IOException if the response cannot be read
 * @throws AuthenticationException Problem authenticating to resource manager
 */
private List<AnalyticJob> readApps(URL url) throws IOException, AuthenticationException {
  List<AnalyticJob> jobs = new ArrayList<AnalyticJob>();

  // Expected response layout: { "apps": { "app": [ { "id": ..., "user": ..., "queue": ..., ... } ] } }
  JsonNode appNodes = readJsonNode(url).path("apps").path("app");

  for (JsonNode app : appNodes) {
    String appId = app.get("id").getValueAsText();

    // When called first time after launch, hit the DB and avoid duplicated analytic jobs that have
    // been analyzed before. The time comparison comes first, so later rounds never touch the DB.
    boolean skip = _lastTime <= _fetchStartTime
        && (_lastTime != _fetchStartTime || AppResult.find.byId(appId) != null);
    if (skip) {
      continue;
    }

    String user = app.get("user").getValueAsText();
    String name = app.get("name").getValueAsText();
    String queueName = app.get("queue").getValueAsText();
    JsonNode trackingUrlNode = app.get("trackingUrl");
    String trackingUrl = null;
    if (trackingUrlNode != null) {
      trackingUrl = trackingUrlNode.getValueAsText();
    }
    long startTime = app.get("startedTime").getLongValue();
    long finishTime = app.get("finishedTime").getLongValue();

    // The type is taken from the resource manager response and mapped to a configured type.
    String typeName = app.get("applicationType").getValueAsText();
    ApplicationType type = ElephantContext.instance().getApplicationTypeForName(typeName);

    // Only keep the application when its type is supported.
    if (type == null) {
      continue;
    }

    AnalyticJob analyticJob = new AnalyticJob();
    analyticJob.setAppId(appId)
        .setAppType(type)
        .setUser(user)
        .setName(name)
        .setQueueName(queueName)
        .setTrackingUrl(trackingUrl)
        .setStartTime(startTime)
        .setFinishTime(finishTime);
    jobs.add(analyticJob);
  }

  return jobs;
}
}
AnalyticJob
AnalyticJob类主要负责: 获取job的配置以及执行过程统计ElephantFetcher.fetchData()、根据统计信息进行启发式算法运算Heuristic.apply(data)
根据FetcherConf.xml配置,MapReduce类型的任务通过MapReduceFSFetcherHadoop2类获取统计信息;Spark类型通过SparkFetcher类获取
/**
* This class wraps some basic meta data of a completed application run (notice that the information is generally the
* same regardless of hadoop versions and application types), and then promises to return the analyzed result later.
*/
public class AnalyticJob {
/**
 * Returns the analysed AppResult that could be directly serialized into DB.
 *
 * This method fetches the data using the appropriate application fetcher, runs all the heuristics on them and
 * loads it into the AppResult model. The result's severity is the worst severity among the heuristic results
 * and its score is the sum of their scores.
 *
 * @throws Exception if the analysis process encountered a problem.
 * @return the analysed AppResult
 */
public AppResult getAnalysis() throws Exception {
ElephantFetcher fetcher = ElephantContext.instance().getFetcherForApplicationType(getAppType());
// Annotator's notes: the fetcher is chosen from FetcherConf.xml by this job's application type, e.g.
// "tez" -> TezFetcher, "mapreduce" -> MapReduceFSFetcherHadoop2, "spark" -> SparkFetcher.
// The fetched data is said to wrap the job configuration, map/reduce task info, counters and timings.
HadoopApplicationData data = fetcher.fetchData(this);
// Annotator's notes: JobTypeConf.xml maps an application type to a list of JobType (name, conf, value);
// the job type whose conf key matches the job's configuration is returned.
JobType jobType = ElephantContext.instance().matchJobType(data);
// Fall back to the unknown job type name when nothing matched.
String jobTypeName = jobType == null ? UNKNOWN_JOB_TYPE : jobType.getName();
// Run all heuristics over the fetched data
List<HeuristicResult> analysisResults = new ArrayList<HeuristicResult>();
if (data == null || data.isEmpty()) {
// Example: a MR job has 0 mappers and 0 reducers
logger.info("No Data Received for analytic job: " + getAppId());
analysisResults.add(HeuristicResult.NO_DATA);
} else {
// All heuristics configured for this application type (annotator: from HeuristicConf.xml).
List<Heuristic> heuristics = ElephantContext.instance().getHeuristicsForApplicationType(getAppType());
for (Heuristic heuristic : heuristics) {
// Comma-separated job type names for which this heuristic must not run
// (annotator: the exclude_jobtypes_filter value under <params> in HeuristicConf.xml).
String confExcludedApps = heuristic.getHeuristicConfData().getParamMap().get(EXCLUDE_JOBTYPE);
if (confExcludedApps == null || confExcludedApps.length() == 0 ||
!Arrays.asList(confExcludedApps.split(",")).contains(jobTypeName)) { // not excluded: apply the heuristic
HeuristicResult result = heuristic.apply(data);
if (result != null) {
analysisResults.add(result);
}
}
}
}
// Aggregator implementation for this application type (annotator: configured in AggregatorConf.xml).
HadoopMetricsAggregator hadoopMetricsAggregator = ElephantContext.instance().getAggregatorForApplicationType(getAppType());
// Computes the three aggregate figures read below: resource used, resource wasted and total delay.
hadoopMetricsAggregator.aggregate(data);
HadoopAggregatedData hadoopAggregatedData = hadoopMetricsAggregator.getResult();
// Fill in the basic job information.
// Annotator's note: Utils.truncateField() cuts a string to its limit as field.substring(0, limit - 3) + "...".
AppResult result = new AppResult();
result.id = Utils.truncateField(getAppId(), AppResult.ID_LIMIT, getAppId());
result.trackingUrl = Utils.truncateField(getTrackingUrl(), AppResult.TRACKING_URL_LIMIT, getAppId());
result.queueName = Utils.truncateField(getQueueName(), AppResult.QUEUE_NAME_LIMIT, getAppId());
result.username = Utils.truncateField(getUser(), AppResult.USERNAME_LIMIT, getAppId());
result.startTime = getStartTime();
result.finishTime = getFinishTime();
result.name = Utils.truncateField(getName(), AppResult.APP_NAME_LIMIT, getAppId());
result.jobType = Utils.truncateField(jobTypeName, AppResult.JOBTYPE_LIMIT, getAppId());
result.resourceUsed = hadoopAggregatedData.getResourceUsed();
result.totalDelay = hadoopAggregatedData.getTotalDelay();
result.resourceWasted = hadoopAggregatedData.getResourceWasted();
// Fill in the values computed by the heuristics.
int jobScore = 0;
result.yarnAppHeuristicResults = new ArrayList<AppHeuristicResult>();
Severity worstSeverity = Severity.NONE;
for (HeuristicResult heuristicResult : analysisResults) {
AppHeuristicResult detail = new AppHeuristicResult();
detail.heuristicClass = Utils.truncateField(heuristicResult.getHeuristicClassName(),
AppHeuristicResult.HEURISTIC_CLASS_LIMIT, getAppId());
detail.heuristicName = Utils.truncateField(heuristicResult.getHeuristicName(),
AppHeuristicResult.HEURISTIC_NAME_LIMIT, getAppId());
detail.severity = heuristicResult.getSeverity();
detail.score = heuristicResult.getScore();
// Load Heuristic Details
for (HeuristicResultDetails heuristicResultDetails : heuristicResult.getHeuristicResultDetails()) {
AppHeuristicResultDetails heuristicDetail = new AppHeuristicResultDetails();
heuristicDetail.yarnAppHeuristicResult = detail;
heuristicDetail.name = Utils.truncateField(heuristicResultDetails.getName(),
AppHeuristicResultDetails.NAME_LIMIT, getAppId());
heuristicDetail.value = Utils.truncateField(heuristicResultDetails.getValue(),
AppHeuristicResultDetails.VALUE_LIMIT, getAppId());
heuristicDetail.details = Utils.truncateField(heuristicResultDetails.getDetails(),
AppHeuristicResultDetails.DETAILS_LIMIT, getAppId());
// This was added for AnalyticTest. Commenting this out to fix a bug. Also disabling AnalyticJobTest.
//detail.yarnAppHeuristicResultDetails = new ArrayList<AppHeuristicResultDetails>();
// NOTE(review): the annotator flags this as a bug because no list is assigned to
// detail.yarnAppHeuristicResultDetails in this method — confirm it is initialised elsewhere
// (e.g. by the model class) before relying on this add().
detail.yarnAppHeuristicResultDetails.add(heuristicDetail);
}
result.yarnAppHeuristicResults.add(detail);
// Keep the most severe level seen so far.
worstSeverity = Severity.max(worstSeverity, detail.severity);
jobScore += detail.score;
}
result.severity = worstSeverity;
result.score = jobScore;
// Retrieve information from job configuration like scheduler information and store them into result.
// Annotator's note: the scheduler information is driven by SchedulerConf.xml.
InfoExtractor.loadInfo(result, data);
return result;
}
...
}
MapReduceFSFetcherHadoop2
该类主要根据job的appId、finishTime获取{done-dir}、{intermediate-done-dir}两个hdfs路径,从而获取job对应的.jhist、.conf文件并解析,返回JobCounter、MapTask、ReducerTask信息
public class MapReduceFSFetcherHadoop2 extends MapReduceFetcher {
private static final Logger logger = Logger.getLogger(MapReduceFSFetcherHadoop2.class);
private static final String LOG_SIZE_XML_FIELD = "history_log_size_limit_in_mb";
private static final String HISTORY_SERVER_TIME_ZONE_XML_FIELD = "history_server_time_zone";
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
protected static final double DEFALUT_MAX_LOG_SIZE_IN_MB = 500;
private FileSystem _fs;
private String _historyLocation;
private String _intermediateHistoryLocation;
private double _maxLogSizeInMB;
private TimeZone _timeZone;
/**
 * Builds the fetcher from its configuration: resolves the maximum history log size, the history
 * server time zone, the done / intermediate-done directories and the file system used to read them.
 *
 * @param fetcherConfData fetcher configuration whose parameter map may contain
 *     "history_log_size_limit_in_mb" and "history_server_time_zone"
 * @throws IOException if the file system cannot be obtained
 */
public MapReduceFSFetcherHadoop2(FetcherConfigurationData fetcherConfData) throws IOException {
  super(fetcherConfData);

  // Size limit for a single history log; stays at the 500 MB default unless a usable override is configured.
  _maxLogSizeInMB = DEFALUT_MAX_LOG_SIZE_IN_MB;
  String logSizeParam = fetcherConfData.getParamMap().get(LOG_SIZE_XML_FIELD);
  if (logSizeParam != null) {
    double[] parsedLimit = Utils.getParam(logSizeParam, 1);
    if (parsedLimit != null) {
      _maxLogSizeInMB = parsedLimit[0];
    }
  }
  logger.info("The history log limit of MapReduce application is set to " + _maxLogSizeInMB + " MB");

  // Time zone of the history server; the JVM default applies when none is configured.
  // Annotator's advice: set history_server_time_zone in FetcherConf.xml (e.g. to CST) at setup time.
  String timeZoneStr = fetcherConfData.getParamMap().get(HISTORY_SERVER_TIME_ZONE_XML_FIELD);
  if (timeZoneStr == null) {
    _timeZone = TimeZone.getDefault();
  } else {
    _timeZone = TimeZone.getTimeZone(timeZoneStr);
  }
  logger.info("Using timezone: " + _timeZone.getID());

  Configuration conf = new Configuration();
  _historyLocation = conf.get("mapreduce.jobhistory.done-dir");
  _intermediateHistoryLocation = conf.get("mapreduce.jobhistory.intermediate-done-dir");

  // Prefer the file system addressed by the done-dir URI; fall back to the default file system
  // when that value is not a valid URI.
  try {
    _fs = FileSystem.get(new URI(_historyLocation), conf);
  } catch (URISyntaxException ex) {
    _fs = FileSystem.get(conf);
  }

  logger.info("Intermediate history dir: " + _intermediateHistoryLocation);
  logger.info("History done dir: " + _historyLocation);
}
/**
 * @return the value of "mapreduce.jobhistory.done-dir" read in the constructor
 */
public String getHistoryLocation() {
return _historyLocation;
}
/**
 * @return the history log size limit in MB (the configured value, or the 500 MB default)
 */
public double getMaxLogSizeInMB() {
return _maxLogSizeInMB;
}
/**
 * @return the time zone used to build dated history paths (the configured one, or the JVM default)
 */
public TimeZone getTimeZone() {
return _timeZone;
}
/**
 * Computes the done-dir sub-directory that holds a finished job's history files:
 * {done-dir}/yyyy/mm/dd/{serialPart}/ (with a trailing separator).
 * <p>
 * yyyy/mm/dd is the finish date of the job in the configured time zone. serialPart is formed
 * from the serial number, i.e. the last part of the app id: it is zero-padded to 9 digits and
 * the first 6 digits are kept.
 * </p>
 * <p>
 * For example, appId = application_1461566847127_84624 has serial number 84624, padded to
 * 000084624, so serialPart is 000084. If the application finished on 2016-5-30 its history
 * files are located at {done-dir}/2016/05/30/000084
 * </p>
 * <p>
 * This layout only holds for finished jobs in {done-dir}, not for jobs whose files are still in
 * {intermediate-done-dir}.
 * </p>
 *
 * @param job the job whose finish time and app id determine the directory
 * @return the history directory path, ending with a separator
 */
protected String getHistoryDir(AnalyticJob job) {
  // Date part, from the finish time in the history server's time zone.
  Calendar finishedAt = Calendar.getInstance(_timeZone);
  finishedAt.setTimeInMillis(job.getFinishTime());
  int year = finishedAt.get(Calendar.YEAR);
  int month = finishedAt.get(Calendar.MONTH) + 1; // Calendar months are zero-based
  int dayOfMonth = finishedAt.get(Calendar.DAY_OF_MONTH);
  String datePart = String.format(TIMESTAMP_DIR_FORMAT, year, month, dayOfMonth);

  // Serial part, from the digits after the last underscore of the app id.
  String appId = job.getAppId();
  String serialDigits = appId.substring(appId.lastIndexOf('_') + 1);
  String paddedSerial = String.format("%09d", Integer.parseInt(serialDigits));
  String serialPart = paddedSerial.substring(0, SERIAL_NUMBER_DIRECTORY_DIGITS);

  // The empty last element makes the joined path end with a separator.
  String[] segments = new String[]{_historyLocation, datePart, serialPart, ""};
  return StringUtils.join(segments, File.separator);
}
/**
 * Locates the configuration file ("_conf.xml") and the history file (".jhist") of a job on
 * {@code _fs}.
 * <p>
 * The dated directory returned by {@link #getHistoryDir(AnalyticJob)} is searched first. Any
 * file still missing afterwards is looked up in {intermediate-done-dir}/{user}/, in case the
 * history server has not yet moved it into the done-dir.
 * </p>
 *
 * @param job the job whose files are wanted
 * @return the paths of the configuration file and the history file
 * @throws FileNotFoundException if either file is found in neither directory
 * @throws IOException if a file system call fails
 */
private DataFiles getHistoryFiles(AnalyticJob job) throws IOException {
  // application_1461566847127_84624 -> job_1461566847127_84624
  final String jobId = Utils.getJobIdFromApplicationId(job.getAppId());
  String confPath = null;
  String histPath = null;

  // First choice: {done-dir}/{yyyy}/{MM}/{dd}/{serialPart}/ on the history file system.
  final String doneDir = getHistoryDir(job);
  final Path doneDirPath = new Path(doneDir);
  if (_fs.exists(doneDirPath)) {
    RemoteIterator<LocatedFileStatus> doneListing = _fs.listFiles(doneDirPath, false);
    while (doneListing.hasNext()) {
      // Stop scanning as soon as both files have been located.
      if (confPath != null && histPath != null) {
        break;
      }
      String fileName = doneListing.next().getPath().getName();
      if (!fileName.contains(jobId)) {
        continue;
      }
      if (fileName.endsWith("_conf.xml")) {
        confPath = doneDir + fileName;
      } else if (fileName.endsWith(".jhist")) {
        histPath = doneDir + fileName;
      }
    }
  }

  // Fallback: {intermediate-done-dir}/{user}/ holds files the history server has not moved yet.
  final String intermediateDir =
      _intermediateHistoryLocation + File.separator + job.getUser() + File.separator;

  if (confPath == null) {
    // The configuration file name is fully determined by the job id, so probe it directly.
    confPath = intermediateDir + jobId + "_conf.xml";
    if (!_fs.exists(new Path(confPath))) {
      throw new FileNotFoundException("Can't find config of " + jobId + " in neither " + doneDir + " nor " + intermediateDir);
    }
    logger.info("Found job config in intermediate dir: " + confPath);
  }

  if (histPath == null) {
    try {
      // The history file name is matched by job id and suffix, so the directory is listed.
      RemoteIterator<LocatedFileStatus> pending = _fs.listFiles(new Path(intermediateDir), false);
      while (histPath == null && pending.hasNext()) {
        String fileName = pending.next().getPath().getName();
        if (fileName.contains(jobId) && fileName.endsWith(".jhist")) {
          histPath = intermediateDir + fileName;
          logger.info("Found history file in intermediate dir: " + histPath);
        }
      }
    } catch (FileNotFoundException e) {
      // A missing intermediate directory is reported below as "history file not found".
      logger.error("Intermediate history directory " + intermediateDir + " not found");
    }
    if (histPath == null) {
      throw new FileNotFoundException("Can't find history file of " + jobId + " in neither " + doneDir + " nor " + intermediateDir);
    }
  }

  return new DataFiles(confPath, histPath);
}
/**
 * Fetches the configuration, counters and per-task data of one MapReduce job from its job
 * history files on {@code _fs}.
 * <p>
 * Steps: locate the configuration and history files, load the configuration into
 * {@link Properties}, skip parsing when the history file is larger than
 * {@code _maxLogSizeInMB} megabytes, otherwise parse the history file and copy times, status,
 * counters and task data into the result.
 * </p>
 *
 * @param job the job to fetch; its application id is used to derive the job id
 * @return the populated application data; when the history file exceeds the size limit only the
 *         ids, the job configuration and a diagnostic message are set, and succeeded is false
 * @throws IOException if locating or reading the job files fails
 * @throws RuntimeException if the history file cannot be parsed, or if the parsed job status is
 *         neither SUCCEEDED nor FAILED (a missing status is treated the same way)
 */
@Override
public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {
  // Resolve the paths of the job configuration file and the job history file.
  DataFiles files = getHistoryFiles(job);
  String confFile = files.getJobConfPath();
  String histFile = files.getJobHistPath();
  String appId = job.getAppId();
  // application_1461566847127_84624 -> job_1461566847127_84624
  String jobId = Utils.getJobIdFromApplicationId(appId);

  MapReduceApplicationData jobData = new MapReduceApplicationData();
  jobData.setAppId(appId).setJobId(jobId);

  // Load the job configuration file (without Hadoop defaults) and copy it into Properties.
  Configuration jobConf = new Configuration(false);
  jobConf.addResource(_fs.open(new Path(confFile)), confFile);
  Properties jobConfProperties = new Properties();
  for (Map.Entry<String, String> entry : jobConf) {
    jobConfProperties.put(entry.getKey(), entry.getValue());
  }
  jobData.setJobConf(jobConfProperties);

  // Check if job history file is too large and should be throttled:
  // its length in bytes must not exceed _maxLogSizeInMB megabytes.
  if (_fs.getFileStatus(new Path(histFile)).getLen() > _maxLogSizeInMB * FileUtils.ONE_MB) {
    String errMsg = "The history log of MapReduce application: " + appId + " is over the limit size of " + _maxLogSizeInMB + " MB, the parsing process gets throttled.";
    logger.warn(errMsg);
    jobData.setDiagnosticInfo(errMsg);
    jobData.setSucceeded(false); // set succeeded to false to avoid heuristic analysis
    return jobData;
  }

  // Analyze job history file: the parser turns it into a JobInfo object.
  JobHistoryParser parser = new JobHistoryParser(_fs, histFile);
  JobHistoryParser.JobInfo jobInfo = parser.parse();
  // The parser records a parse failure instead of throwing it; surface it here.
  IOException parseException = parser.getParseException();
  if (parseException != null) {
    throw new RuntimeException("Could not parse history file " + histFile, parseException);
  }

  jobData.setSubmitTime(jobInfo.getSubmitTime());
  jobData.setStartTime(jobInfo.getLaunchTime());
  jobData.setFinishTime(jobInfo.getFinishTime());

  // Constant-first comparisons: a history file without a recorded job status yields a null
  // state, which must end up in the explicit "neither" branch rather than a NullPointerException.
  String state = jobInfo.getJobStatus();
  if ("SUCCEEDED".equals(state)) {
    jobData.setSucceeded(true);
  } else if ("FAILED".equals(state)) {
    jobData.setSucceeded(false);
    jobData.setDiagnosticInfo(jobInfo.getErrorInfo());
  } else {
    throw new RuntimeException("job neither succeeded or failed. can not process it ");
  }

  // Fetch job counter: group -> (counter name -> counter value).
  MapReduceCounterData jobCounter = getCounterData(jobInfo.getTotalCounters());

  // Fetch task data: split all tasks of the job into mappers and everything else (reducers).
  Map<TaskID, JobHistoryParser.TaskInfo> allTasks = jobInfo.getAllTasks();
  List<JobHistoryParser.TaskInfo> mapperInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
  List<JobHistoryParser.TaskInfo> reducerInfoList = new ArrayList<JobHistoryParser.TaskInfo>();
  for (JobHistoryParser.TaskInfo taskInfo : allTasks.values()) {
    if (taskInfo.getTaskType() == TaskType.MAP) {
      mapperInfoList.add(taskInfo);
    } else {
      reducerInfoList.add(taskInfo);
    }
  }
  if (jobInfo.getTotalMaps() > MAX_SAMPLE_SIZE) {
    logger.debug(jobId + " total mappers: " + mapperInfoList.size());
  }
  if (jobInfo.getTotalReduces() > MAX_SAMPLE_SIZE) {
    logger.debug(jobId + " total reducers: " + reducerInfoList.size());
  }

  // Convert the task infos into task data arrays.
  MapReduceTaskData[] mapperList = getTaskData(jobId, mapperInfoList);
  MapReduceTaskData[] reducerList = getTaskData(jobId, reducerInfoList);
  jobData.setCounters(jobCounter).setMapperData(mapperList).setReducerData(reducerList);
  return jobData;
}
// <group, <counter.name, counter.value>>
/**
 * Copies every counter of every group into a new {@code MapReduceCounterData}, keyed by group
 * name and counter name.
 *
 * @param counters the counters to copy; may be null
 * @return a new holder, left empty when {@code counters} is null
 */
private MapReduceCounterData getCounterData(Counters counters) {
  MapReduceCounterData result = new MapReduceCounterData();
  if (counters == null) {
    return result;
  }
  for (CounterGroup counterGroup : counters) {
    String nameOfGroup = counterGroup.getName();
    for (Counter entry : counterGroup) {
      result.set(nameOfGroup, entry.getName(), entry.getValue());
    }
  }
  return result;
}
/**
 * Derives the timing figures of one task attempt.
 * <p>
 * The returned array has five slots, in this order: total time, shuffle time, sort time, start
 * time, finish time. For a MAP attempt the shuffle and sort slots are 0. For any other attempt
 * the shuffle time is measured from start to shuffle finish, and the sort time from shuffle
 * finish to sort finish.
 * </p>
 *
 * @param attempInfo the attempt to read the timestamps from
 * @return the five timing values described above
 */
private long[] getTaskExecTime(JobHistoryParser.TaskAttemptInfo attempInfo) {
  final long begin = attempInfo.getStartTime();
  final long end = attempInfo.getFinishTime();
  final long total = end - begin;

  // Map attempts have no shuffle or sort phase.
  if (attempInfo.getTaskType() == TaskType.MAP) {
    return new long[]{total, 0, 0, begin, end};
  }

  final long shuffleDone = attempInfo.getShuffleFinishTime();
  final long sortDone = attempInfo.getSortFinishTime();
  final long shuffleSpan = shuffleDone - begin;
  final long sortSpan = sortDone - shuffleDone;
  return new long[]{total, shuffleSpan, sortSpan, begin, end};
}
/**
 * Converts task infos into task data carrying task id, attempt id, task status, execution times
 * and counters.
 * <p>
 * Only the first {@code sampleAndGetSize(jobId, infoList)} entries of {@code infoList} are
 * converted. NOTE(review): per the original note, that helper shuffles the list and caps the
 * size at MAX_SAMPLE_SIZE when sampling is enabled in the fetcher configuration; confirm
 * against its definition.
 * </p>
 *
 * @param jobId the id of the job the tasks belong to
 * @param infoList the map tasks or the reduce tasks of that single job
 * @return one entry per converted task, in list order
 */
protected MapReduceTaskData[] getTaskData(String jobId, List<JobHistoryParser.TaskInfo> infoList) {
  final int limit = sampleAndGetSize(jobId, infoList);
  List<MapReduceTaskData> collected = new ArrayList<MapReduceTaskData>();

  for (int idx = 0; idx < limit; idx++) {
    JobHistoryParser.TaskInfo task = infoList.get(idx);
    String taskId = task.getTaskId().toString();

    // Successful tasks report their successful attempt, all others the attempt they failed on.
    TaskAttemptID attempt = task.getTaskStatus().equals("SUCCEEDED")
        ? task.getSuccessfulAttemptId()
        : task.getFailedDueToAttemptId();
    String attemptName = (attempt != null) ? attempt.toString() : "";

    MapReduceTaskData data = new MapReduceTaskData(taskId, attemptName, task.getTaskStatus());

    // Counters exist at job level and at task level; these are the task-level ones.
    MapReduceCounterData counters = getCounterData(task.getCounters());

    // Without an attempt there are no timings to derive; the times stay null.
    long[] execTime = (attempt == null)
        ? null
        : getTaskExecTime(task.getAllTaskAttempts().get(attempt));

    data.setTimeAndCounter(execTime, counters);
    collected.add(data);
  }

  return collected.toArray(new MapReduceTaskData[collected.size()]);
}
private class DataFiles {
private String jobConfPath;
private String jobHistPath;
public DataFiles(String confPath, String histPath) {
this.jobConfPath = confPath;
this.jobHistPath = histPath;
}
public String getJobConfPath() {
return jobConfPath;
}
public void setJobConfPath(String jobConfPath) {
this.jobConfPath = jobConfPath;
}
public String getJobHistPath() {
return jobHistPath;
}
public void setJobHistPath(String jobHistPath) {
this.jobHistPath = jobHistPath;
}
}
}
引申
Hadoop集群REST API汇总
MR Application Master
MR History Server
Introduction
Resource Manager
Node Manager
Timeline Server
Timeline Service V2