spring data elasticsearch 使用及源码分析

spring integrate es.png
最近一直在用elasticsearch做一些数据的存储和查询 ,开始为了方便就直接使用了spring data elasticsearch ,期间也经历了很多坑,比如findByName(String name)这种接口默认只返回10条数据,自己也对源码做个跟踪,也研究了一下为什么elasticsearch要这样做。接下来我们通过分析spring data es的两个接口,来看一下是如果实现对es进行操作的。
1.findByName接口分析
  • spring data es repository 接口
public interface UserRepository extends ElasticsearchRepository<User,String> {
    /**
     * 默认只返回10条
     */
    List<User> findByName(String name);


    List<User> deleteByName(String name);
}

  • spring 启动分析构造UserRepository代理接口
    启动的时候ElasticsearchRepositoryFactoryBean的方法会进行对应代理类的生成,并注入到spring容器
    @Override
    public void afterPropertiesSet() {
        super.afterPropertiesSet();
        Assert.notNull(operations, "ElasticsearchOperations must be configured!");
    }

这里的super指是RepositoryFactoryBeanSupport,spring data jpa启动整体代理接口注入到spring容器中也是通过这个类做的,其中spring data jpa 对应的类是JpaRepositoryFactoryBean

#其中RepositoryFactoryBeanSupport类中的下面这段代码比较核心
public void afterPropertiesSet() {
        #getRepositoryMetadata是获取这个接口的元数据,就是这个接口的一些基本的属
        this.repositoryMetadata = this.factory.getRepositoryMetadata(repositoryInterface);
        #这个方法是生成这个接口的实现类
        this.repository = Lazy.of(() -> this.factory.getRepository(repositoryInterface, repositoryFragmentsToUse));
        #如果不是懒加载立即初始化
        if (!lazyInit) {
            this.repository.get();
        }
    }

接着我们分析上面的核心代码getRepository这个方法

public <T> T getRepository(Class<T> repositoryInterface, RepositoryFragments fragments) {

        Assert.notNull(repositoryInterface, "Repository interface must not be null!");
        Assert.notNull(fragments, "RepositoryFragments must not be null!");

        RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface);
        RepositoryComposition composition = getRepositoryComposition(metadata, fragments);
        RepositoryInformation information = getRepositoryInformation(metadata, composition);

        validate(information, composition);

        //这里是构造一个对象,然后填充一些这个接口对应的entity信息,比如索引名,类型名等
        Object target = getTargetRepository(information);

        // Create proxy 接下来通过代理工厂创建代理
        ProxyFactory result = new ProxyFactory();
        result.setTarget(target);
        result.setInterfaces(repositoryInterface, Repository.class, TransactionalProxy.class);

        result.addAdvice(SurroundingTransactionDetectorMethodInterceptor.INSTANCE);
        result.addAdvisor(ExposeInvocationInterceptor.ADVISOR);

        postProcessors.forEach(processor -> processor.postProcess(result, information));
    
        result.addAdvice(new DefaultMethodInvokingMethodInterceptor());
        //这个DefaultMethodInvokingMethodInterceptor
        //会为所有我们在repository接口中自定义的方法加上切面
        result.addAdvice(new QueryExecutorMethodInterceptor(information));

        composition = composition.append(RepositoryFragment.implemented(target));
        result.addAdvice(new ImplementationMethodExecutionInterceptor(composition));
        //生产代理类返回注入到spring容器
        return (T) result.getProxy(classLoader);
    }

接下来我们看看QueryExecutorMethodInterceptor类,主要是resolveQuery这个方法

this.queries = lookupStrategy.map(it -> {

                SpelAwareProxyProjectionFactory factory = new SpelAwareProxyProjectionFactory();
                factory.setBeanClassLoader(classLoader);
                factory.setBeanFactory(beanFactory);

                return repositoryInformation.getQueryMethods().stream()//
                        .map(method -> Pair.of(method, it.resolveQuery(method, repositoryInformation, factory, namedQueries)))//
                        .peek(pair -> invokeListeners(pair.getSecond()))//
                        .collect(Pair.toMap());

            }).orElse(Collections.emptyMap());

resolveQuery 中会为每一个方法创建一个ElasticsearchPartQuery,其中ElasticsearchPartQuery是自定义操作实现的核心类,我们看一下代码

    //构建类的时候会创建一个PartTree主要用来描述这个方法是什么方法,有没有分页等到
    private final PartTree tree;
    private final MappingContext<?, ElasticsearchPersistentProperty> mappingContext;

    public ElasticsearchPartQuery(ElasticsearchQueryMethod method, ElasticsearchOperations elasticsearchOperations) {
        super(method, elasticsearchOperations);
        this.tree = new PartTree(method.getName(), method.getEntityInformation().getJavaType());
        this.mappingContext = elasticsearchOperations.getElasticsearchConverter().getMappingContext();
    }

    //这个是在方法运行时调用时通过切面到这里然后转换成elasticsearch的查询接口
    @Override
    public Object execute(Object[] parameters) {
        ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters);
        CriteriaQuery query = createQuery(accessor);
        if(tree.isDelete()) {
            Object result = countOrGetDocumentsForDelete(query, accessor);
            elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType());
            return result;
        } else if (queryMethod.isPageQuery()) {
            query.setPageable(accessor.getPageable());
            return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
        } else if (queryMethod.isStreamQuery()) {
            Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
            if (query.getPageable().isUnpaged()) {
                int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
                query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
            }

            return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));

        } else if (queryMethod.isCollectionQuery()) {
            if (accessor.getPageable() == null) {
                int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
                query.setPageable(PageRequest.of(0, Math.max(1, itemCount)));
            } else {
                query.setPageable(accessor.getPageable());
            }
            return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType());
        } else if (tree.isCountProjection()) {
            return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
        }
        return elasticsearchOperations.queryForObject(query, queryMethod.getEntityInformation().getJavaType());
    }

    

可以看到上面那个execute方法中主要有个ElasticsearchOperations类,负责调用es的方法,因为我们的List<User> findByName(String name)接口返回的是一个List,如果我们没有设置分页接口,这里会填充Unpaged.INSTANCE也就是表示客户端没有设置分页,es服务端会有默认填充。

image.png

然后逻辑走到了elasticsearchOperations.queryForList这个方法,这个方法调用的是queryForPage方法。这样整个流程我们就走通了,我们也看到了为什么我们写的es repository接口默认返回的是10条数据,所以如果想用这个接口去做大批量数据查询的话是会出现问题的,需要自己实现原生的接口,用scroll方式去拉取数据。其中deleteByName方法用按同样方式分析。

2.userRepository.save(User user)接口分析
public <S extends T> S save(S entity) {
        Assert.notNull(entity, "Cannot save 'null' entity.");
        elasticsearchOperations.index(createIndexQuery(entity));
        elasticsearchOperations.refresh(entityInformation.getIndexName());
        return entity;
    }

    /**
     * Index an object. Will do save or update
     *
     * @param query
     * @return returns the document id
     */
    String index(IndexQuery query);

    @Override
    public String index(IndexQuery query) {
        String documentId = prepareIndex(query).execute().actionGet().getId();
        // We should call this because we are not going through a mapper.
        if (query.getObject() != null) {
            setPersistentEntityId(query.getObject(), documentId);
        }
        return documentId;
    }

    /**
     * refresh the index
     *
     * @param indexName
     *
     */
    void refresh(String indexName);

    @Override
    public void refresh(String indexName) {
        Assert.notNull(indexName, "No index defined for refresh()");
        client.admin().indices().refresh(refreshRequest(indexName)).actionGet();
    }

可以看到在save entity的时候做了两件事情,首先是index,也就是这条数据会存到es,但是此时我们查询是不能查到的,因为查询到es数据需要建立对应的倒排索引,index只是把数据放到es中的buffer中,还没有建立倒排索引所以index后是不能立即搜索到es中对应的数据的,然后又调用了refresh方法,这个方法会触发es去清空buffer 并写入文件系统缓存,也就是会为我们的索引文件建立倒排索引,此时就可以搜索到了。所以save这个接口性能是很差的,比起mysql单挑插入,我这边测试了,性能下降一半,大概是300ms,服务器上面可能性能好一点,速度快一点;es插入最好调用批量的bulk接口

public <S extends T> List<S> save(List<S> entities) {
        Assert.notNull(entities, "Cannot insert 'null' as a List.");
        Assert.notEmpty(entities, "Cannot insert empty List.");
        List<IndexQuery> queries = new ArrayList<>();
        for (S s : entities) {
            queries.add(createIndexQuery(s));
        }
        elasticsearchOperations.bulkIndex(queries);
        elasticsearchOperations.refresh(entityInformation.getIndexName());
        return entities;
    }`

可以看到这里批量插入最后只调1次refresh,refresh操作会清空es的buffer,并且写入到文件系统缓存,这个操作开销还是比较大的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容