使用spark 程序 读取 mongodb

原因:有同事需要连接 mongodb ,保存数据到 大数据平台。之前尝试了 hive 建立外部表的方式。但是一直不成功。报错原因不明。尝试 mongo Hadoop 中 的 spark 例子,提交到 集群中会报 任务无法序列化的错误。因此采用了 mongo spark 连接器来做一个测试程序

安装 mongodb

主机环境

CentOS Linux release 7.4.1708 (Core)

关闭防火墙

关闭selinux

192.168.14.7

1.下载安装包

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-mongos-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-server-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-shell-3.4.18-1.el7.x86_64.rpm

https://repo.mongodb.org/yum/redhat/7/mongodb-org/3.4/x86_64/RPMS/mongodb-org-tools-3.4.18-1.el7.x86_64.rpm

2.安装 httpd createrepo

yum -y install httpd

3.建立目录

mkdir /var/www/html/mongodb

4.移动rpm包到 /var/www/html/mongodb 目录

mv *rpm /var/www/html/mongodb

5.进入 mkdir /var/www/html/mongodb 目录,建立 repo 档案

cd /var/www/html/mongodb

createrepo .

6.启动 httpd 服务并设置开机启动

systemctl start httpd.service

systemctl enable httpd.service

7.建立 mongodb 的 repo 文件

vi /etc/yum.repos.d/mongodb.repo

[mongodb]

name=mongodb

baseurl=http://192.168.14.7/mongodb

enable=1

gpgcheck=0

8.建立 yum 缓存

yum makecache

9.安装 mongodb server 和 shell

yum -y install mongodb-org-server.x86_64 mongodb-org-shell.x86_64

10.修改 /etc/mongod.conf 配置文件

#  bindIp: 127.0.0.1

11.修改系统配置

echo never > /sys/kernel/mm/transparent_hugepage/defrag

echo never > /sys/kernel/mm/transparent_hugepage/enabled

12.启动 mongodb,并设置开机启动

systemctl start mongod.service

systemctl enable mongod.service

13.新建 test 库

use test

14.往 test 表中插入数据

db.test.insertOne({"123" : "123-val", "a" : 34, "b" : 36})

db.test.insertOne({"456" : "456-val", "a" : 45, "b" : 67})

15.检查 test 表中的数据

db.test.find()

spark集群搭建

主机环境

CentOS Linux release 7.4.1708 (Core)

关闭防火墙

关闭 selinux

192.168.14.8

1.安装 jdk

tar -zxvf jdk-8u171-linux-x64.tar.gz

mv jdk1.8.0_171/ /opt/jdk

2.安装 spark

tar -zxvf spark-2.4.5-bin-hadoop2.7.tgz

mv spark-2.4.5-bin-hadoop2.7/  /opt/spark

3.安装 maven

tar -zxvf apache-maven-3.5.2-bin.tar.gz

mv apache-maven-3.5.2/ /opt/maven

3.修改环境变量

vi ~/.bashrc

export JAVA_HOME=/opt/jdk

export SPARK_HOME=/opt/spark

export M2_HOME=/opt/maven

export PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin:$M2_HOME/bin

4.使环境变量生效

source ~/.bashrc

5.重命名 spark 配置文件

cd /opt/spark/conf/

mv spark-env.sh.template spark-env.sh

mv spark-defaults.conf.template spark-defaults.conf

6.修改 spark 配置

vi spark-defaults.conf

spark.master                    spark://192.168.14.8:7077

vi spark-env.sh

SPARK_MASTER_HOST=192.168.14.8

7.启动 spark master

start-master.sh

8.启动 slave

start-slave.sh spark://192.168.14.8:7077

9.查看 web 页面

浏览器访问 http://192.168.14.8:8080/

10.新建 maven 项目 进入 目录

mvn archetype:generate -DgroupId=com.packt.samples -DartifactId=com.packt.samples.archetype -Dversion=1.0.0 -DinteractiveMode=false -DarchetypeCatalog=internal

cd com.packt.samples

11.编辑 pom.xml

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-core_2.11</artifactId>

      <version>2.4.5</version>

    </dependency>

    <dependency>

      <groupId>org.apache.spark</groupId>

      <artifactId>spark-sql_2.11</artifactId>

      <version>2.4.5</version>

    </dependency>

        <dependency>

            <groupId>org.mongodb.spark</groupId>

            <artifactId>mongo-spark-connector_2.11</artifactId>

            <version>2.2.1</version>

        </dependency>

<build>

  <plugins>

    <plugin>

      <groupId>org.apache.maven.plugins</groupId>

      <artifactId>maven-assembly-plugin</artifactId>

      <version>2.4</version>

      <configuration>

        <descriptorRefs>

          <descriptorRef>jar-with-dependencies</descriptorRef>

        </descriptorRefs>

        <archive>

          <manifest>

            <addClasspath>true</addClasspath>

            <mainClass>com.packt.samples.App</mainClass>

          </manifest>

        </archive>

      </configuration>

      <executions>

        <execution>

          <id>assemble-all</id>

          <phase>package</phase>

          <goals>

            <goal>single</goal>

          </goals>

        </execution>

      </executions>

    </plugin>

  </plugins>

</build>

12.编辑源代码

vi src/main/java/com/packt/samples/App.java

package com.packt.samples;

import com.mongodb.MongoClient;

import com.mongodb.MongoClientURI;

import com.mongodb.client.MongoDatabase;

import com.mongodb.spark.MongoConnector;

import com.mongodb.spark.MongoSpark;

import com.mongodb.spark.config.ReadConfig;

import com.mongodb.spark.config.WriteConfig;

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import com.mongodb.spark.sql.helpers.StructFields;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructType;

import org.bson.Document;

import org.bson.types.ObjectId;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import static java.lang.String.format;

import static java.util.Arrays.asList;

import static java.util.Collections.singletonList;

public final class App {

    public static void main(final String[] args) throws InterruptedException {

        //构建 java spark 上下文对象

        JavaSparkContext jsc = createJavaSparkContext(args);

        //删除测试库

        dropDatabase(getMongoClientURI(args));

        // Add Sample Data,制造一些测试数据

        List<String> characters = asList(

            "{'name': 'Bilbo Baggins', 'age': 50}",

            "{'name': 'Gandalf', 'age': 1000}",

            "{'name': 'Thorin', 'age': 195}",

            "{'name': 'Balin', 'age': 178}",

            "{'name': 'Kíli', 'age': 77}",

            "{'name': 'Dwalin', 'age': 169}",

            "{'name': 'Óin', 'age': 167}",

            "{'name': 'Glóin', 'age': 158}",

            "{'name': 'Fíli', 'age': 82}",

            "{'name': 'Bombur'}"

        );

        //将测试数据集合先并行和,构造成 rdd,然后将 集合中的字符串数据,转换成 Document 对象,保存到 mongodb 中

        MongoSpark.save(jsc.parallelize(characters).map(new Function<String, Document>() {

            @Override

            public Document call(final String json) throws Exception {

                return Document.parse(json);

            }

        }));

        // Load inferring schema

        //将 jsc 连接的 mongodb 中的 rdd 转换成 dataset

        Dataset<Row> df = MongoSpark.load(jsc).toDF();

        //打印出这个 dataset 的 schema

        df.printSchema();

        //展示 dataset 中的数据

        df.show();

        //将这个 dataset 注册为临时表

        df.registerTempTable("characters2");

        //构造一个 spark session

        SparkSession sparkSession = SparkSession.builder().getOrCreate();

        //使用 sql 方法查询 dataset 中的数据

        Dataset<Row> centenarians2 = sparkSession.sql("SELECT name, age FROM characters2 WHERE age >= 100");

        //展示查询的结果

        centenarians2.show();

    }

    private static JavaSparkContext createJavaSparkContext(final String[] args) {

        //先获取 mongodb 地址字符串

        String uri = getMongoClientURI(args);

        //删除测试库

        dropDatabase(uri);

        //构建 spark conf 对象

        SparkConf conf = new SparkConf()

                .setAppName("MongoSparkConnectorTour")

                .set("spark.app.id", "MongoSparkConnectorTour")

                .set("spark.mongodb.input.uri", uri)

                .set("spark.mongodb.output.uri", uri);

        //构建 java spark 上下文对象

        return new JavaSparkContext(conf);

    }

    private static String getMongoClientURI(final String[] args) {

        String uri;

        if (args.length == 0) {

            uri = "mongodb://192.168.14.7/test.coll"; // 测试库地址

        } else {

            uri = args[0];

        }

        return uri;

    }

    private static void dropDatabase(final String connectionString) {

        MongoClientURI uri = new MongoClientURI(connectionString);

        new MongoClient(uri).dropDatabase(uri.getDatabase());

    }

}

13.编译

mvn clean package -DskipTests

14.提交任务

spark-submit --class com.packt.samples.App --master spark://192.168.14.8:7077  --executor-memory 1G --total-executor-cores 1 /root/com.packt.samples.archetype/target/com.packt.samples.archetype-1.0.0-jar-with-dependencies.jar

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容