What is a task in Spark? How does the Spark worker execute the jar file?
To understand how tasks are created and scheduled, we must first understand Spark's execution model. In short, a Spark application is executed in three steps:
Create the RDD graph.
Create an execution plan according to the RDD graph. Stages are created in this step.
Generate tasks based on the plan and schedule them across the workers.
In your word-count example, the RDD graph is rather simple. It looks something like this:
file -> lines -> words -> per-word count -> global word count -> output
Based on this graph, two stages are created. The stage-creation rule is to pipeline as many narrow transformations as possible. In your example, the narrow transformations end at the per-word count, so you get two stages:
file -> lines -> words -> per-word count
global word count -> output
Once the stages are determined, Spark generates tasks from them. The first stage creates ShuffleMapTasks and the last stage creates ResultTasks, because the last stage includes an action that produces results.
The number of tasks generated depends on how your files are distributed. Suppose you have three different files on three different nodes: the first stage will generate 3 tasks, one task per partition.
Therefore, you should not map your steps to tasks directly. A task belongs to a stage, and is related to a partition.
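The relationship between stages, partitions and tasks can be illustrated with a minimal sketch in plain Scala collections. This is not Spark code: the object `WordCountStages`, the sample partitions, and the helper names `shuffleMapTask` and `resultTask` are all hypothetical, and the second stage is simplified to a single merge step.

```scala
object WordCountStages {
  // Hypothetical input: three partitions, e.g. three files on three nodes.
  val partitions: Seq[Seq[String]] = Seq(
    Seq("a b a"),
    Seq("b c"),
    Seq("a c c")
  )

  // Stage 1: one task per partition. The narrow steps
  // (lines -> words -> per-word count) are pipelined inside the task.
  def shuffleMapTask(lines: Seq[String]): Map[String, Int] =
    lines.iterator
      .flatMap(_.split(" "))
      .foldLeft(Map.empty[String, Int]) { (acc, w) =>
        acc.updated(w, acc.getOrElse(w, 0) + 1)
      }

  // Stage 2 (simplified to one merge step): combine the per-partition
  // counts into the global word count.
  def resultTask(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.flatten
      .groupBy(_._1)
      .map { case (w, kvs) => w -> kvs.map(_._2).sum }

  // Three partitions give three stage-1 tasks, whatever the number of steps.
  val stage1Outputs: Seq[Map[String, Int]] = partitions.map(shuffleMapTask)
  val counts: Map[String, Int] = resultTask(stage1Outputs)
}
```

Note that the three steps of the first stage run inside a single function call per partition, which is why steps do not map to tasks one to one.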
Usually, the number of tasks run for a stage is exactly the number of partitions of the final RDD, but since RDDs (and hence ShuffleMapStages) can be shared, their number varies depending on the RDD/stage sharing. Please refer to Where to learn how DAG works under the covers in RDD?
How are stages split into tasks in Spark?
Let's assume for the following that only one Spark job is running at every point in time.
What I get so far
Here is my understanding of what happens in Spark:
When a SparkContext is created, each worker node starts an executor. Executors are separate processes (JVMs) that connect back to the driver program. Each executor has the jar of the driver program. Quitting the driver shuts down the executors. Each executor can hold some partitions.
When a job is executed, an execution plan is created according to the lineage graph.
The job is split into stages, where each stage contains as many neighbouring (in the lineage graph) transformations and actions as possible, but no shuffles. Thus stages are separated by shuffles.
I understand that
A task is a command sent from the driver to an executor by serializing the Function object.
The executor deserializes (with the driver jar) the command (task) and executes it on a partition.
but
Question(s)
How do I split the stage into those tasks?
Specifically:
Are the tasks determined by the transformations and actions, or can multiple transformations/actions be in one task?
Are the tasks determined by the partitions (e.g. one task per stage per partition)?
Are the tasks determined by the nodes (e.g. one task per stage per node)?
What I think (only partial answer, even if right)
In https://0x0fff.com/spark-architecture-shuffle, the shuffle is explained with the image
and I get the impression that the rule is
each stage is split into #number-of-partitions tasks, with no regard for the number of nodes
For my first image I'd say that I'd have 3 map tasks and 3 reduce tasks.
For the image from 0x0fff, I'd say there are 8 map tasks and 3 reduce tasks (assuming that there are only three orange and three dark green files).
Open questions in any case
Is that correct? Even if it is, my questions above are not all answered, because it is still open whether multiple operations (e.g. multiple maps) run within one task or are separated into one task per operation.
What others say
What is a task in Spark? How does the Spark worker execute the jar file? and How does the Apache Spark scheduler split files into tasks? are similar, but I did not feel that my question was answered clearly there.
You have a pretty nice outline here. To answer your questions:
A separate task does need to be launched for each partition of data for each stage. Consider that each partition will likely reside in a distinct physical location, e.g. blocks in HDFS or directories/volumes for a local file system.
Note that the submission of Stages is driven by the DAG Scheduler. This means that stages that are not interdependent may be submitted to the cluster for execution in parallel, which maximizes the parallelization capability of the cluster. So if operations in our dataflow can happen simultaneously, we can expect to see multiple stages launched.
We can see that in action in the following toy example in which we do the following types of operations:
load two datasources
perform some map operation on both of the data sources separately
join them
perform some map and filter operations on the result
save the result
So then how many stages will we end up with?
1 stage each for loading the two datasources in parallel = 2 stages
A third stage representing the join, which depends on the other two stages
Note: all of the follow-on operations working on the joined data may be performed in the same stage because they must happen sequentially. There is no benefit to launching additional stages because they cannot start work until the prior operation has completed.
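As a loose analogy only, the dependency structure described above (two independent loads, then a join that waits for both) can be sketched with plain Scala `Future`s. This is not how Spark's scheduler is implemented; the object `ParallelStages`, the method `run`, and the tiny datasets are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelStages {
  def run(): Map[Int, Int] = {
    // Two independent "stages": both futures are started before either
    // is awaited, so they may execute concurrently.
    val left: Future[Map[Int, Int]] =
      Future { (0 until 5).map(x => (x, x * x)).toMap }
    val right: Future[Map[Int, Int]] =
      Future { (0 until 5).map(x => (x, x * x + 1)).toMap }

    // The "join stage" depends on both and only runs once they finish.
    // The follow-on map (v1 + v2) is done in the same step.
    val joined: Future[Map[Int, Int]] =
      for {
        l <- left
        r <- right
      } yield l.keySet.intersect(r.keySet).map(k => k -> (l(k) + r(k))).toMap

    Await.result(joined, 10.seconds)
  }
}
```

The work is wrapped in a method rather than in `val`s of the object so that no blocking happens during object initialization.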
Here is that toy program
    val sfi = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
    val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
    val spj = sfi.join(sp)
    val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
    val sf = sm.filter{ case (k,v) => v % 10 == 0 }
    sf.saveAsTextFile("/data/blah/out")
And here is the DAG of the result
Now: how many tasks? The number of tasks should be equal to the sum, over all stages, of the number of partitions in each stage.
In my case the number of partitions in each stage equals the number of processors on my cluster machines.
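The arithmetic can be written out as a short sketch. The stage names and the partition count of 4 per stage are made-up values for illustration, not numbers taken from the toy program's actual run.

```scala
object TaskCount {
  // Hypothetical stage descriptions; the partition counts are assumptions.
  final case class Stage(name: String, numPartitions: Int)

  val stages: Seq[Stage] = Seq(
    Stage("textFile + map", 4),
    Stage("parallelize", 4),
    Stage("join + mapPartitions + filter + save", 4)
  )

  // One task per partition per stage, summed over all stages.
  val totalTasks: Int = stages.map(_.numPartitions).sum
}
```

With these assumed values the job would run 12 tasks in total, even though the program contains more than three operations.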
Where to learn how DAG works under the covers in RDD?
I have also been searching the web to learn how Spark computes the DAG from the RDD and subsequently executes the tasks.
At a high level, when any action is called on the RDD, Spark creates the DAG and submits it to the DAG scheduler.
The DAG scheduler divides operators into stages of tasks. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together. For example, many map operators can be scheduled in a single stage. The final result of the DAG scheduler is a set of stages.
The stages are passed on to the task scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn't know about dependencies between the stages.
The worker executes the tasks on the slave.
Let's come to how Spark builds the DAG.
At a high level, there are two kinds of transformations that can be applied to RDDs: narrow transformations and wide transformations. Wide transformations result in stage boundaries.
Narrow transformation - doesn't require the data to be shuffled across the partitions, for example map, filter, etc.
Wide transformation - requires the data to be shuffled, for example reduceByKey, etc.
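A toy model of this rule, in plain Scala, is sketched below. It only handles a linear chain of operations and treats every wide operation as the start of a new stage; the types `Narrow` and `Wide` and the function `splitIntoStages` are hypothetical and are not part of Spark's API.

```scala
object StageSplit {
  sealed trait Op { def name: String }
  final case class Narrow(name: String) extends Op
  final case class Wide(name: String) extends Op

  // Walk a linear chain of operations. Narrow operations are appended to
  // the current stage (pipelined); a wide operation opens a new stage.
  def splitIntoStages(ops: List[Op]): List[List[String]] =
    ops
      .foldLeft(List(List.empty[String])) {
        case (stages, Narrow(n)) => (n :: stages.head) :: stages.tail
        case (stages, Wide(n))   => List(n) :: stages
      }
      .map(_.reverse)
      .reverse
      .filter(_.nonEmpty)

  val stages: List[List[String]] = splitIntoStages(
    List(Narrow("textFile"), Narrow("map"), Narrow("filter"),
         Wide("reduceByKey"), Narrow("map"))
  )
}
```

Here the chain is cut once, at `reduceByKey`, giving two stages; the narrow operations on either side of the cut stay grouped together.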
Let's take the example of counting how many log messages appear at each level of severity.
The following is the log file, in which each line starts with the severity level:
    INFO I'm Info message
    WARN I'm a Warn message
    INFO I'm another Info message
The following Scala code counts the messages per severity level:
    val input = sc.textFile("log.txt")
    val splitedLines = input.map(line => line.split(" "))
                            .map(words => (words(0), 1))
                            .reduceByKey{(a,b) => a + b}
This sequence of commands implicitly defines a DAG of RDD objects (the RDD lineage) that will be used later when an action is called. Each RDD maintains a pointer to one or more parents, along with metadata about what type of relationship it has with each parent. For example, when we call val b = a.map() on an RDD, the RDD b keeps a reference to its parent a; that's a lineage.
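The parent-pointer idea can be sketched with a toy class in plain Scala. This is only an illustration of the concept, not Spark's RDD class; `Node`, its `map` method taking a label, and `lineage` are hypothetical names.

```scala
object Lineage {
  // Toy stand-in for an RDD: a name plus a pointer to its parent, if any.
  final case class Node(name: String, parent: Option[Node]) {
    // "Transforming" a node creates a child that remembers its parent.
    def map(label: String): Node = Node(label, Some(this))

    // Walk the parent pointers from this node back to the source.
    def lineage: List[String] =
      name :: parent.map(_.lineage).getOrElse(Nil)
  }

  val a: Node = Node("a", None)
  val b: Node = a.map("b")
}
```

Walking `b.lineage` visits `b` first and then its parent `a`, which mirrors how a lineage is read from the final RDD back to the input.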
To display the lineage of an RDD, Spark provides the debug method toDebugString. For example, calling toDebugString on the splitedLines RDD will output the following:
    (2) ShuffledRDD[6] at reduceByKey at <console>:25 []
     +-(2) MapPartitionsRDD[5] at map at <console>:24 []
        |  MapPartitionsRDD[4] at map at <console>:23 []
        |  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
        |  log.txt HadoopRDD[0] at textFile at <console>:21 []
The first line (from the bottom) shows the input RDD. We created this RDD by calling sc.textFile(). Below is a more diagrammatic view of the DAG created from the given RDD.
Once the DAG is built, the Spark scheduler creates a physical execution plan. As mentioned above, the DAG scheduler splits the graph into multiple stages, and the stages are created based on the transformations. The narrow transformations are grouped (pipelined) together into a single stage. So for our example, Spark will create a two-stage execution as follows:
The DAG scheduler will then submit the stages to the task scheduler. The number of tasks submitted depends on the number of partitions present in the textFile. For example, if we have 4 partitions in this example, then 4 sets of tasks will be created and submitted in parallel, provided there are enough slaves/cores. The diagram below illustrates this in more detail:
For more detailed information, I suggest going through the following YouTube videos, in which the Spark creators give in-depth details about the DAG, the execution plan and its lifetime.
Advanced Apache Spark - Sameer Farooqui (Databricks)
A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)
Introduction to AmpLab Spark Internals
Other useful discussion:
http://stackoverflow.com/questions/29955133/how-to-allocate-more-executors-per-worker-in-standalone-cluster-mode