本次以叠加分析为例,介绍Sedona的RDD和SQL两种空间分析方式,数据源选择Postgresql,是已经入好库的OSM的building数据,可能有人问那为什么不用Postgis做分析,主要是千万级的我除了这个不知道选择什么数据来源……
废话不多说,开始干。
准备工作
-
Spark-Shell
如果是用Spark-Shell方式,提前把sedona编译好的包下载下来,然后用下面这个命令替换下jar包路径和master地址就可以了(注意自己准备postgresql的jar包)。
spark-shell --jars file:///opt/sedona/sedona-core-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/sedona-sql-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/sedona-python-adapter-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/sedona-viz-3.0_2.12-1.1.0-incubating.jar,file:///opt/sedona/postgresql-42.2.24.jar,file:///opt/sedona/jts-core-1.18.0.jar,file:///opt/sedona/geotools-wrapper-1.1.0-25.2.jar --driver-class-path file:///opt/sedona/postgresql-42.2.24.jar --master spark://master:7077
IDE的话,留在下一讲整理好工程,在讲
SQL代码实现
-
如果是做大数据的叠加分析,对PG的表最好加个索引ID,然后就能利用Spark的并行计算优势。最简单的就是加一个自增序列,然后添加一个索引
alter table [your_table_name] add column pid bigserial primary key create index [your_index_name] on [your_table_name]("[column_name]")
-
导入包
import org.apache.spark.serializer.KryoSerializer import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator import org.apache.sedona.core.spatialRDD.SpatialRDD import org.apache.sedona.core.utils.SedonaConf import org.apache.sedona.sql.utils.{Adapter, SedonaSQLRegistrator} import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.SparkSession import org.apache.sedona.sql.utils.Adapter import org.locationtech.jts.geom.Geometry import org.locationtech.jts.geom.Envelope import org.apache.log4j.{Level, Logger} import org.apache.sedona.core.enums.{GridType, IndexType} import org.apache.sedona.core.spatialOperator.JoinQuery import org.apache.sedona.core.spatialRDD.{CircleRDD, SpatialRDD} import org.apache.sedona.sql.utils.{Adapter, SedonaSQLRegistrator} import org.apache.sedona.viz.core.{ImageGenerator, RasterOverlayOperator} import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator import org.apache.sedona.viz.utils.ImageType import org.apache.spark.serializer.KryoSerializer import org.apache.spark.sql.SparkSession import org.locationtech.jts.geom.Geometry import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
-
注册Sedona ,并设置一些全局参数(代码的sc就是SparkContext,如果是在IDE里,需要自己初始化)
sc.getConf.set("spark.serializer", classOf[KryoSerializer].getName) sc.getConf.set("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName) sc.getConf.set("sedona.join.numpartition", "6000") // 取决于数据量大小,这个数字设置越大,task就越多,需要的服务器性能也就越高 sc.getConf.set("sedona.global.index", "true") SedonaSQLRegistrator.registerAll(spark) SedonaVizRegistrator.registerAll(spark)
-
并行读取PG数据
val pgSourceDF = spark.read.format("jdbc").option("url", "jdbc:postgresql://[ip]:[port]/[database]?user=[user]&password=[password]").option("dbtable","[table]").option("numPartitions", [分多少task去读,我设置的1200,决定了你后面空间分析的效率]).option("partitionColumn","pid").option("lowerBound",1L).option("upperBound",[你最后一行数据的PID]).option("fetchSize",10000).load() pgSourceDF.createOrReplaceTempView("pgsource")
-
因为从PG读来的Geom是WKB字段,我们需要在利用Sedona的SQL函数转换Geom字段
val pgSourceMapDF = spark.sql("select pid, st_geomfromwkb(geom) as geom from pgsource") pgSourceMapDF.createOrReplaceTempView("pgsource")
-
利用Sedona的SQL函数进行叠加分析(这里做了一个自己和自己叠加)
val joinDF = spark.sql("select count(*) from pgsource as left, pgsource as right where st_intersects(left.geom, right.geom)") joinDF.count
OK了,到了这里,其实叠加主要代码就实现了,后面大家可以根据自己需要进行分析。