Elasticsearch源码分析-架构设计之Action

1 请求URL 与对应Action

如果我们发送如下的http请求,elasticsearch是如何匹配对应Action的呢?

curl -XGET 'http://elasticsearch_ip:port/xxx/url/path' -d '{
    "请求的content"
}'

在org.elasticsearch.rest.action包下面,存放了处理各种请求的rest action类
在每个action的构造方法中,会将当前action对象注册到指定的url上

public class RestXxxAction extends BaseRestHandler {

    @Inject
    public RestXxxAction(Settings settings, RestController controller, Client client) {
        super(settings, controller, client);
        controller.registerHandler(GET, "/xxx/url/path", this);
        controller.registerHandler(POST, "/xxx/url/path", this);
    }
}

在registerHandler()方法中,会将url path和对应handler添加到http method对象上

public class RestController extends AbstractLifecycleComponent<RestController> {
    public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
        switch (method) {
            case GET:
                getHandlers.insert(path, handler);
                break;
            case DELETE:
                deleteHandlers.insert(path, handler);
                break;
            case POST:
                postHandlers.insert(path, handler);
                break;
            case PUT:
                putHandlers.insert(path, handler);
                break;
            case OPTIONS:
                optionsHandlers.insert(path, handler);
                break;
            case HEAD:
                headHandlers.insert(path, handler);
                break;
            default:
                throw new ElasticsearchIllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
        }
    }
}

elasticsearch使用HttpRequestHandler接收http请求消息,最终会在executeHandler()方法中调用getHandler()方法获取path对应的handler

public class RestController extends AbstractLifecycleComponent<RestController> {
    void executeHandler(RestRequest request, RestChannel channel) throws Exception {
        final RestHandler handler = getHandler(request);
        if (handler != null) {
            handler.handleRequest(request, channel);
        } else {
            if (request.method() == RestRequest.Method.OPTIONS) {
                channel.sendResponse(new BytesRestResponse(OK));
            } else {
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, 
  "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]"));
            }
        }
    }
}

在getHandler()方法中,主要是根据method和path获取在RestXxxAction中添加的handler

public class RestController extends AbstractLifecycleComponent<RestController> {
    private RestHandler getHandler(RestRequest request) {
        String path = getPath(request);
        RestRequest.Method method = request.method();
        if (method == RestRequest.Method.GET) {
            return getHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.POST) {
            return postHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.PUT) {
            return putHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.DELETE) {
            return deleteHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.HEAD) {
            return headHandlers.retrieve(path, request.params());
        } else if (method == RestRequest.Method.OPTIONS) {
            return optionsHandlers.retrieve(path, request.params());
        } else {
            return null;
        }
    }
}

elasticsearch在path对应获取Handler后,就执行该handler的handleRequest()方法,作为rest逻辑入口

2 Action与对应TransportAction

在ActionModule中,会将Action和对应的TransportAction注册到actions对象中

public class ActionModule extends AbstractModule {
    @Override
    protected void configure() {
        registerAction(XxxAction.INSTANCE, TransportXxxAction.class);
    }

    public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
                        GenericAction<Request, 
                        Response> action, Class<? extends TransportAction<Request, Response>> transportAction, 
                        Class... supportTransportActions) {
        actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
    }
}

在RestXxxAction中,最终会调用AbstractClient的xxx方法,并传入Action参数为XxxAction.INSTANCE,即为在ActionModule中注册的Action类

public abstract class AbstractClient implements Client {
    @Override
    public void xxx(final SuggestRequest request, final ActionListener<SuggestResponse> listener) {
        execute(XxxAction.INSTANCE, request, listener);
    }
}

在NodeClient中会根据传入的Action参数从actions获取在ActionModule中注册的TransportAction,并执行其execute()方法

public class NodeClient extends AbstractClient {
    @Override
    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>> void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        TransportAction<Request, Response> transportAction = actions.get((ClientAction)action); 
        transportAction.execute(request, listener);
    }
}

在TransportAction类图个execute()方法中可以看出:
① TransportXxxAction都继承自TransportAction类
② TransportAction的execute()方法仅调用了doExecute()方法
④ TransportXxxAction重写了TransportAction的doExecute()方法
因此,在最终的对应关系为在ActionModule中注册的Action调用的为TransportXxxAction的doExecute()方法

TransportAction类图

TransportAction的execute()方法仅调用了需要子类重写的抽象方法doExecute()

public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
    
    protected abstract void doExecute(Request request, ActionListener<Response> listener);

    public final void execute(Request request, ActionListener<Response> listener) {
        // ...
        if (filters.length == 0) {
            try {
                // TransportAction 子类都要重写这个方法
                doExecute(request, listener);
            } catch(Throwable t) {
                logger.trace("Error during transport action execution.", t);
                listener.onFailure(t);
            }
        } else {
            RequestFilterChain requestFilterChain = new RequestFilterChain<>(this, logger);
            requestFilterChain.proceed(actionName, request, listener);
        }
    }
}

3 节点间用Action匹配Handler

如果elasticsearch要请求的节点不是当前节点,则需要将请求发送到对应的节点上执行
在TransportXxxAction中会将ACTION和对应的Handler注册到对应的serverHandlers对象中,在使用transportService发送请求后,MessageChannelHandler.messageReceived()会接受到信息,然后在handleRequest()方法中获取注册的Handler,执行其messageReceived()方法或者交给线程池处理

public class TransportXxxAction {
    private final TransportService transportService;
    private final SearchService searchService;

    @Inject
    public TransportXxxAction(Settings settings, 
                        ThreadPool threadPool, 
                        TransportService transportService, 
                        ClusterService clusterService, 
                        XxxService xxxService) {
        this.transportService = transportService;
        transportService.registerHandler(XXX_ACTION_NAME, new XxxTransportHandler());
        this.xxxService= xxxService;
    }

    public void executeXxx(DiscoveryNode node, final XxxTransportRequest request, final XxxListener listener) {
        // 如果shard所在的节点id和当前节点id相同
        if (clusterService.state().nodes().localNodeId().equals(node.id())) {
            // 当前节点执行逻辑
            XxxService.execute(request);
        } else {
            // shard 所在节点不是当前节点, 发送请求执行远程节点搜索
            // node是要发送的节点
            transportService.sendRequest(node, XXX_ACTION_NAME, request, new BaseTransportResponseHandler() {
                @Override
                public QueryXxxResult newInstance() {
                    return new QueryXxxResult();
                }
                @Override
                public void handleResponse(XxxProvider response) {
                    listener.onResult(response);
                }
                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }
                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }
            });
        }
    }

    private class XxxTransportHandler extends BaseTransportRequestHandler {
        /**
         * 接收远程端口的tcp 请求, 执行Xxx 逻辑
         * @param request   TransportRequest
         * @param channel   TransportChannel
         * @throws Exception    Exception
         */
        @Override
        public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
            // 远程节点待执行逻辑
            XxxProvider result = XxxService.execute(request);
            // 将Query结果响应发送给调用节点
            channel.sendResponse(result);
        }

        @Override
        public String executor() {
            return ThreadPool.Names.Xxx;
        }
    }
}

在transportServiceAdapter.handler()方法中,即根据注册的action获取对应的TransportHandler,如果executor是same则执行handler的messageReceived()方法,否则交给线程池执行

public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
    protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final String action = buffer.readString();
        transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, profileName);
        try {
            final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = handler.newInstance();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(buffer);
            if (ThreadPool.Names.SAME.equals(handler.executor())) {
                //noinspection unchecked
                handler.messageReceived(request, transportChannel);
            } else {
                threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException e1) {
                logger.warn("Failed to send error message back to client for action [" + action + "]", e);
                logger.warn("Actual Exception", e1);
            }
        }
        return action;
    }
}

下面讲另外一种情况,在ActionModule中注册的Action也都会注册到handler中

public class TransportXxxAction extends HandledTransportAction {
    @Inject
    public TransportXxxAction (Settings settings, ThreadPool threadPool) {
        super(settings, XxxAction.NAME, threadPool);
}

在TransportAction类中同样使用transportService.registerHandler()方法注册handler

public abstract class HandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends TransportAction<Request,Response>{
    protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters){
        super(settings, actionName, threadPool, actionFilters);
        transportService.registerHandler(actionName, new TransportHandler() {
            @Override
            public Request newInstance(){
                return newRequestInstance();
            }
        });
    }
}

因此在InternalTransportClient中,主要是回调proxy.execute()

public class InternalTransportClient extends AbstractClient {
   @Inject
    public InternalTransportClient(
                                   TransportClientNodesService nodesService, 
                                   InternalTransportAdminClient,
                                   Map<String, GenericAction> actions, 
                                   Headers headers) {
        MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
        for (GenericAction action : actions.values()) {
            if (action instanceof Action) {
                actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.actions = actionsBuilder.immutableMap();
    }
    @Override
    public void execute(final Action action, final Request request, ActionListener listener) {
        headers.applyTo(request);
        final TransportActionNodeProxy<Request, Response> proxy = actions.get(action);
        nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
            @Override
            public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                proxy.execute(node, request, listener);
            }
        }, listener);
    }
}

transportService.sendRequest()中的action.name()即为Action的Name参数,已被注册到对应的handler中了

public class TransportActionNodeProxy extends AbstractComponent {
    public void execute(DiscoveryNode node, 
                            final Request request, 
                            final ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        transportService.sendRequest(node, action.name(), 
                            request, transportOptions, 
                            new BaseTransportResponseHandler<Response>() {
            @Override
            public Response newInstance() {
                return action.newResponse();
            }

            @Override
            public String executor() {
                if (request.listenerThreaded()) {
                    return ThreadPool.Names.LISTENER;
                }
                return ThreadPool.Names.SAME;
            }

            @Override
            public void handleResponse(Response response) {
                listener.onResponse(response);
            }

            @Override
            public void handleException(TransportException exp) {
                listener.onFailure(exp);
            }
        });
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容