ONOS集群内的多个instance间通过TCP连接建立通信(目的端口是9876),包括Raft,Anti-entropy,Heatbeat以及其它的数据同步信息,ONOS通过数据包中的metadata来区分不同作用的数据信息。在任何两个ONOS实例间,可以同时建立多个TCP连接通信,这些TCP连接在需要时建立,若这些TCP连接空闲超过1min,TCP连接会中断。通过查看ControllerNode的代码实现,可以发现默认的9876端口信息
同时,为了方便使用东西向通信服务,ONOS又提供了一个集群通信服务服务接口(ClusterCommunicationService),通过使用该服务,可以很容易的实现东西数据的传输和处理。很多的最终一致性store都使用了ClusterCommunicationService,如ECDeviceStore,ECLinkStore,DistributedFlowRuleStore等,同时EventuallyConsistentMapImpl中的anti-entropy的实现也都使用了该服务。
MessageSubject类:代表了Message
subject,集群之间的消息和subject绑定,subject指示这些消息在接收端如何被处理
ClusterCommunicationManager实现了ClusterCommunicationService接口
下面为ClusterCommunicationManager中的broadcast方法
Broadcast接收三个参数——message,subject和encoder,Broadcast首先检查是否得到了CLUSTER_WRITE的权限。调用multicast函数,传入message、subject、encoder和集群内其他控制器的节点。
culsterService.getNodes()获取集群内所有node id ,使用java8中新特性stream、filter和lambda表达式过滤出集群中不和localnode id相等的node id,即其他控制器的node id并把它们放在集合里穿给multicast函数。
Multicast函数如下所示:
根据传入的参数,multicast函数首先也是检查是否有CLUSTER_WRITE的权限,调用ClusterMessage函数的getBytes方法生成一个字节数组payload。传入ClusterMessage的参数有localNodeId(发送方nodeID),subject,还有通过timeFunction函数生成的encoder message需要的时间。getBytes方法如下图所示:
其中sender即localNodeID,subject即传入的subject,payload即调用timefunction获得的结果。getBytes函数返回一个字节数组格式的buffer.array并赋给multicast函数中的字节数组payload。
获得了payload之后,multicast函数对每一个nodeID(传入的由集群内其他控制器NodeId组成的集合)调用doUnicast函数。doUnicast函数如下图所示:
根据传入的参数返回消息被传送到相应nodeid的future消息。