之前的例子中,我们实现了一个Service Grid服务,可以周期性的向client节点发送消息。 一个基本需求是,当client收到消息后总要做点儿什么,并且service当所有client都做完了该做的事情后做些后续工作。
根据上述描述,可以进一步完善我们的需求
需求
一个定时任务,可以根据指定的时间周期向client节点发出信息
client将service发出的消息打印出来
当所有client完成消息打印任务后,service打印一条信息,表明所有任务结束
实现
- 计数器
我们需求一个分布式计数器以便于service了解是否所有的client都完成了自己的任务,Ignite提供了一个CountDownLatch的分布式数据结构可以实现此需求。 此CountDownLatch可以看做是java 多线程中的CountDownLatch的一个分布式实现,功能基本相同。改造后的代码如下
ignite.scheduler().scheduleLocal(
new Runnable() {
@Override
public void run() {
//execute(ctx);
IgniteCountDownLatch latch = ignite.countDownLatch(
"latchName", // Latch name.
ignite.cluster().forClients().metrics().getTotalNodes(), // Initial count.
false, // Auto remove, when counter has reached zero.
true // Create if it does not exist.
);
IgniteCache<String, Object> cache = ignite.getOrCreateCache("CountDown");
//put this countdown in cache
cache.put("CountDown",latch);
System.out.println("Send message");
rmtMsg.send("MyOrderedTopic", "hello");
//waite to finish
latch.await();
System.out.println("All thread done.");
}
}, "* * * * *"
);
基本上,我们生成了一个CountDownLatch并将个数设为client的数量,随后将此CountDownLatch放入cache中以便其他client使用, 随后等待所有client完成任务后,打印日志"All thread done."
- Client side
之前的代码中,我们将client的逻辑写在了service中,但更好的做法是各个client各自实现自己的逻辑代码
public class ClientDemo {
public static void main(String[] args) throws IgniteException {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setPeerClassLoadingEnabled(true);
// Enable client mode.
cfg.setClientMode(true);
Ignite ignite = Ignition.start(cfg);
IgniteMessaging msg = ignite.message();
msg.localListen("MyOrderedTopic", (nodeId, message) -> {
System.out.println("Received ordered message [msg=" + message + ", from=" + nodeId + ']');
IgniteCache<String, Object> cache = ignite.cache("CountDown");
IgniteCountDownLatch latch = (IgniteCountDownLatch)cache.get("CountDown");
latch.countDown();
return true; // Return true to continue listening.
});
}
}
- 启动Ignite server和client
Server 输出
[18:55:05] Topology snapshot [ver=1, servers=1, clients=0, CPUs=4, heap=1.0GB]
[18:55:16] Topology snapshot [ver=2, servers=1, clients=1, CPUs=4, heap=2.8GB]
Send message
All thread done.
Client 输出
[18:55:17] Topology snapshot [ver=2, servers=1, clients=1, CPUs=4, heap=2.8GB]
Received ordered message [msg=hello, from=dd076b4b-f1df-491d-969c-db04edf6f9df]
至此,我们实现了一个比较完整的service grid服务