Background: a colleague needed to connect to MongoDB and pull its data into our big-data platform. We first tried building a Hive external table over MongoDB, but it never worked and the error message was not informative. The Spark example that ships with mongo-hadoop also failed with a "task not serializable" error when submitted to the cluster. So I put together a small test program using the MongoDB Spark Connector instead.
Installing MongoDB
Host environment
CentOS Linux release 7.4.1708 (Core)
Firewall disabled
SELinux disabled
IP: 192.168.14.7
1. Download the RPM packages
https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-3.4.18-1.el7.x86_64.rpm
https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-mongos-3.4.18-1.el7.x86_64.rpm
https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-server-3.4.18-1.el7.x86_64.rpm
https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-shell-3.4.18-1.el7.x86_64.rpm
https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-tools-3.4.18-1.el7.x86_64.rpm
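On a machine with Internet access these five files can be fetched in one go, for example with wget and bash brace expansion (a convenience sketch; downloading them one by one works just as well):
wget https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-{,mongos-,server-,shell-,tools-}3.4.18-1.el7.x86_64.rpm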
2. Install httpd and createrepo
yum -y install httpd createrepo
3. Create the repository directory
mkdir /var/www/html/mongodb
4. Move the RPM packages into /var/www/html/mongodb
mv *rpm /var/www/html/mongodb
5. Change into /var/www/html/mongodb and generate the repository metadata
cd /var/www/html/mongodb
createrepo .
6. Start httpd and enable it at boot
systemctl start httpd.service
systemctl enable httpd.service
7. Create the yum repo file for MongoDB
vi /etc/yum.repos.d/mongodb.repo
[mongodb]
name=mongodb
baseurl=http://192.168.14.7/mongodb
enabled=1
gpgcheck=0
8. Build the yum cache
yum makecache
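At this point the local repository should be usable; a quick optional check that the packages are visible:
yum list available "mongodb-org*"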
9. Install the MongoDB server and shell
yum -y install mongodb-org-server.x86_64 mongodb-org-shell.x86_64
10. Edit /etc/mongod.conf and comment out the bindIp line so mongod accepts connections from other hosts (the Spark machine needs to reach it remotely)
# bindIp: 127.0.0.1
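For reference, the net section of /etc/mongod.conf should then look roughly like this (27017 is the default port; keep whatever value your file already has):
net:
  port: 27017
#  bindIp: 127.0.0.1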
11. Disable transparent huge pages (recommended for MongoDB)
echo never > /sys/kernel/mm/transparent_hugepage/defrag
echo never > /sys/kernel/mm/transparent_hugepage/enabled
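Note that these echo commands only last until the next reboot. One simple way to make them persistent on CentOS 7 (a sketch, not the only option) is to append them to /etc/rc.local and make it executable:
cat >> /etc/rc.local <<'EOF'
echo never > /sys/kernel/mm/transparent_hugepage/defrag
echo never > /sys/kernel/mm/transparent_hugepage/enabled
EOF
chmod +x /etc/rc.local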
12. Start mongod and enable it at boot
systemctl start mongod.service
systemctl enable mongod.service
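A quick optional check that the service is running and listening:
systemctl status mongod.service
ss -lnt | grep 27017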
13. Open the mongo shell and switch to the test database (it is created implicitly on first write)
mongo
use test
14. Insert a couple of documents into the test collection
db.test.insertOne({"123" : "123-val", "a" : 34, "b" : 36})
db.test.insertOne({"456" : "456-val", "a" : 45, "b" : 67})
15. Check the data in the test collection
db.test.find()
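The two documents inserted above should come back, each with an auto-generated _id (the ObjectId values below are placeholders):
{ "_id" : ObjectId("..."), "123" : "123-val", "a" : 34, "b" : 36 }
{ "_id" : ObjectId("..."), "456" : "456-val", "a" : 45, "b" : 67 }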
Setting up the Spark standalone cluster
Host environment
CentOS Linux release 7.4.1708 (Core)
Firewall disabled
SELinux disabled
IP: 192.168.14.8
1. Install the JDK
tar -zxvf jdk-8u171-linux-x64.tar.gz
mv jdk1.8.0_171/ /opt/jdk
2. Install Spark
tar -zxvf spark-2.4.5-bin-hadoop2.7.tgz
mv spark-2.4.5-bin-hadoop2.7/ /opt/spark
3. Install Maven
tar -zxvf apache-maven-3.5.2-bin.tar.gz
mv apache-maven-3.5.2/ /opt/maven
4. Set the environment variables
vi ~/.bashrc
export JAVA_HOME=/opt/jdk
export SPARK_HOME=/opt/spark
export M2_HOME=/opt/maven
export PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$M2_HOME/bin
5. Apply the environment variables
source ~/.bashrc
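A quick sanity check that everything is on the PATH (optional):
java -version
mvn -v
spark-submit --version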
6. Rename the Spark configuration templates
cd /opt/spark/conf/
mv spark-env.sh.template spark-env.sh
mv spark-defaults.conf.template spark-defaults.conf
7. Edit the Spark configuration
vi spark-defaults.conf
spark.master spark://192.168.14.8:7077
vi spark-env.sh
SPARK_MASTER_HOST=192.168.14.8
8. Start the Spark master
start-master.sh
9. Start a worker and attach it to the master
start-slave.sh spark://192.168.14.8:7077
10. Check the web UI
Open http://192.168.14.8:8080/ in a browser; the worker started in the previous step should be listed under Workers.
11. Generate a Maven project and change into its directory
mvn archetype:generate -DgroupId=com.packt.samples -DartifactId=com.packt.samples.archetype -Dversion=1.0.0 -DinteractiveMode=false -DarchetypeCatalog=internal
cd com.packt.samples.archetype
12. Edit pom.xml: add the following three dependencies inside the existing <dependencies> element, and add the <build> section alongside it
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.packt.samples.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
13. Write the application source
vi src/main/java/com/packt/samples/App.java
package com.packt.samples;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase;
import com.mongodb.spark.MongoConnector;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.config.WriteConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import com.mongodb.spark.sql.helpers.StructFields;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
import org.bson.types.ObjectId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
public final class App {
public static void main(final String[] args) throws InterruptedException {
// Build the JavaSparkContext
JavaSparkContext jsc = createJavaSparkContext(args);
// Drop the test database so each run starts clean
dropDatabase(getMongoClientURI(args));
// Add sample data
List<String> characters = asList(
"{'name': 'Bilbo Baggins', 'age': 50}",
"{'name': 'Gandalf', 'age': 1000}",
"{'name': 'Thorin', 'age': 195}",
"{'name': 'Balin', 'age': 178}",
"{'name': 'Kíli', 'age': 77}",
"{'name': 'Dwalin', 'age': 169}",
"{'name': 'Óin', 'age': 167}",
"{'name': 'Glóin', 'age': 158}",
"{'name': 'Fíli', 'age': 82}",
"{'name': 'Bombur'}"
);
// Parallelize the sample strings into an RDD, parse each string into a Document, and save the result to MongoDB
MongoSpark.save(jsc.parallelize(characters).map(new Function<String, Document>() {
@Override
public Document call(final String json) throws Exception {
return Document.parse(json);
}
}));
// Load the data back from MongoDB, inferring the schema, and convert the RDD to a Dataset
Dataset<Row> df = MongoSpark.load(jsc).toDF();
// Print the inferred schema
df.printSchema();
// Show the Dataset contents
df.show();
// Register the Dataset as a temporary table
df.registerTempTable("characters2");
// Build a SparkSession
SparkSession sparkSession = SparkSession.builder().getOrCreate();
// Query the temporary table with SQL
Dataset<Row> centenarians2 = sparkSession.sql("SELECT name, age FROM characters2 WHERE age >= 100");
// Show the query result
centenarians2.show();
}
private static JavaSparkContext createJavaSparkContext(final String[] args) {
// Get the MongoDB connection URI
String uri = getMongoClientURI(args);
// Drop the test database
dropDatabase(uri);
// Build the SparkConf with the Mongo input/output URIs
SparkConf conf = new SparkConf()
.setAppName("MongoSparkConnectorTour")
.set("spark.app.id", "MongoSparkConnectorTour")
.set("spark.mongodb.input.uri", uri)
.set("spark.mongodb.output.uri", uri);
// Create the JavaSparkContext
return new JavaSparkContext(conf);
}
private static String getMongoClientURI(final String[] args) {
String uri;
if (args.length == 0) {
uri = "mongodb://192.168.14.7/test.coll"; // 测试库地址
} else {
uri = args[0];
}
return uri;
}
private static void dropDatabase(final String connectionString) {
MongoClientURI uri = new MongoClientURI(connectionString);
new MongoClient(uri).dropDatabase(uri.getDatabase());
}
}
14. Build the jar
mvn clean package -DskipTests
15. Submit the job to the cluster
spark-submit --class com.packt.samples.App --master spark://192.168.14.8:7077 --executor-memory 1G --total-executor-cores 1 /root/com.packt.samples.archetype/target/com.packt.samples.archetype-1.0.0-jar-with-dependencies.jar
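If everything is wired up correctly, the driver log should end with output along these lines (row order and the exact rendering of the inferred _id field may differ; the names and ages come from the sample data hard-coded in App.java, filtered by age >= 100):
+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+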