基于Ignite的ContinuousQuery

知识准备

持续查询语言(CQL, continuous query language)类似于:
内存数据库+视图+触发器 的解决方案。
简单来说,一有符合条件的对象进入查询结果集,就执行一次回调函数。

本文的实现是基于C/S模式的,即Client端先按照一定规则从Server端查询数据,返回结果集后,Server端继续添加符合条件的数据,Client端仍然可以实时查询返回结果。
持续查询可以监听缓存中数据的变更。持续查询一旦启动,如果有,就会收到符合查询条件的数据变化的通知。

主要maven依赖:


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-core</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-spring</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-indexing</artifactId>
            <version>${ignite.version}</version>
        </dependency>
    <properties>
        <ignite.version>2.4.0</ignite.version>
    </properties>

TIPS:本工程使用的ignite的版本是2.4.0,ignite更新迭代较快,版本见得差异还是很大的。

主要代码实现

Server端实现:

github源代码

package xx.xx.searchengine;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: wangjie
 * @Description:
 * @Date: Created in 10:13 2018/3/27
 */

@SpringBootApplication
@RestController
public class ServerApplication {

    //cache name
    private static final String CACHE_NAME = "serverCache";

    private static Ignite ignite = Ignition.start("example-cache.xml");

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(ServerApplication.class,args);

    }

    @RequestMapping(value = "/testIgnite",method = RequestMethod.GET)
    public String testIgnite(Integer key,String value) throws InterruptedException{
        ignite.active(true);
        System.out.println("*******insert data begins*********");

        try(IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)){
            cache.put(key,value);
            Thread.sleep(2000);
        }
        return "*******insert data succeed*********";
    }
}

example-cache.xml主要配置(在初始化文件之后添加的):

 <property name="clientMode" value="false"/>
 <property name="peerClassLoadingEnabled" value="true"/>

Client端实现:

github源代码

package xx.xx.searchengine;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;

/**
 * @Author: wangjie
 * @Description:
 * @Date: Created in 10:26 2018/3/27
 */
@SpringBootApplication
public class ClientApplication {

    //cache name
    private static final String CACHE_NAME = "serverCache";

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(ClientApplication.class, args);


        try (Ignite ignite = Ignition.start("example-cache.xml")) {
            ignite.active(true);
            System.out.println("**********Cache continuous query example started**********");

            try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {

                // Create new continuous query.
                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
                //init query
                qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
                    @Override
                    public boolean apply(Integer key, String val) {
                        return key > 0;
                    }
                }));
                 //set local listener
                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
                    @Override
                    public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                        for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) {
                            System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                        }
                    }
                });


                try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
                    // Iterate through existing data.
                    for (Cache.Entry<Integer, String> e : cur) {
                        System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                        Thread.sleep(2000000000);
                    }
                } finally {
                    ignite.destroyCache(CACHE_NAME);
                }
            }
        }
    }
}
  • 初始化查询
    当要执行持续查询时,在将持续查询注册在集群中以及开始接收更新之前,可以有选择地指定一个初始化查询。
    初始化查询可以通过ContinuousQuery.setInitialQuery(Query)方法进行设置,并且可以是任意查询类型,包括扫描查询,SQL查询和文本查询。
  • 远程过滤器
    这个过滤器在给定键对应的主和备节点上执行,然后评估更新是否需要作为一个事件传播给该查询的本地监听器。
    如果过滤器返回true,那么本地监听器就会收到通知,否则事件会被忽略。产生更新的特定主和备节点,会在主/备节点以及应用端执行的本地监听器之间,减少不必要的网络流量。
    远程过滤器可以通过ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>)方法进行设置。
  • 本地监听器
    当缓存被修改时(一个条目被插入、更新或者删除),更新对应的事件就会发送给持续查询的本地监听器,之后应用就可以做出对应的反应。
    当事件通过了远程过滤器,他们就会被发送给客户端,通知哪里的本地监听器。
    本地监听器是通过ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>)方法设置的。

example-cache.xml主要配置(在初始化文件之后添加的):

  <property name="clientMode" value="true"/>
  <property name="peerClassLoadingEnabled" value="true"/>

启动程序,测试连续查询

启动Server端:

Server.png

启动Client端:

Client.png

在postman中发送get请求:

http://localhost:8080/testIgnite?key=26&value="hahahah"

postman.png

查看Client端控制台的输出信息:

Client-updata.png

关于ignite的其它文章:
Ignite CS 模式 java初探
Ignite 之计算运用的 Hello world

程序媛小白一枚,如有错误,烦请批评指正!(#.#)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容

  • 阅读完蔡崇达的《皮囊》,我似乎也脱离了肉体,只剩灵魂在思考。皮也就是作者的外曾祖母。她活到九十九岁,从来不在乎自己...
    陈斌Will阅读 1,066评论 0 0
  • 前言 自控力不是和自己的战争,而是与自己的和解,需要真正的认识自我和关心自我。自控不是等待一个理想自我出现拯救一切...
    林对对阅读 2,213评论 26 137
  • 文/艺人崔 我很庆幸,我能够生活在她的臂弯之下,她的笑容之下,即使是寒冬,也温暖如春,即使艰难,也在前行! 在家的...
    艺人崔是小可爱阅读 317评论 0 0
  • 春风杨柳青堤岸,卵石径、馥香弥漫。 古滩头、望断碧云天,也不见、画桥溪畔。 夕阳西坠夷江晚,只剩下、簟纹波乱。 柳...
    刘小地阅读 779评论 17 44
  • 今天是培训的第五天,上午主要是理论学:1分娩舍生产管理,內容较多,面也较广。下午主要进产房实操练习。先是产房环境评...
    b84d0d529959阅读 236评论 0 0