Server-Sent Events

什么是 SSE

我们先看一看一个标准的 HTTP Request/Response 的过程。

  • 客户端与服务器建立连接,发送 HTTP request 给服务器
  • 服务器收到客户端的 HTTP request,发送 HTTP response 给客户端
  • 完成 HTTP Request/Response 过程,断开连接

从上述流程可以看出,总是客户端主动发起数据请求,而服务器只能被动接受并发送请求数据。

现在我们再看一看 SSE 有什么不同。

  • 客户端与服务器建立连接,监听服务器事件
  • 服务器准备数据,准备完成后异步推送数据到客户端
  • 客户端监听到服务器发送数据事件,接受数据,处理数据,并继续监听服务器事件

很明显,决定发送数据时机的不再是客户端,而是服务器。

每次当服务器打算发送数据时,就会产生一个 Data Event,并发送给客户端,所以这种机制,我们又称为 SSE(Server Sent Events)。

注意

还有其他技术也支持 Server-to-Client 通信。

轮询「Poolling」

客户端向服务器重复发送的请求。如果服务器有要发送的数据,就发送数据,若没有,就发送一个标志符,告诉客户端没有新的数据。不管怎样,服务器都会关闭连接。然后客户端在经过一小段时间(比如 1 秒)后再次向服务器发起新的请求。

长轮询「Long-Poolling」

长轮询与轮询的区别在于,服务器一定会在发送完数据后再断开连接,也就是说服务器与客户端建立连接时若还没有数据需要发送,服务器不会直接关闭连接,而是等数据准备就绪并发送完成后才关闭连接。

服务器发送事件「Server-Sent Events」

SSE 和长轮询很相似,但 SSE 不是一次连接就发送一条数据(message),而是会一直保留连接,重用这一次连接发送多条数据(events)。另外,SSE 还定义了一个专用的 MIME 类型 text/event-stream,用来描述服务器发送给客户端的 Events 的格式。SSE 还提供了一个 JavaScript Client API

WebSocket

WebSocket 提供了真正的全双工连接「Full Duplex Connection」
实现过程如下:

  • 客户端发送一个特殊的 HTTP Header 给服务器,告诉服务器需要将 HTTP 连接升级到全双工 TCP/IP WebSocket 连接
  • 如果服务器支持 WebSocket 的话,就可以建立 WebSocket 连接
  • 建立连接之后,服务器和客户端随时都可以发送数据给对方

什么时候使用 SSE

SSE 实际上提供了一种「单向发布/订阅模型」(One-Way publish-Subscribe Model)的解决方案。

  • 客户端订阅服务器的消息
  • 服务器一旦有新消息,就发布给所有已订阅的客户端

一个不错的应用场景就是一个交换消息的 RESTful service,简单来说就是多人聊天服务。

服务器:使用 Java 实现 SSE

Message 类

首先声明一个简单的 Message 类,代表交互的消息。

属性

  • id 「消息的标志符」
  • message 「消息的内容」

这里只是简单定义了 id 和 message 属性,实际上还可以添加时间戳、发送人、IP地址等属性。

public class Message {
    private long id;
    private String message;

    public Message(long id, String message) {
        this.id = id;
        this.message = message;
    }

    public long getId() {
        return id;
    }

    public String getMessage() {
        return message;
    }
}

Chat Servlet

接下来,我们来写一个 Servlet,来处理客户端请求,并返回响应内容。

属性

属性 描述
counter 为客户端的每一个连接生成唯一的id
running 终止线程
asyncContexts 存储所有从浏览器打开的连接
messageQueue 消息队列
messageStore 消息储存
@WebServlet("/chat")
public class ChatServlet extends HttpServlet {
    private AtomicLong counter = new AtomicLong();
    private boolean running;

    // 保留所有的连接
    private Map<String, AsyncContext> asyncContexts = new ConcurrentHashMap<>();

    // 消息队列
    private BlockingQueue<Message> messageQueue = new LinkedBlockingDeque<>();

    // 存储消息
    private List<Message> messageStore = new CopyOnWriteArrayList<>();
}

这里简化了数据存储,只是把数据存储到内存中。
实际上应该存储到数据库中,这样的话,
counter 可以由数据库自动生成,
messageStore 也不必存储数据。

线程

我们接着再 ChatServlet 中创建一个线程,用来

  • 从消息队列中取出消息
  • 存储消息
  • 转发消息给所有的客户端
// 监听消息,分发消息
private Thread notifier = new Thread(() -> {
    while (running) {
        try {
            // 从消息队列中提取消息(若无消息,阻塞)
            Message message = messageQueue.take();

            // 存储消息
            messageStore.add(message);

            // 最多存储 100 条消息
            if (messageStore.size() > 100) {
                messageStore.remove(0);
            }

            // 发送消息给所有的客户端
            for (AsyncContext asyncContext : asyncContexts.values()) {
                try {
                    sendMeassage(asyncContext.getResponse().getWriter(), message);
                } catch (IOException e) {
                    // 出现异常时移除出错的客户端
                    asyncContexts.values().remove(asyncContext);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});

初始化

重载 init 方法,启动线程 notifier

@Override
public void init(ServletConfig config) throws ServletException {
    super.init(config);

    // TODO: 从数据库中加载消息(100条)
    // messageStore.addAll(db.loadMessages(100));

    // 启动线程
    running = true;
    notifier.start();
}

如果使用数据库存储消息的话,还需要先从数据库中加载数据。

处理用户发送的消息

用户发送过来的消息是 form 表单里的 post 数据,所以这里重载 doPost 方法来处理用户输入的消息

  • 验证消息非空
  • 存储消息
/**
 * 接受客户端的消息「调用 AJAX」
 * 验证消息,保存到数据库
 * 将消息放入消息队列
 *
 * @param request
 * @param resp
 * @throws ServletException
 * @throws IOException
 */
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse resp) throws ServletException, IOException {

    // 设置字符编码——最好放在 Filter 中
    request.setCharacterEncoding("UTF-8");

    // 获取消息
    String message = request.getParameter("msg");

    // 验证消息,并将消息存储到数据库中
    if (message != null && !message.trim().isEmpty()) {
        try {
            // 清屏命令
            if (message.equals("#clear")) {
                messageQueue.clear();
                messageStore.clear();
                return;
            }

            // TODO:保存消息到数据库

            // 创建一条消息
            Message msg = new Message(counter.incrementAndGet(), message);
            
            // 放入消息队列
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
}

发送消息给所有用户

首先封装一个发送 SSE 格式消息的方法。

/**
 * 发送 SSE 格式的消息到客户端
 *
 * @param writer  输出流,可以向客户端写字符
 * @param message 发送的消息
 */
private void sendMessage(PrintWriter writer, Message message) {
    writer.print("id: ");
    writer.println(message.getId());
    writer.print("data: ");
    writer.println(message.getMessage());
    writer.println();
    writer.flush();
}

然后重载 doGet 方法,因为用户会通过 HTTP GET 方法监听 SSE 事件。

  • 根据是否设置 Last-Event-ID 标头判断用户是否第一次加入聊天室
    • 第一次加入聊天室就构造一条欢迎消息
    • 否则,从 messageStore 中取出一条消息,发送给用户
  • 保存请求的异步操作上下文(即客户端连接)
    • 如果出现异常(掉线、超时、关闭),移除这次连接
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    // 如果用户从主页跳转过来,跳到 /chat.jsp
    if (request.getAttribute("index") != null) {
        request.setAttribute("message", messageStore);
        request.getRequestDispatcher("/chat.jsp").forward(request, response);
        return;
    }

    // 如果客户端在监听 SSE 请求
    if ("text/event-stream".equals(request.getHeader("Accept"))) {

        // 异步请求(Tomcat 特定属性)
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

        // 设置 Header
        response.setContentType("text/event-stream");
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Connection", "keep-alive");
        response.setCharacterEncoding("UTF-8");

        // 解析 Last-Event-ID
        String lastMessageId = request.getHeader("Last-Event-ID");
        if (lastMessageId != null && !lastMessageId.trim().isEmpty()) {
            long lastId = 0;
            try {
                lastId = Long.parseLong(lastMessageId);
            } catch (NumberFormatException e) {
                // 因为 lastId 有默认值,不执行任何操作
            }
            if (lastId > 0) {
                // 发送还没发送的所有消息
                for (Message message : messageStore) {
                    if (message.getId() > lastId) {
                        sendMessage(response.getWriter(), message);
                    }
                }
            }
        } else {
            long lastId = 0;
            try {
                lastId = messageStore.get(messageStore.size() - 1).getId();
            } catch (Exception ignored) {
                // 不执行任何操作
            }
            if (lastId > 0) {
                // 使用 lastId 发送消息
                // 如果浏览器连接失败,1s 后重试
                response.getWriter().println("retry:1000\n");
                Message message = new Message(lastId, "欢迎来到聊天室,输入消息,并按下回车键发送");
                sendMessage(response.getWriter(), message);
            }
        }

        // 生成一些唯一的标志符用来保存 context
        final String id = UUID.randomUUID().toString();

        // 启动异步 context 并且添加监听器移除 context
        AsyncContext asyncContext = request.startAsync();
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                asyncContexts.remove(id);
            }

            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                asyncContexts.remove(id);
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                asyncContexts.remove(id);
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                // 不执行任何操作
            }
        });

        // 添加 context 到 map
        asyncContexts.put(id, asyncContext);
    }
}

关闭服务器

服务器关闭时,终止线程,回收资源。

/**
 * 停止线程,回收资源
 */
@Override
public void destroy() {
    running = false;
    asyncContexts.clear();
    messageQueue.clear();
    messageStore.clear();
}

浏览器:使用 JavaScript 实现 SSE

页面主要结构

<div id="container">

    <div id="chat">
        <%
            // 打印消息列表 - 使用 JSTL 更佳
            List<Message> messages = (List<Message>) request.getAttribute("messages");
            // 检查 message 非空,不然编译报错
            if (messages != null) {
                for (Message msg : messages) { %>
                    <%= msg.getMessage() %><br/>
                <% }
            }
        %>
    </div>

    <form id="msgForm" action="chat" method="post" onsubmit="return sendMsg(this);">
        <input type="text" name="msg" id="msg" placeholder="在这输入消息,回车发送"/>
    </form>

    <p>在上面的输入框中输入消息,按下回车键发送消息。</p>

</div>

订阅 SSE

JavaScript 使用 EventSource 处理 SSE。

  • 构造 EventSource 对象
  • 注册 onmessage 事件监听器
// 检测浏览器对 EventSource 的支持
if (!!window.EventSource) {
    // 监听 SSE 源
    var source = new EventSource('/chat');

    // 收到服务器的消息
    source.onmessage = function (e) {
        var el = document.getElementById("chat");
        el.innerHTML += e.data + "<br/>";

        // 向上滚动一行
        el.scrollTop += 50;
    };
} else {
    alert("你的浏览器不支持 EventSource!");
}

因为 EventSource 的浏览器兼容性问题,需要先进行检测浏览器是否支持。

初始化页面

页面加载完成后,初始化环境。

  • 滚动至消息栏的底部,显示最新的消息
  • 聚焦输入框,方便用户输入
// 滚动聊天框,聚焦输入框
window.onload = function () {
    // 滚动 100 行消息(有的话)
    document.getElementById("chat").scrollTop += 50 * 100;

    // 聚焦输入框
    document.getElementById("msg").focus();
};

发送消息

定义发送消息给服务器的方法 sendMsg

  • 接受一个表单元素作为参数
  • 验证发送的消息非空
  • 解决 XMLHTTPRequest 的兼容性问题
  • 注册获取到服务器数据的事件监听器 onreadystatechange
// 发送消息给服务器
function sendMsg(form) {

    if (form.msg.value.trim() === "") {
        alert("消息为空!");
    }

    // 初始化 XHR 对象
    let http = false;
    if (typeof ActiveXObject !== "undefined") {
        try {
            http = new ActiveXObject("Msxml2.XMLHTTP");
        } catch (ex) {
            try {
                http = new ActiveXObject("Microsoft.XMLHTTP");
            } catch (ex2) {
                http = false;
            }
        }
    } else if (window.XMLHttpRequest) {
        try {
            http = new XMLHttpRequest();
        } catch (ex) {
            http = false;
        }
    }

    // 浏览器不支持 XHR
    if (!http) {
        alert("无法连接服务器!");
        return;
    }

    // 准备数据
    let parameters = "msg=" + encodeURIComponent(form.msg.value.trim());

    http.onreadystatechange = () => {
        if (http.readyState === 4 && http.status === 200) {
            if (typeof http.responseText !== "undefined") {
                let result = http.responseText;
                form.msg.value = "";
            }
        }
    };

    http.open("POST", form.action, true);
    http.setRequestHeader("Content-type", "application/x-www-form-urlencoded");
    http.send(parameters);

    return false;
}    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,165评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,503评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,295评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,589评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,439评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,342评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,749评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,397评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,700评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,740评论 2 313
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,523评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,364评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,755评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,024评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,297评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,721评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,918评论 2 336

推荐阅读更多精彩内容