上一章节讲解了如何使用java以及hadoop进行二次排序,这一章节分别尝试java、scala与spark结合实现二次排序。
spark启动
为了启动spark方便一点这里写了简单的脚本文本
[root@master Templates]# cat stop_spark_yarn.sh
# stop hadoop yarn spark
$SPARK_HOME/sbin/stop-all.sh
$HADOOP_HOME/sbin/stop-all.sh
[root@master Templates]# cat start_spark_yarn.sh
# start hadoop yarn spark
$HADOOP_HOME/sbin/start-all.sh
$SPARK_HOME/sbin/start-all.sh
[root@master Templates]# chmod 777 ../Templates/*
启动后
maven依赖
spark使用scala语言的,这里为了能让java能在spark中跑起来,需要添加些maven依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>kean.learn</groupId>
<artifactId>hadoop_spark</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!--<plugin>-->
<!--<groupId>org.scala-tools</groupId>-->
<!--<artifactId>maven-scala-plugin</artifactId>-->
<!--<version>2.15.2</version>-->
<!--<executions>-->
<!--<execution>-->
<!--<goals>-->
<!--<goal>compile</goal>-->
<!--<goal>testCompile</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
</plugins>
</build>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-math3 -->
<!--<dependency>-->
<!--<groupId>org.apache.commons</groupId>-->
<!--<artifactId>commons-math3</artifactId>-->
<!--<version>3.6.1</version>-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-math -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math</artifactId>
<version>2.2</version>
</dependency>
</dependencies>
<repositories>
<!-- 代码库 -->
<repository>
<id>maven-ali</id>
<url>http://maven.aliyun.com/nexus/content/groups/public//</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>fail</checksumPolicy>
</snapshots>
</repository>
</repositories>
</project>
运行代码直接使用大数据处理技巧书中的代码如下:
package org.dataalgorithms.chap01.sparkwithlambda;
// STEP-0: import required Java/Spark classes.
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
//
import scala.Tuple2;
//
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
//
import org.dataalgorithms.util.SparkUtil;
import org.dataalgorithms.util.DataStructures;
/**
* SecondarySortUsingCombineByKey class implements the secondary sort design pattern
* by using combineByKey().
* <p>
* <p>
* Input:
* <p>
* name, time, value
* x,2,9
* y,2,5
* x,1,3
* y,1,7
* y,3,1
* x,3,6
* z,1,4
* z,2,8
* z,3,7
* z,4,0
* p,1,10
* p,3,60
* p,4,40
* p,6,20
* <p>
* Output: generate a time-series looking like this:
* <p>
* t1 t2 t3 t4 t5 t6
* x => [3, 9, 6]
* y => [7, 5, 1]
* z => [4, 8, 7, 0]
* p => [10, null, 60, 40, null , 20]
* <p>
* x => [(1,3), (2,9), (3,6)] where 1 < 2 < 3
* y => [(1,7), (2,5), (3,1)] where 1 < 2 < 3
* z => [(1,4), (2,8), (3,7), (4,0)] where 1 < 2 < 3 < 4
* p => [(1,10), (3,60), (4,40), (6,20)] where 1 < 3 < 4 < 6
*
* @author Mahmoud Parsian
*/
public class SecondarySortUsingCombineByKey {
public static void main(String[] args) throws Exception {
// STEP-1: read input parameters and validate them
if (args.length < 2) {
System.err.println("Usage: SecondarySortUsingCombineByKey <input> <output>");
System.exit(1);
}
String inputPath = args[0];
System.out.println("inputPath=" + inputPath);
String outputPath = args[1];
System.out.println("outputPath=" + outputPath);
// STEP-2: Connect to the Sark master by creating JavaSparkContext object
final JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
// STEP-3: Use ctx to create JavaRDD<String>
// input record format: <name><,><time><,><value>
JavaRDD<String> lines = ctx.textFile(inputPath, 1);
// STEP-4: create (key, value) pairs from JavaRDD<String> where
// key is the {name} and value is a pair of (time, value).
// The resulting RDD will be JavaPairRDD<String, Tuple2<Integer, Integer>>.
// convert each record into Tuple2(name, time, value)
// PairFunction<T, K, V> T => Tuple2(K, V) where K=String and V=Tuple2<Integer, Integer>
System.out.println("=== DEBUG STEP-4 ===");
// 获取读取行数据利用Tuple2进行保存
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = lines.mapToPair((String s) -> {
String[] tokens = s.split(","); // x,2,5
// System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
Tuple2<Integer, Integer> timevalue = new Tuple2<>(Integer.parseInt(tokens[1]), Integer.parseInt(tokens[2]));
return new Tuple2<>(tokens[0], timevalue);
});
// STEP-5: validate STEP-4, we collect all values from JavaPairRDD<> and print it.
// List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
// for (Tuple2 t : output) {
// Tuple2<Integer, Integer> timevalue = (Tuple2<Integer, Integer>) t._2;
// System.out.println(t._1 + "," + timevalue._1 + "," + timevalue._2);
// }
// How to use combineByKey(): to use combineByKey(), you
// need to define 3 basic functions f1, f2, f3:
// and then you invoke it as: combineByKey(f1, f2, f3)
// function 1: create a combiner data structure
// function 2: merge a value into a combined data structure
// function 3: merge two combiner data structures
// function 1: create a combiner data structure
// Here, the combiner data structure is a SortedMap<Integer,Integer>,
// which keeps track of (time, value) for a given key
// Tuple2<Integer, Integer> = Tuple2<time, value>
// SortedMap<Integer, Integer> = SortedMap<time, value> 默认按键值升序排列
Function<Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> createCombiner = (Tuple2<Integer, Integer> x)
-> {
Integer time = x._1;
Integer value = x._2;
SortedMap<Integer, Integer> map = new TreeMap<>();
map.put(time, value);
return map;
};
// function 2: merge a value into a combined data structure
Function2<SortedMap<Integer, Integer>, Tuple2<Integer, Integer>, SortedMap<Integer, Integer>> mergeValue
= (SortedMap<Integer, Integer> map, Tuple2<Integer, Integer> x) -> {
Integer time = x._1;
Integer value = x._2;
map.put(time, value);
return map;
};
// function 3: merge two combiner data structures
Function2<SortedMap<Integer, Integer>, SortedMap<Integer, Integer>, SortedMap<Integer, Integer>> mergeCombiners
= (SortedMap<Integer, Integer> map1, SortedMap<Integer, Integer> map2) -> {
if (map1.size() < map2.size()) {
return DataStructures.merge(map1, map2);
} else {
return DataStructures.merge(map1, map2);
}
};
// STEP-5: create sorted (time, value)
JavaPairRDD<String, SortedMap<Integer, Integer>> combined = pairs.combineByKey(
createCombiner,
mergeValue,
mergeCombiners);
// STEP-7: validate STEP-6, we collect all values from JavaPairRDD<> and print it.
// System.out.println("=== DEBUG STEP-6 ===");
// List<Tuple2<String, SortedMap<Integer, Integer>>> output2 = combined.collect();
// for (Tuple2<String, SortedMap<Integer, Integer>> t : output2) {
// String name = t._1;
// SortedMap<Integer, Integer> map = t._2;
// System.out.println(name);
// System.out.println(map);
// }
// persist output
combined.saveAsTextFile(outputPath);
// done!
ctx.close();
// exit
System.exit(0);
}
}
spark集群提交任务
[root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/input2/timeseries.txt
x,2,9
y,2,5
x,1,3
y,1,7
y,3,1
x,3,6
z,1,4
z,2,8
z,3,7
z,4,0
p,1,10
p,3,60
p,4,40
p,6,20
[root@master bin]# ./spark-submit --master local[2] --class org.dataalgorithms.chap01.sparkwithlambda.SecondarySortUsingCombineByKey /root/Data/data_algorithms/chapter1/hadoop_spark-1.0-SNAPSHOT.jar /data_algorithms/chapter1/input2 /data_algorithms/chapter1/output2
[root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/out*/p*
( p,{1=10, 3=60, 4=40, 6=20})
( x,{1=3, 2=9, 3=6})
( y,{1=7, 2=5, 3=1})
( z,{1=4, 2=8, 3=7, 4=0})
代码很好理解,如果懂一点scala的tuple集合就更好理解,用java写spark感觉怪怪的,有点像混合编程,以前看过一点scala知识,索性再次结合spark学习下scala,这次希望不是从入门到放弃。
scala
build.sbt
name := "hadoop_spark_scala"
version := "0.1"
scalaVersion := "2.11.8"
//resolvers ++= Seq( //额外仓库添加
// "Admonitor Repository" at "http://maven.mzsvn.com/repository/admonitor",
// "Local Maven Repository" at "local-maven:file://D:/java_workspace/repository"
//)
libraryDependencies ++= Seq( //依赖库
"org.apache.spark" % "spark-core_2.10" % "1.6.0",
"org.apache.hadoop" % "hadoop-common" % "2.7.3",
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % "2.7.3"
)
package org.dataalgorithms.chap01.scala
import org.apache.spark.Partitioner
/**
* A custom partitioner
*
* org.apache.spark.Partitioner:
* An abstract class that defines how the elements in a
* key-value pair RDD are partitioned by key. Maps each
* key to a partition ID, from 0 to numPartitions - 1.
*/
class CustomPartitioner(partitions: Int) extends Partitioner {
require(partitions > 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case (k: String, v: Int) => math.abs(k.hashCode % numPartitions)
case null => 0
case _ => math.abs(key.hashCode % numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: CustomPartitioner => h.numPartitions == numPartitions
case _ => false
}
override def hashCode: Int = numPartitions
}
package org.dataalgorithms.chap01.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* Spark/Scala solution to secondary sort
*
* @author Gaurav Bhardwaj (gauravbhardwajemail@gmail.com)
* @editor Mahmoud Parsian (mahmoud.parsian@yahoo.com)
*
*/
object SecondarySort {
def main(args: Array[String]): Unit = {
//
if (args.length != 3) {
println("Usage <number-of-partitions> <input-path> <output-path>")
sys.exit(1)
}
val partitions = args(0).toInt
val inputPath = args(1)
val outputPath = args(2)
val config = new SparkConf
config.setAppName("SecondarySort")
val sc = new SparkContext(config)
val input = sc.textFile(inputPath)
//------------------------------------------------
// each input line/record has the following format:
// <id><,><time><,><value>
//-------------------------------------------------
val valueToKey = input.map(x => {
val line = x.split(",")
((line(0) + "-" + line(1), line(2).toInt), line(2).toInt)
})
implicit def tupleOrderingDesc = new Ordering[Tuple2[String, Int]] {
override def compare(x: Tuple2[String, Int], y: Tuple2[String, Int]): Int = {
if (y._1.compare(x._1) == 0) y._2.compare(x._2)
else y._1.compare(x._1)
}
}
val sorted = valueToKey.repartitionAndSortWithinPartitions(new CustomPartitioner(partitions))
val result = sorted.map {
case (k, v) => (k._1, v)
}
result.saveAsTextFile(outputPath)
// done
sc.stop()
}
}
打包成jar提交集群
[root@master bin]# ./spark-submit --master local[2] --class org.dataalgorithms.chap01.scala.SecondarySort /root/Data/data_algorithms/chapter1/hadoop_spark_scala.jar 3 /data_algorithms/chapter1/input2 /data_algorithms/chapter1/output2
19/04/18 00:07:42 INFO spark.SparkContext: Running Spark version 2.2.1
19/04/18 00:07:43 INFO spark.SparkContext: Submitted application: SecondarySort
19/04/18 00:07:43 INFO spark.SecurityManager: Changing view acls to: root
19/04/18 00:07:43 INFO spark.SecurityManager: Changing modify acls to: root
19/04/18 00:07:43 INFO spark.SecurityManager: Changing view acls groups to:
19/04/18 00:07:43 INFO spark.SecurityManager: Changing modify acls groups to:
19/04/18 00:07:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
19/04/18 00:07:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 39070.
19/04/18 00:07:44 INFO spark.SparkEnv: Registering MapOutputTracker
19/04/18 00:07:44 INFO spark.SparkEnv: Registering BlockManagerMaster
19/04/18 00:07:44 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/04/18 00:07:44 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/04/18 00:07:44 INFO storage.DiskBlockManager: Created local directory at /opt/spark-2.2.1-bin-hadoop2.7/blockmgr-72b30be5-8b8a-4589-90da-20596018bfff
19/04/18 00:07:44 INFO memory.MemoryStore: MemoryStore started with capacity 93.3 MB
19/04/18 00:07:44 INFO spark.SparkEnv: Registering OutputCommitCoordinator
19/04/18 00:07:44 INFO util.log: Logging initialized @2833ms
19/04/18 00:07:44 INFO server.Server: jetty-9.3.z-SNAPSHOT
19/04/18 00:07:44 INFO server.Server: Started @2926ms
19/04/18 00:07:44 INFO server.AbstractConnector: Started ServerConnector@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/04/18 00:07:44 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@120f38e6{/jobs,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@fac80{/jobs/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@649f2009{/jobs/job,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@797501a{/jobs/job/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@57f791c6{/stages,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c4f9535{/stages/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@30c31dd7{/stages/stage,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@241a53ef{/stages/stage/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2db2cd5{/stages/pool,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@615f972{/stages/pool/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@73393584{/storage,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1827a871{/storage/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7249dadf{/storage/rdd,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@66238be2{/storage/rdd/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@200606de{/environment,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@f8908f6{/environment/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2ef8a8c3{/executors,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@63fd4873{/executors/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7544a1e4{/executors/threadDump,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7957dc72{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3aacf32a{/static,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3d4d3fe7{/,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@51684e4a{/api,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6c451c9c{/jobs/job/kill,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@372b0d86{/stages/stage/kill,null,AVAILABLE,@Spark}
19/04/18 00:07:44 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.21.220:4040
19/04/18 00:07:44 INFO spark.SparkContext: Added JAR file:/root/Data/data_algorithms/chapter1/hadoop_spark_scala.jar at spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar with timestamp 1555517264889
19/04/18 00:07:45 INFO executor.Executor: Starting executor ID driver on host localhost
19/04/18 00:07:45 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43710.
19/04/18 00:07:45 INFO netty.NettyBlockTransferService: Server created on 172.16.21.220:43710
19/04/18 00:07:45 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/04/18 00:07:45 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.16.21.220:43710 with 93.3 MB RAM, BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.21.220, 43710, None)
19/04/18 00:07:45 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@4d8539de{/metrics/json,null,AVAILABLE,@Spark}
19/04/18 00:07:46 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 240.0 KB, free 93.1 MB)
19/04/18 00:07:46 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.2 KB, free 93.0 MB)
19/04/18 00:07:46 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.16.21.220:43710 (size: 23.2 KB, free: 93.3 MB)
19/04/18 00:07:46 INFO spark.SparkContext: Created broadcast 0 from textFile at SecondarySort.scala:30
19/04/18 00:07:47 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:47 INFO spark.SparkContext: Starting job: saveAsTextFile at SecondarySort.scala:54
19/04/18 00:07:47 INFO mapred.FileInputFormat: Total input paths to process : 1
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Registering RDD 2 (map at SecondarySort.scala:36)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SecondarySort.scala:54) with 3 output partitions
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (saveAsTextFile at SecondarySort.scala:54)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at map at SecondarySort.scala:36), which has no missing parents
19/04/18 00:07:47 INFO memory.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 93.0 MB)
19/04/18 00:07:47 INFO memory.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 93.0 MB)
19/04/18 00:07:47 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.16.21.220:43710 (size: 2.4 KB, free: 93.3 MB)
19/04/18 00:07:47 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
19/04/18 00:07:47 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at map at SecondarySort.scala:36) (first 15 tasks are for partitions Vector(0, 1))
19/04/18 00:07:47 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
19/04/18 00:07:47 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 4873 bytes)
19/04/18 00:07:47 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, ANY, 4873 bytes)
19/04/18 00:07:47 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
19/04/18 00:07:47 INFO executor.Executor: Fetching spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar with timestamp 1555517264889
19/04/18 00:07:47 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
19/04/18 00:07:47 INFO client.TransportClientFactory: Successfully created connection to /172.16.21.220:39070 after 29 ms (0 ms spent in bootstraps)
19/04/18 00:07:47 INFO util.Utils: Fetching spark://172.16.21.220:39070/jars/hadoop_spark_scala.jar to /opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600/userFiles-d948e5f9-0504-4421-aa88-19f186e67f39/fetchFileTemp2679433532910837467.tmp
19/04/18 00:07:48 INFO executor.Executor: Adding file:/opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600/userFiles-d948e5f9-0504-4421-aa88-19f186e67f39/hadoop_spark_scala.jar to class loader
19/04/18 00:07:48 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/data_algorithms/chapter1/input2/timeseries.txt:0+51
19/04/18 00:07:48 INFO rdd.HadoopRDD: Input split: hdfs://master:9000/data_algorithms/chapter1/input2/timeseries.txt:51+51
19/04/18 00:07:48 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 1070 bytes result sent to driver
19/04/18 00:07:48 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 1070 bytes result sent to driver
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 802 ms on localhost (executor driver) (1/2)
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 801 ms on localhost (executor driver) (2/2)
19/04/18 00:07:48 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (map at SecondarySort.scala:36) finished in 0.836 s
19/04/18 00:07:48 INFO scheduler.DAGScheduler: looking for newly runnable stages
19/04/18 00:07:48 INFO scheduler.DAGScheduler: running: Set()
19/04/18 00:07:48 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
19/04/18 00:07:48 INFO scheduler.DAGScheduler: failed: Set()
19/04/18 00:07:48 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SecondarySort.scala:54), which has no missing parents
19/04/18 00:07:48 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/04/18 00:07:48 INFO memory.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 71.9 KB, free 93.0 MB)
19/04/18 00:07:48 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 26.0 KB, free 92.9 MB)
19/04/18 00:07:48 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.16.21.220:43710 (size: 26.0 KB, free: 93.2 MB)
19/04/18 00:07:48 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
19/04/18 00:07:48 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SecondarySort.scala:54) (first 15 tasks are for partitions Vector(0, 1, 2))
19/04/18 00:07:48 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, ANY, 4621 bytes)
19/04/18 00:07:48 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, ANY, 4621 bytes)
19/04/18 00:07:48 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 2)
19/04/18 00:07:48 INFO executor.Executor: Running task 1.0 in stage 1.0 (TID 3)
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/04/18 00:07:48 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 12 ms
19/04/18 00:07:48 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:48 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000001_3' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000001
19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000001_3: Committed
19/04/18 00:07:49 INFO executor.Executor: Finished task 1.0 in stage 1.0 (TID 3). 1224 bytes result sent to driver
19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000000_2' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000000
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, localhost, executor driver, partition 2, ANY, 4621 bytes)
19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000000_2: Committed
19/04/18 00:07:49 INFO executor.Executor: Running task 2.0 in stage 1.0 (TID 4)
19/04/18 00:07:49 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 2). 1181 bytes result sent to driver
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 849 ms on localhost (executor driver) (1/3)
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 855 ms on localhost (executor driver) (2/3)
19/04/18 00:07:49 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/04/18 00:07:49 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
19/04/18 00:07:49 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/18 00:07:49 INFO output.FileOutputCommitter: Saved output of task 'attempt_20190418000747_0001_m_000002_4' to hdfs://master:9000/data_algorithms/chapter1/output2/_temporary/0/task_20190418000747_0001_m_000002
19/04/18 00:07:49 INFO mapred.SparkHadoopMapRedUtil: attempt_20190418000747_0001_m_000002_4: Committed
19/04/18 00:07:49 INFO executor.Executor: Finished task 2.0 in stage 1.0 (TID 4). 1181 bytes result sent to driver
19/04/18 00:07:49 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 514 ms on localhost (executor driver) (3/3)
19/04/18 00:07:49 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/04/18 00:07:49 INFO scheduler.DAGScheduler: ResultStage 1 (saveAsTextFile at SecondarySort.scala:54) finished in 1.359 s
19/04/18 00:07:49 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SecondarySort.scala:54, took 2.695980 s
19/04/18 00:07:50 INFO server.AbstractConnector: Stopped Spark@b93aad{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
19/04/18 00:07:50 INFO ui.SparkUI: Stopped Spark web UI at http://172.16.21.220:4040
19/04/18 00:07:50 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/04/18 00:07:50 INFO memory.MemoryStore: MemoryStore cleared
19/04/18 00:07:50 INFO storage.BlockManager: BlockManager stopped
19/04/18 00:07:50 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
19/04/18 00:07:50 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/04/18 00:07:50 INFO spark.SparkContext: Successfully stopped SparkContext
19/04/18 00:07:50 INFO util.ShutdownHookManager: Shutdown hook called
19/04/18 00:07:50 INFO util.ShutdownHookManager: Deleting directory /opt/spark-2.2.1-bin-hadoop2.7/spark-e259f203-6f2d-4445-80f3-24d6b187f600
[root@master bin]# $HADOOP_HOME/bin/hadoop fs -cat /data_algorithms/chapter1/out*/p*
( z-2,8)
( y-3,1)
( x-1,3)
( p-6,20)
( p-3,60)
( z-3,7)
( y-1,7)
( x-2,9)
( p-4,40)
( p-1,10)
( z-4,0)
( z-1,4)
( y-2,5)
( x-3,6)