Spark写入mongoDB的时候,如果数据源是Dataset,则可以通过设置mode(SaveMode.Overwrite)模式,例如:
MongoSpark.write(xxxDataset)
//覆盖模式
.mode(SaveMode.Overwrite)
.save();
但是,如果是MongoSpark.save方法,写入类似JavaRDD<Document>格式数据的时候,有时候需要覆盖整个mongodb的collection,而不是只更新当前数据,找了好久都没有找到好的方法,最后直接看上面Dataset方法里面的save方法,发现可以通过MongoConnector里面有支持对Collection操作的方法,详细Api地址:
https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.3.0
通过MongoConnector.withCollectionDo这个方法先把Collection提前drop掉,然后再save,于是把RDD数据直接以Overwrite模式写入Mongodb方法解决掉了。示例代码如下:
//JavaSparkContext
JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
//WriteConfig配置
WriteConfig writeConfig = WriteConfig.create(jsc);
//提前把Collection先drop掉,也就是Overwrite模式
MongoConnector.create(jsc).withCollectionDo(writeConfig, Collection.class, new Function<MongoCollection<Collection>, Document>() {
@Override
public Document call(MongoCollection<Collection> v1) throws Exception {
v1.drop();
return null;
}
});
//rddMongoResult为您的rdd格式数据
//JavaRDD<Document> rddMongoResult = xxx;
MongoSpark.save(rddMongoResult, writeConfig);
这样就每次都可OverWrite模式写入MongoDB了,当然MongoConnector还可以对数据库或者Collection进行其它操作,请查看具体Api,记录一下顺便留个备忘存档。
如果遇到此问题的,希望能帮助到大家。
- By Foolin