修改ES IK插件源码,配合MySQL实现词库热更新

ES IK词库热更新简介

在实际工作中,我们经常需要更新ElasticSearch中IKAnalyzer插件的自定义词库,以获得更好的中文分词和搜索效果。在默认情况下,每次更新之后都需要重启ES集群才能生效,极其不方便。因此IKAnalyzer官方也提供了一种热更新的方法,在其GitHub主页上写道:


在其源码内部对应的是Monitor类,实现了Runnable接口。我们采用的ES版本是2.3.2,对应IK插件版本为1.9.2。

public class Monitor implements Runnable {
    public static ESLogger logger= Loggers.getLogger("ik-analyzer");
    private static CloseableHttpClient httpclient = HttpClients.createDefault();

    private String last_modified;
    private String eTags;
    private String location;

    public Monitor(String location) {
        this.location = location;
        this.last_modified = null;
        this.eTags = null;
    }
    /**
     * 监控流程:
     *  ①向词库服务器发送Head请求
     *  ②从响应中获取Last-Modify、ETags字段值,判断是否变化
     *  ③如果未变化,休眠1min,返回第①步
     *  ④如果有变化,重新加载词典
     *  ⑤休眠1min,返回第①步
     */
    public void run() {
        RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10*1000)
                .setConnectTimeout(10*1000).setSocketTimeout(15*1000).build();
        HttpHead head = new HttpHead(location);
        head.setConfig(rc);

        if (last_modified != null) {
            head.setHeader("If-Modified-Since", last_modified);
        }
        if (eTags != null) {
            head.setHeader("If-None-Match", eTags);
        }
        CloseableHttpResponse response = null;

        try {
            response = httpclient.execute(head);
            if(response.getStatusLine().getStatusCode()==200){
                if (!response.getLastHeader("Last-Modified").getValue().equalsIgnoreCase(last_modified)
                        ||!response.getLastHeader("ETag").getValue().equalsIgnoreCase(eTags)) {
                    Dictionary.getSingleton().reLoadMainDict();
                    last_modified = response.getLastHeader("Last-Modified")==null?null:response.getLastHeader("Last-Modified").getValue();
                    eTags = response.getLastHeader("ETag")==null?null:response.getLastHeader("ETag").getValue();
                }
            }else if (response.getStatusLine().getStatusCode()==304) {
                //noop
            }else{
                Dictionary.logger.info("remote_ext_dict {} return bad code {}" , location , response.getStatusLine().getStatusCode() );
            }
        } catch (Exception e) {
            Dictionary.logger.error("remote_ext_dict {} error!",e , location);
        }finally{
            try {
                if (response != null) {
                    response.close();
                }
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}

原理很容易理解,但是我们按照它的标准实现起来就要颇费一番周折,并且还是得手动编辑词库文件。如果今后词库变大变多,或者想让运营人员也参与词库的管理,这种方式就有些僵硬了。因此,我们决定直接修改ES IK插件的源码,使之能够从MySQL表中定时拉取词库的更新,再通过MySQL设计一个简易的管理工具,一劳永逸。

在MySQL建表

建表语句如下:

create table es_dynamic_dict (
  id int(11) primary key not null auto_increment,
  word varchar(50) not null default '' comment '词条',
  is_stopword tinyint(1) not null default '0' comment '是否为停止词, 1为是',
  is_deleted tinyint(1) not null default '0' comment '删除状态, 1为删除',
  last_update int(11) not null default '0' comment '最后更新时间',
  key is_stopword_idx(is_stopword),
  key is_deleted_idx(is_deleted),
  key update_time_idx(last_update)
) engine=InnoDB default charset=utf8 comment='ES热更新词库表';

修改IK插件源码

将源码clone下来,先在pom文件中加入MySQL驱动的依赖:

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.30</version>
    </dependency>

再在maven-assembly-plugin的描述符文件中,将MySQL驱动添加到依赖集合中。直接上图比较好说明一些:


然后找到源码中管理词典的类Dictionary,它位于org.wltea.analyzer.dic包。在相同的包下新建DatabaseDictionary类,代码如下:

public class DatabaseDictionary {
    private static final ESLogger LOGGER = Loggers.getLogger("ik-analyzer");
    private static final String DB_PROP_PATH = "ik/db-ext-dict.properties";

    private static DatabaseDictionary instance;
    private Properties dbProperties;
    private Connection connection;

    private String getDictRoot() {
        return PathUtils.get(new File(
            AnalysisIkPlugin.class.getProtectionDomain().getCodeSource().getLocation().getPath()
        ).getParent(), "config").toAbsolutePath().toString();
    }

    private DatabaseDictionary() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            dbProperties = new Properties();
            dbProperties.load(new FileInputStream(PathUtils.get(getDictRoot(), DB_PROP_PATH).toFile()));
            // LOGGER.info("Loaded MySQL driver and " + DB_PROP_PATH);
        } catch (ClassNotFoundException e) {
            LOGGER.error("MySQL driver not found");
        } catch (IOException e) {
            LOGGER.error("Error reading file " + DB_PROP_PATH);
        }
    }

    public static DatabaseDictionary getInstance() {
        if (instance == null) {
            synchronized (DatabaseDictionary.class) {
                if (instance == null) {
                    instance = new DatabaseDictionary();
                }
            }
        }
        return instance;
    }

    private void initConnection() {
        try {
            connection = DriverManager.getConnection(
                dbProperties.getProperty("jdbc.url"),
                dbProperties.getProperty("jdbc.user"),
                dbProperties.getProperty("jdbc.password")
            );
            // LOGGER.info("Created JDBC connnection");
        } catch (SQLException e) {
            LOGGER.error("Error creating JDBC connection: " + e.getMessage());
        }
    }

    private void closeConnection(ResultSet resultSet, PreparedStatement statement) {
        try {
            if (resultSet != null) {
                resultSet.close();
                resultSet = null;
            }
            if (statement != null) {
                statement.close();
                statement = null;
            }
            if (connection != null) {
                connection.close();
                connection = null;
            }
            // LOGGER.info("Closed JDBC connnection");
        } catch (SQLException e) {
            LOGGER.error("Error closing connection: " + e.getMessage());
        }
    }

    public Set<String> fetchWords(long lastUpdate, boolean isStopword, boolean isDeleted) {
        initConnection();
        Set<String> result = new HashSet<>();
        PreparedStatement statement = null;
        ResultSet resultSet = null;

        try {
            StringBuilder sql = new StringBuilder("select word from ");
            sql.append(dbProperties.getProperty("ext_dict.table.name"));
            sql.append(isDeleted ? " where is_deleted = 1 " : " where is_deleted = 0 ");
            sql.append(isStopword ? "and is_stopword = 1 " : "and is_stopword = 0 ");
            sql.append("and last_update >= ");
            sql.append(lastUpdate);

            statement = connection.prepareStatement(sql.toString());
            resultSet = statement.executeQuery();
            while (resultSet.next()) {
                String word = resultSet.getString("word");
                if (word != null && word.length() > 0) {
                    result.add(word);
                }
            }

            LOGGER.info("Executed query: " + sql.toString() + ", return count: " + result.size());
        } catch (SQLException e) {
            LOGGER.error("Error executing query of words: " + e.getMessage());
        } finally {
            closeConnection(resultSet, statement);
        }

        return result;
    }
}

就是一个标准的JDBC连接单例。为了提供基本的可配置性,数据库的连接地址、用户名、密码,以及热更新词库表的表名都通过一个.properties文件来获取。

回到词典类Dictionary,可以看到已经有了用于批量加载和卸载新词条的方法addWords()和disableWords(),但没有批量加载和卸载新停止词的方法,所以需要在它里面新写两个。singleton则是Dictionary的单例。

    public void addStopwords(Collection<String> stopwords) {
        if(stopwords != null) {
            for(String word : stopwords){
                if (word != null) {
                    singleton._StopWords.fillSegment(word.trim().toCharArray());
                }
            }
        }
    }

    public void disableStopwords(Collection<String> stopwords) {
        if (stopwords != null) {
            for (String word : stopwords) {
                if (word != null) {
                    singleton._StopWords.disableSegment(word.trim().toCharArray());
                }
            }
        }
    }

然后写一个线程用来执行词库的更新。新建一个DatabaseMonitor类,如下:

public class DatabaseMonitor implements Runnable {
    private static final ESLogger LOGGER = Loggers.getLogger("ik-analyzer");
    private int periodMinutes;

    public DatabaseMonitor(int periodMinutes) {
        this.periodMinutes = periodMinutes;
        LOGGER.info("Constructed DatabaseMonitor");
    }

    @Override
    public void run() {
        try {
            DatabaseDictionary dbDict = DatabaseDictionary.getInstance();
            long lastUpdate = (System.currentTimeMillis() - periodMinutes * 60 * 1000) / 1000;

            Set<String> words = dbDict.fetchWords(lastUpdate, false, false);
            Set<String> stopwords = dbDict.fetchWords(lastUpdate, true, false);
            Set<String> deletedWords = dbDict.fetchWords(lastUpdate, false, true);
            Set<String> deletedStopwords = dbDict.fetchWords(lastUpdate, true, true);

            Dictionary dict = Dictionary.getSingleton();
            dict.addWords(words);
            dict.addStopwords(stopwords);
            dict.disableWords(deletedWords);
            dict.disableStopwords(deletedStopwords);
            // LOGGER.info("Updated dictionary from MySQL");
        } catch (Throwable t) {
            LOGGER.error("Caught throwable in DatabaseMonitor. Message: " + t.getMessage());
            LOGGER.error("Stack trace:");
            for (StackTraceElement trace : t.getStackTrace()) {
                LOGGER.error(trace.toString());
            }
        }
    }
}

最后,利用单线程的调度线程池来定期执行DatabaseMonitor线程。这个逻辑写在initial()方法中原定时逻辑的下面就行。

private static ScheduledExecutorService dbPool = Executors.newSingleThreadScheduledExecutor();

dbPool.scheduleAtFixedRate(new DatabaseMonitor(7), 1, 5, TimeUnit.MINUTES);
logger.info("Scheduled MySQL dictionary update");

调度的初始延时为1分钟,周期为5分钟。每次取得当前时间戳前7分钟(通过periodMinutes参数控制)内的变更进行操作,可以避免有缺漏。当然这个周期可以更短,或者同样做成可配置的,但对我们而言必要性并不大。

打包运行

代码修改的工作完成了,用Maven打包,将其中的elasticsearch-analysis-ik-1.9.2.jar和mysql-connector-java-5.1.30.jar上传到各个ES节点的${ES_HOME}/plugins/ik目录下即可。

然后在${ES_HOME}/plugins/ik/config/ik目录下新建db-ext-dict.properties文件,写入如下内容:

jdbc.url=jdbc\:mysql\://10.11.12.123\:3306/some_db?tinyInt1isBit=false
jdbc.user=some_user
jdbc.password=some_password
ext_dict.table.name=es_dynamic_dict

接下来滚动重启ES集群。这属于ES基操,复习一下步骤吧。

  • 禁止分片分配:
curl -s -XPUT es0:9200/_cluster/settings -d '{
    "transient" : {
        "cluster.routing.allocation.enable" : "none"
    }
}'
  • 切换到es帐户,杀掉elasticsearch进程:
su - es
ps aux | grep elasticsearch
kill -9 1480
  • 执行更新的操作。
  • 重启ES进程:
bin/elasticsearch -d
  • 重启分片分配:
curl -s -XPUT es0:9200/_cluster/settings -d '{
    "transient" : {
        "cluster.routing.allocation.enable" : "all"
    }
}'
  • 等待集群状态变成GREEN之后,重复操作其余的节点即可。

来做个测试。在MySQL表中插入一条记录:

replace into es_dynamic_dict values(1,'除你武器',0,0,unix_timestamp(now()));

等待更新的日志输出之后,尝试分词:


大功告成。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容