大数据spark,让你更快进入角色

在我们介绍Executor执行Task之前,先看一个重要的类,它就是CoarseGrainedExecutorBackend类

它创建这个进程的时候会调用onStart方法

它是ExecutorBackend粗粒度进程,

它负责向Driver发送Executor的注册请求

它是一个通信的进程,它可以与Driver相互通信

它是Executor所在的一个进程名称,Executor才是处理Task真正的对象,Executor处理Task都是由线程池来进行Task的处理的。

它负责接受Driver返回回来的Executor注册信息,然后创建Executor上下文。

它负责接受TaskSchedule发送过来的LaunchTask消息,开始Task的启动与计算

Executor执行Task的原理分析

当CoarseGrainedExecutorBackend接收到Driver发送过来的RegisteredExecutor消息的时候就会创建Executor

然后当再次接受Driver发送过来的LaunchTask消息后就会开始执行Task,首先它会对发送来的TaskTaskDescription进行反序列化,然后调用launchTask方法交由Executor去执行Task。

在launchTask方法中,创建了TaskRunner,然后TaskRunner继承了Runnable接口,然后将这个TaskRunner加入到线程池和缓存中,然后线程池调用executor方法开始Task的执行。

有需要 的联系我2317384986          yxxy1717

Executor执行Task的原码分析

CoarseGrainedExecutorBackend的onStart方法:该方法在创建CoarseGrainedExecutorBackend类的时候被执行,它会向Driver注册Executor

override def onStart() {

logInfo("Connecting to driver: " + driverUrl)

rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>

// This is a very fast action so we can use "ThreadUtils.sameThread"

driver = Some(ref)

//向Driver发送Executor的注册请求

ref.ask[RegisterExecutorResponse](

RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))

}(ThreadUtils.sameThread).onComplete {

// This is a very fast action so we can use "ThreadUtils.sameThread"

case Success(msg) => Utils.tryLogNonFatalError {

Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse

}

case Failure(e) => {

logError(s"Cannot register with driver: $driverUrl", e)

System.exit(1)

}

}(ThreadUtils.sameThread)

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

CoarseGrainedExecutorBackend的receive方法:该方法作用就是接受各种消息用的。

override def receive: PartialFunction[Any, Unit] = {

//Driver返回Executor注册成功的消息,然后就会创建Executor对象。

case RegisteredExecutor(hostname) =>

logInfo("Successfully registered with driver")

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

//Driver返回Executor注册失败的消息,然后程序结束执行。

case RegisterExecutorFailed(message) =>

logError("Slave registration failed: " + message)

System.exit(1)

//接受Driver发送过来的LaunchTask消息,这个消息作用就是要求Executor开始执行Task任务

case LaunchTask(data) =>

if (executor == null) {

logError("Received LaunchTask command but executor was null")

System.exit(1)

} else {

//首先会对传过来的TaskDescription进行反序列化,

val taskDesc = ser.deserialize[TaskDescription](data.value)

logInfo("Got assigned task " + taskDesc.taskId)

//调用executor的launchTask方法开始执行Task任务。

//this:ExecutorBackend,taskId:task的索引Id,attemptNumber:尝试执行的次数,

//taskDesc.name:task的名称,taskDesc.serializedTask:TaskDescription序列化后的对象

executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,

taskDesc.name, taskDesc.serializedTask)

}

case KillTask(taskId, _, interruptThread) =>

if (executor == null) {

logError("Received KillTask command but executor was null")

System.exit(1)

} else {

executor.killTask(taskId, interruptThread)

}

case StopExecutor =>

logInfo("Driver commanded a shutdown")

// Cannot shutdown here because an ack may need to be sent back to the caller. So send

// a message to self to actually do the shutdown.

self.send(Shutdown)

case Shutdown =>

executor.stop()

stop()

rpcEnv.shutdown()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

Executor的launchTask方法:该方法的作用是为每个Task创建一个TaskRunner,然后将TaskRunner放入内存缓存中,然后再将TaskRunner放入线程池中,等待线程执行。

def launchTask(

context: ExecutorBackend,

taskId: Long,

attemptNumber: Int,

taskName: String,

serializedTask: ByteBuffer): Unit = {

//为每一个Task都创建一个对应的TaskRunner对象,TaskRunner继承了Java的Runnable接口

val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,

serializedTask)

//将TaskRunner放入内存缓存

runningTasks.put(taskId, tr)

//Executor内部有一个Java线程池,然后将Task封装到TaskRunner线程,直接放到

//线程池中去执行,如果线程池中线程不够用的,就会等待有了空闲的线程在开始执行

threadPool.execute(tr)

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

TaskRunner继承了Runable接口,执行Task的程序都放在了多线程的run方法里了,每当一个Task过来就会创建一个TaskRunner对象,并且创建一个线程线程去执行Task,然后这些TaskRunner会放到线程池中去执行。下边是run方法的源码解析

override def run(): Unit = {

//为Task分配一个内存管理器

val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)

//记录反序列化的时间

val deserializeStartTime = System.currentTimeMillis()

Thread.currentThread.setContextClassLoader(replClassLoader)

//创建一个序列化器,用来对Task数据进行反序列化

val ser = env.closureSerializer.newInstance()

logInfo(s"Running $taskName (TID $taskId)")

//向Driver发送Task当前的执行状态

execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

var taskStart: Long = 0

startGCTime = computeTotalGcTime()

try {

//对序列化后的Task数据进行反序列化

val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)

//通过网络通信,获取Task依赖的文件、资源、jar包,比如说Hadoop的配置文件

updateDependencies(taskFiles, taskJars)

//通过反序列化将Task进行反序列化

//类加载的作用:用发射动态加载一个类,创建类的对象

task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

task.setTaskMemoryManager(taskMemoryManager)

//如果在序列化之前以及被停掉了,那么就会马上退出,否则就会继续执行Task

if (killed) {

throw new TaskKilledException

}

logDebug("Task " + taskId + "'s epoch is " + task.epoch)

env.mapOutputTracker.updateEpoch(task.epoch)

// 计算出Task开始的时间

taskStart = System.currentTimeMillis()

.........

有需要的请关注小编

2317384986

yxxy1717

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容

  • 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所能分配的CPU数...
    miss幸运阅读 3,173评论 3 15
  • Spark Job执行流程大体如下:用户提交Job后会生成SparkContext对象,SparkContext向...
    imarch1阅读 3,647评论 0 7
  • 1、 性能调优 1.1、 分配更多资源 1.1.1、分配哪些资源? Executor的数量 每个Executor所...
    Frank_8942阅读 4,524评论 2 36
  • 友情可贵 Golden Friendship 在这个物质化倾向非常严重的时代,人们疯狂地追求成功,而衡量成功的标准...
    无心剑阅读 226评论 0 4
  • 上周计划在本周的执行情况——心态未变,但是变得松散,晚上有时候还会打打游戏。花更多的时间去总结,重新写一遍,感觉效...
    刘小粤阅读 330评论 0 0