Flink提供了三种通用的基于迭代的图计算模型的实现(Flink-Gelly:Iterative Graph Processing),分别是:Vertex-Centric, Scatter-Gather和Gather-Sum-Apply,接下来将分三篇文章分别来详细介绍每个模型的特点和具体使用方式。
Vertex-Centric模型概述
Vertex-Centric模型也被称之为Pregel,核心思想为:从图中每个顶点的角度表达计算。其计算过程以同步迭代地方式进行,每次迭代过程称之为一个Superstep(超步,也不知道这样翻译对不对),每个Superstep中处于活跃状态(active)的顶点并行地执行同样地UDF。顶点之间通过消息进行通信,在知道目标顶点ID的前提下,一个顶点可以向任何目标顶点发送消息。Superstep之间是同步执行的,下一个Superstep的执行需要依赖前一个Superstep的执行完成,因此上一个Superstep传递的消息会保证在下一个Superstep开始之前传递完毕。消息传递除了能在顶点之间传递消息之外,还能用于判断当前轮次的Superstep下,哪些顶点处于活跃状态,在每个轮次的Superstep中,只有那些接收到消息的顶点才会被认为是处于活跃状态。Vertex-Centric模型的执行过程如下图所示:
Vertex-Centric模型使用
Vertex-Centric模型原理十分简单,使用起来也不难,只需要我们定义两个部分:每个顶点需要执行的用户自定义函数ComputeFunction
和消息在每个迭代轮次中的组合方式MessageCombiner
,其中MessageCombiner
是可选的。下面我们以一段完整的示例代码来阐述Vertex-Centric模型的使用方法。
package com.quan.graph;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.MessageIterator;
import java.util.LinkedList;
import java.util.List;
public class VC_SSSP {
//Set 1 as the source.
public static int srcId = 1;
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<Edge<Integer, Integer>> edgesList = new LinkedList<>();
edgesList.add(new Edge<Integer, Integer>(1, 2, 12));
edgesList.add(new Edge<Integer, Integer>(1, 6, 3));
edgesList.add(new Edge<Integer, Integer>(1, 7, 14));
edgesList.add(new Edge<Integer, Integer>(2, 6, 7));
edgesList.add(new Edge<Integer, Integer>(2, 3, 10));
edgesList.add(new Edge<Integer, Integer>(3, 4, 3));
edgesList.add(new Edge<Integer, Integer>(3, 5, 5));
edgesList.add(new Edge<Integer, Integer>(3, 6, 4));
edgesList.add(new Edge<Integer, Integer>(4, 5, 4));
edgesList.add(new Edge<Integer, Integer>(5, 6, 2));
edgesList.add(new Edge<Integer, Integer>(5, 7, 8));
edgesList.add(new Edge<Integer, Integer>(6, 7, 9));
DataSet<Edge<Integer, Integer>> edges = env.fromCollection(edgesList);
// Read the input data and create a graph.
Graph<Integer, Integer, Integer> graph = Graph.fromDataSet(edges, new InitVertices(), env);
// Convert the graph to undirected.
Graph<Integer, Integer, Integer> undirected_graph = graph.getUndirected();
// Define the maximum number of iterations.
int maxIterations = 10;
// Execute the vertex-centric iteration.
Graph<Integer, Integer, Integer> result = undirected_graph.runVertexCentricIteration(
new SSSPComputeFunction(), new SSSPMessageCombiner(), maxIterations);
// Extract the vertices as the result.
DataSet<Vertex<Integer, Integer>> singleSourceShortestPaths = result.getVertices();
// Print the result.
singleSourceShortestPaths.print();
}
// User Define Function.
@SuppressWarnings("serial")
public static final class SSSPComputeFunction extends ComputeFunction<Integer, Integer, Integer, Integer> {
@Override
public void compute(Vertex<Integer, Integer> vertex, MessageIterator<Integer> messages) throws Exception {
Integer minDistance = vertex.getId().equals(srcId) ? 0 : Integer.MAX_VALUE;
for (Integer msg : messages) {
minDistance = Math.min(minDistance, msg);
}
if (minDistance < vertex.getValue()) {
setNewVertexValue(minDistance);
for (Edge<Integer, Integer> e : getEdges()) {
sendMessageTo(e.getTarget(), minDistance + e.getValue());
}
}
}
}
// Message combiner
@SuppressWarnings("serial")
public static final class SSSPMessageCombiner extends MessageCombiner<Integer, Integer> {
@Override
public void combineMessages(MessageIterator<Integer> messageIterator) throws Exception {
Integer minMessage = Integer.MAX_VALUE;
for (Integer msg : messageIterator) {
minMessage = Math.min(minMessage, msg);
}
sendCombinedMessage(minMessage);
}
}
@SuppressWarnings("serial")
private static final class InitVertices implements MapFunction<Integer, Integer> {
public Integer map(Integer id) {
return Integer.MAX_VALUE;
}
}
这里我们用一个下图所示的无向图为例,初始化的时候将每个节点的值都设置成当前数据类型的最大值,其中顶点ID为1的点作为源点,该图的单源点最短路径(SSP)执行结果如下图所示。在第一个Superstep期间,所有顶点都处于活跃状态,但是根据compute
的执行过程,最终仅有源点可以向其邻居传播距离。在接下来的Superstep骤中,每个顶点检查其接收到的消息并选择出它们之间的最小距离,如果这个距离小于它的当前值,该顶点就会更新它的当前值,并为它的邻居产生消息(当前最小值+到邻居的边的距离)。如果一个顶点在上一步中没有改变它的值,那么该顶点在当前轮次的迭代中不执行任何compute
操作,也不向下一个Superstep中的任何顶点发送消息。当所有顶点的状态不再改变或达到最大迭代次数时算法收敛。在该算法中,可以使用MessageCombiner
减少发送到目标顶点的消息数量。
Vertex-Centric模型参数配置
可以使用VertexCentricConfiguration对象配置Vertex-Centric模型。目前可指定的参数有
Name: 可以使用setName()
方法 为vertex-centric迭代模型指定一个名称。
Parallelism: 可以使用setParallelism()
方法为每个轮次迭代中顶点执行ComputeFunction
计算的并行度。
Solution set in unmanaged memory: 可以使用setSolutionSetUnmanagedMemory()
方法来指定结果集是否保存在托管内存中,默认情况下结果集是运行在托管内存中。
Aggregators: 可以使用registerAggregator()
方法来为每个迭代注册聚合函数,迭代聚合器在每个超步骤中将所有聚合全局地组合一次,并使它们在下一个超步骤中可用。
Broadcast Variables: 可以使用addBroadcastSet()
方法为 ComputeFunction
添加广播变量(Broadcast Variables)。
// configure the iteration
VertexCentricConfiguration parameters = new VertexCentricConfiguration();
// set the iteration name
parameters.setName("Gelly Iteration");
// set the parallelism
parameters.setParallelism(16);
//Defines whether the solution set is kept in managed memory
parameters.setSolutionSetUnmanagedMemory(true);
// register an aggregator
parameters.registerAggregator("sumAggregator", new LongSumAggregator());