Master 和 Worker关系图
总结
- master:通过读取配置,创建actorSystem,反射调用master,master启动后,执行生命周期方法,
preStart
和receiveWithLogging
,定时val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
清理失去心跳的Worker - worker:通过读取配置,加载worker所在服务器的cpu cores,memory大小等信息,创建actorSystem,反射调用worker,worker启动后执行生命周期方法
preStart
和receiveWithLogging
,向master注册信息,最重要的信息worker的cpu cores和memory资源大小,定时向master报心跳val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
,防止被master清理 - 所以master会保存worker各个节点的资源信息,与保持心跳,作为后续执行job资源分配,调度的基础
Spark中start-all.sh脚本
Master
1.查看master启动脚本start-master.sh
start-master.sh
脚本中可以看到master启动的时候,启动的是org.apache.spark.deploy.master.Master
类,所以要看源码,从这个类查看,在从Master
伴生对象main
方法入手
2. 源码分析
main
方法主要做了以下三件事
- 读取配置
- 创建
ActorSystem
- 通过
ActorSystem
启动Master
服务
流程1.加载配置文件 2.启动master
val args = new MasterArguments(argStrings, conf)
这句代码的功能就是加载配置文件,但是里面有可以借鉴Utils工具类的代码
关键点在val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
,主要作用,调用创建了ActorSystem
startService函数
作为Utils.startServiceOnPort(port, startService, conf, name)
的参数,
Utils.startServiceOnPort(port, startService, conf, name)
中只是计算出master启动的端口
所以关键还是要看startService
方法,该方法又调用doCreateActorSystem
所以第一个红框的作用就是读取配置,包括端口信息,创建ActorSystem
,第二个红框,通过反射启动Master
启动Master
,Master
会走Actor的生命周期方法preStart
启动,receiveWithLogging
,接收信息
preStart
方法中,启动webUi等操作,最重要的是这句代码,代码,启动一个定时器,定时发送给自己一个case objec CheckForWorkerTimeOut
,间隔是val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
Master中最最重要的方法,receiveWithLogging
,master启动后,通过该方法接收message做相应的处理,首先查看preStart中,查看定时发CheckForWorkerTimeOut
给自己的receive调用的方法,查看源码,
总结:Master启动后,定时发送CheckForWorkerTimeOut
,给自己,在receiveWithLogging
,调用timeOutDeadWorkers
,定时清理超过心跳时间的Worker,从val workers = new HashSet[WorkerInfo]
移除
Worker
1.查看worker启动脚本start-slave.sh
start-slaves.sh
启动start-slave.sh
,启动org.apache.spark.deploy.worker.Worker
类
2.源码分析
Worker启动跟Master启动几乎一模一样,
- 读取配置,获取
cpu cores
和`memeory - 创建
ActorSystem
- 反射创建
Worker
,Worker启动,调用生命周期方法
所以直接看Worker的preStart
跟receiveWithLogging
preStart
方法中,会创建工作目录WorkDir
,启动WorkWebUi
,最最重要的是,向master
注册,registerWithMaster
查看方法,调用tryRegisterAllMasters
,获取master uri 比如master:7070
,获取master的actor,然后向master发送异步无返回值message,将自己的信息封装到case class RegisterWorker
,包括自己的id,ip,port, cpu cores,内存大小信息等,所以此时需要到master的receiveWithLogging
查看接收到的RegisterWorker
做出什么样的操作
master接收到worker的信息后,将RegisterWorker 的信息封装成一个WorkerInfo(拥有worker的信息,id,ip,port, cpu cores,内存大小信息等),再将workerinfo的信息添加到persistenceEngine
持久化起来,然后向worker发送RegisteredWorker,告诉worker注册成功,接着调用调度方法schedule()
,这个方法大概是这样的,master可能拥有许多client提交的任务,当资源不足的时候,任务会排队,所以当有新的资源,就是worker加入的时候,如果此时有任务排队,又有资源加入master会调度任务分配资源,就是这个schedule()
方法。woker收到注册成功的信息RegisteredWorker
,所以此时需要去worker的receiveWithLogging
中查看
worker接收到master的信息后,启动定时器,定时
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
向自己发送心跳SendHeartbeat
,此时需要在worker的receiveWithLogging方法中查看SendHeartbeat
,查看代码,又发送heartBeat
给mastermaster收到心跳后,判断是否存在workerId,如果存在则更新workerInfo的心跳时间,如果不存在,发送信息
ReconnectWorker
,让worker重新向注册。