(nacos源码系列)springBoot下实现http请求的异步长轮询—AsyncContext

  1. AsyncContext介绍
    1.1 概念
    1.2 疑问
  2. 项目实战
    2.1 API方法
    2.2 项目实战—实现配置更新
  3. nacos实现长轮询

1. AsyncContext介绍

有这么一个场景:客户端轮询的去服务器读取更新的配置信息,但读取的频率高就会影响客户端和服务器的性能。那么如何优化这种场景呢?

1.1 概念

SpringBoot集成了servlet一系列的操作,故servlet提供的新特性,在SpringBoot环境中可以正常使用。

而servlet3.0提供了异步处理的特性【AsyncContext】:使Servlet线程不再一直阻塞,直到业务处理完成才能输出响应,最后结束Servlet线程。而是在接受到请求之后,Servlet线程可以将耗时的操作委派给一个异步线程执行,而Servlet线程在不生成响应的情况下返回容器。这种方案可以大大减少服务器资源占用,提高并发处理速度。

1.2 疑问

servlet线程返回容器,异步线程处理完耗时的操作后,还可以设置响应对象进行返回吗?

可以!服务端的servlet线程返回容器,客户端依旧和服务端保持着http连接,通过AysncContext依旧可以返回响应数据

image.png

2. 项目实战

2.1 API方法

在Servlet3.0中,ServletRequest提供了startAysnc()方法。

   /**
     * @see AsyncContext#dispatch()
     * @since Servlet 3.0
     */
    public AsyncContext startAsync() throws IllegalStateException;
 
    /**
     *
     * @since Servlet 3.0
     */
    public AsyncContext startAsync(ServletRequest servletRequest,
                                   ServletResponse servletResponse)
            throws IllegalStateException;

2.2 项目实战—实现配置更新

http层面实现的推拉结合的配置更新。

在controller层获取到HttpServletRequest和HttpServletResponse对象。

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;

@RestController
@RequestMapping
public class LongPollingController {
    //定时任务,阻塞的最大超时时间。
    private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);

    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
            .build();
    private static final Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());


    @GetMapping(path = "getConf")
    @ResponseBody
    public String getThreadPoolConf(HttpServletRequest request, HttpServletResponse response)
        String serviceName = request.getParameter("serviceName");
        String timeOut =request.getParameter("timeOut");
        if (timeOut == null) {
            //如果没设置过期时间则默认 29秒超时
            timeOut = "29";
        }
        // 开启异步
        AsyncContext asyncContext = request.startAsync(request, response);
        AsyncTask asyncTask = new AsyncTask(asyncContext, true);

        // 维护 serviceName 和异步请求上下文的关联
        dataIdContext.put(serviceName, asyncTask);

        // 启动定时器,30s 后写入 304 响应
        timeoutChecker.schedule(() -> {
            //触发定时后,判断任务是否被执行,即isTimeout为true(没有被执行)
           //则返回客户端304的状态码-即无修改。
            if (asyncTask.isTimeout()) {
                //清除缓存中的任务
                if (dataIdContext.remove(serviceName, asyncTask)) {
                    response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                    asyncTask.getAsyncContext().complete();
                }
            }
        }, Integer.parseInt(timeOut), TimeUnit.SECONDS);
    }


    //当配置被修改,触发事件后,会调用该方法。
    public void publishConfig(String serviceName,  List<ThreadPoolProperties> result) {
        if (StringUtil.isEmpty(serviceName) && CollectionUtils.isEmpty(result)) {
            log.error("publishConfig:serviceName:result all is null");
            return;
        }
        if (StringUtil.isNotEmpty(serviceName) && CollectionUtils.isEmpty(result)) {
            log.error("publishConfig result is null");
            //去持久化的存储源(es/mysql...)读取配置信息
            result = threadPoolService.getConfigByServiceName(serviceName);
            if (result == null) {
                log.error("publishConfig:serviceName:result all is null but not find threadPoolProperties serviceName:{}", serviceName);
                return;
            }
        }
        //读取到的配置信息,json化
        String configInfo = JSONArray.toJSONString(result);
        //移除AsyncTask的缓存
        Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(serviceName);
        if (CollectionUtils.isEmpty(asyncTasks)) return;
        //为每一个AsyncContext设置200的状态码以及响应数据。
        for (AsyncTask asyncTask : asyncTasks) {
            //表明未超时,已经进行处理了。
            asyncTask.setTimeout(false);
            HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(configInfo);
            asyncTask.getAsyncContext().complete();
        }
    }

    //自定义任务对象
    @Data
    private static class AsyncTask {
        // 长轮询请求的上下文,包含请求和响应体
        private AsyncContext asyncContext;
        // 超时标记
        private boolean timeout;

        public AsyncTask(AsyncContext asyncContext, boolean timeout) {
            this.asyncContext = asyncContext;
            this.timeout = timeout;
        }
    }
}

上面代码实现的逻辑:客户端拉取服务端的配置,服务端开启长轮询,使用AysncContext进行异步阻塞。当服务端有配置被改变时,(例如:由redis的订阅发布模式)来进行通知,最终执行publishConfig方法,将服务器的配置推送给客户端,其本质实现的是http推拉结合的配置通知。


  • startAsync()会直接利用原有的请求与响应对象来创建AsyncContext
  • startAsync(ServletRequest request,ServletResponse response)可以传入自行创建的请求、响应封装对象;

注意事项:

  1. 可以通过AsyncContext的getRequest()、getResponse()方法取得请求、响应对象,此次对客户端的响应将暂缓至调用AsyncContext的complete()或dispatch()方法为止,前者表示响应完成,后者表示将调派指定的URL进行响应。
  2. AysncContext.setTimeout()的超时时间不准,所以需要自己控制。
  3. 客户端的http并未断开连接。
  4. 第一次客户端请求,可以将全量的数据直接发生(普通连接),后续的请求,可以使用AysncContext来实现长轮询。

3. nacos实现长轮询

项目源码:nacos也是借助AysncContext来实现配置的更新。

@Service
public class LongPollingService extends AbstractEventListener {

    private static final int FIXED_POLLING_INTERVAL_MS = 10000;

    private static final int SAMPLE_PERIOD = 100;

    private static final int SAMPLE_TIMES = 3;

    private static final String TRUE_STR = "true";

    private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();

    private static boolean isFixedPolling() {
        return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);
    }

    //......

    static public boolean isSupportLongPolling(HttpServletRequest req) {
        return null != req.getHeader(LONG_POLLING_HEADER);
    }

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {

        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动  add delay time for LoadBalance
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
        asyncContext.setTimeout(0L);

        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

    //......
}

分析:LongPollingService的isSupportLongPolling是通过判断request是否有LONG_POLLING_HEADER的header来实现的;addLongPollingClient方法主要是创建ClientLongPolling,然后提交到scheduler定时线程池执行。

4. 文章参考

AsyncContext异步请求的用法

聊聊nacos config的doPollingConfig

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容