读取HDFS中的数据,并简单分析,最后结果写入mysql数据库中。
首先建立工程,pom文件中引入以下几个依赖
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.13</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
首先需要引入spark的包,这里使用的是spark1.4的包。由于需要读取HDFS中的数据,所以需要hadoop-client文件,这个Hadoop的环境是2.6.0的环境。最后需要把结果写入mysql中,需要mysql的驱动文件,所以需要加入mysql的依赖,最后加入单元测试的包。
创建文件ReadHDFSErrorToMysql.java
- 在main函数中首先创建JavaSparkcontext对象。
SparkConf conf = new SparkConf().setAppName("FindError");
JavaSparkContext sc = new JavaSparkContext(conf);
- 找到指定目录下的所有文件路径,因为只有找到这个路径才能加载相应的文件。
/**
*
* 列出指定目录中的文件,这里的文件是不包括子目录的。
* @param pathOfDirectory
* 目录路径
* @return
* @throws IOException
*/
public static String[] findFilePathFromDir(String dst) throws IOException {
Set<String> filePathSet = new HashSet<String>();
String[] result = null;
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
FileStatus fileList[] = fs.listStatus(new Path(dst));
int size = fileList.length;
for (int i = 0; i < size; i++) {
filePathSet.add(fileList[i].getPath().toString());
}
if (filePathSet.size() > 0) {
result = new String[filePathSet.size()];
int i = 0;
for (String str : filePathSet) {
result[i++] = str;
}
}
fs.close();
return result;
}
- 依次遍历文件路径并为每个文件创建一个新的RDD然后计算出这个文件中包涵ERROR字符串的行数。
Map<String, Long> result = new HashMap<String, Long>();
if (filePaths != null) {
for (String path : filePaths) {
result.put(path, sc.textFile(path).filter(new Function<String, Boolean>() {
public Boolean call(String line) throws Exception {
return line.contains("ERROR");
}
}).count());
}
}
- 将results中的数据写入mysql中
/**
* 将结果写入mysql中
* @param result
* @throws Exception
*/
public static void wirteResultToMysql(Map<String, Long> result) throws Exception {
String DBDRIVER = "com.mysql.jdbc.Driver";
//连接地址是由各个数据库生产商单独提供的,所以需要单独记住
String DBURL = "jdbc:mysql://ip:3306/test";
//连接数据库的用户名
String DBUSER = "root";
//连接数据库的密码
String DBPASS = "root";
Connection con = null; //表示数据库的连接对象
PreparedStatement pstmt = null; //表示数据库更新操作
String sql = "insert into aaa values(?,?)";
Class.forName(DBDRIVER); //1、使用CLASS 类加载驱动程序
con = DriverManager.getConnection(DBURL,DBUSER,DBPASS); //2、连接数据库
pstmt = con.prepareStatement(sql); //使用预处理的方式创建对象
if (result != null) {
for (String str : result.keySet()) {
pstmt.setString(1, str);
pstmt.setLong(2, result.get(str));
pstmt.addBatch();
}
}
//pstmt.executeUpdate(); //执行SQL 语句,更新数据库
pstmt.executeBatch();
pstmt.close();
con.close(); // 4、关闭数据库
}
总结一下:
虽然整个过程比较简单,但通过实际的编码可以更加熟悉spark的api和功能。