知识准备
持续查询语言(CQL, continuous query language)类似于:
内存数据库+视图+触发器 的解决方案。
简单来说,一有符合条件的对象进入查询结果集,就执行一次回调函数。
本文的实现是基于C/S模式的,即Client端先按照一定规则从Server端查询数据,返回结果集后,Server端继续添加符合条件的数据,Client端仍然可以实时查询返回结果。
持续查询可以监听缓存中数据的变更。持续查询一旦启动,如果有,就会收到符合查询条件的数据变化的通知。
主要maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>${ignite.version}</version>
</dependency>
<properties>
<ignite.version>2.4.0</ignite.version>
</properties>
TIPS:本工程使用的ignite的版本是2.4.0,ignite更新迭代较快,版本见得差异还是很大的。
主要代码实现
Server端实现:
package xx.xx.searchengine;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: wangjie
* @Description:
* @Date: Created in 10:13 2018/3/27
*/
@SpringBootApplication
@RestController
public class ServerApplication {
//cache name
private static final String CACHE_NAME = "serverCache";
private static Ignite ignite = Ignition.start("example-cache.xml");
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ServerApplication.class,args);
}
@RequestMapping(value = "/testIgnite",method = RequestMethod.GET)
public String testIgnite(Integer key,String value) throws InterruptedException{
ignite.active(true);
System.out.println("*******insert data begins*********");
try(IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)){
cache.put(key,value);
Thread.sleep(2000);
}
return "*******insert data succeed*********";
}
}
example-cache.xml主要配置(在初始化文件之后添加的):
<property name="clientMode" value="false"/>
<property name="peerClassLoadingEnabled" value="true"/>
Client端实现:
package xx.xx.searchengine;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
/**
* @Author: wangjie
* @Description:
* @Date: Created in 10:26 2018/3/27
*/
@SpringBootApplication
public class ClientApplication {
//cache name
private static final String CACHE_NAME = "serverCache";
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ClientApplication.class, args);
try (Ignite ignite = Ignition.start("example-cache.xml")) {
ignite.active(true);
System.out.println("**********Cache continuous query example started**********");
try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {
// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
//init query
qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
@Override
public boolean apply(Integer key, String val) {
return key > 0;
}
}));
//set local listener
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) {
System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
}
}
});
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
// Iterate through existing data.
for (Cache.Entry<Integer, String> e : cur) {
System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
Thread.sleep(2000000000);
}
} finally {
ignite.destroyCache(CACHE_NAME);
}
}
}
}
}
- 初始化查询
当要执行持续查询时,在将持续查询注册在集群中以及开始接收更新之前,可以有选择地指定一个初始化查询。
初始化查询可以通过ContinuousQuery.setInitialQuery(Query)方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。 - 远程过滤器
这个过滤器在给定键对应的主和备节点上执行,然后评估更新是否需要作为一个事件传播给该查询的本地监听器。
如果过滤器返回true,那么本地监听器就会收到通知,否则事件会被忽略。产生更新的特定主和备节点,会在主/备节点以及应用端执行的本地监听器之间,减少不必要的网络流量。
远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)方法进行设置。 - 本地监听器
当缓存被修改时(一个条目被插入、更新或者删除),更新对应的事件就会发送给持续查询的本地监听器,之后应用就可以做出对应的反应。
当事件通过了远程过滤器,他们就会被发送给客户端,通知哪里的本地监听器。
本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)方法设置的。
example-cache.xml主要配置(在初始化文件之后添加的):
<property name="clientMode" value="true"/>
<property name="peerClassLoadingEnabled" value="true"/>
启动程序,测试连续查询
启动Server端:
启动Client端:
在postman中发送get请求:
http://localhost:8080/testIgnite?key=26&value="hahahah"
查看Client端控制台的输出信息:
关于ignite的其它文章:
Ignite CS 模式 java初探
Ignite 之计算运用的 Hello world
程序媛小白一枚,如有错误,烦请批评指正!(#.#)