SSE 简单实践 - SpringMVC 异步消息处理

警告
本文最后更新于 2023-12-28,文中内容可能已过时。

SSE 简单实践 - SpringMVC 异步消息处理

官方文档: Server-Sent Events

参考博客:Spring Boot 中的 SSE 异常处理:统一处理连接和事件异常

阮一峰大佬的博客:Server-Sent Events 教程 - 阮一峰的网络日志

理论部分

关于 SSE 的理论部分,请看《Web 实时消息推送的 7 种实现方案》的Server-sent events小节

以后有时间了,再深入了解一下HTTP异步消息处理的在Servlet层面的原理。

后端

Spring 的控制器方法默认支持哪些返回值类型,看官方文档:Spring MVC Return Values,其中就专门提到了返回值类型为SseEmitter或者ResponseBodyEmitter的场景,这两个类的作用是发出一个异步对象传输流,用HttpMessageConverter的具体实现将其写入 HTTP 响应。这两个类也支持作为ResponseEntity的主体。参见异步请求HTTP 流

ResponseBodyEmitter用于异步请求处理,其子类SseEmitter专门处理异步消息中的一个特殊场景,即Server-sent events。以后我们有异步请求处理的需求,都可以通过从控制器方法返回SseEmitter来实现。

关于控制器方法不同的返回值,Spring 是如何处理的,请看《RequestMappingHandlerAdapter 源码解析》的返回值处理器小节

SseEmitter的结构也很简单,其父类为ResponseBodyEmitter,我们直接从这个类开始看起。

ResponseBodyEmitter

ResponseBodyEmitter异步请求处理的控制器方法返回值类型,控制器方法返回ResponseBodyEmitter后,我们可以将一个或多个对象被写入ResponseBodyEmitter中,推送到客户端,这就是异步请求处理。

有同样作用的还有DeferredResult,不过DeferredResult用于推送单个对象到客户端,而ResponseBodyEmitter可以用于发送多个对象,注意,这些对象不一定是同一种类型的对象,而且其中每个对象都应该注册相应的HttpMessageConverter,因为最终需要通过HttpMessageConverter将对象写入到响应消息体中。

关于如何创建HttpMessageConverter,并将其注册到 Spring 容器中,请看《HttpMessageConverters 自动配置类源码解析》

ResponseBodyEmitter方法的源码其实没什么可看的,值得注意的是,ResponseBodyEmitter的初始化方法(ResponseBodyEmitter#initialize,ResponseBodyEmitter#initializeWithError)、发送消息的方法(ResponseBodyEmitter#send)、关闭连接的方法(ResponseBodyEmitter#completeResponseBodyEmitter#completeWithError)、甚至异常事件捕获的方法(ResponseBodyEmitter#onTimeoutResponseBodyEmitter#onErrorResponseBodyEmitter#onCompletion),全都添加了synchronized关键字,也就是说,ResponseBodyEmitter对象实例的整个工作周期,都是线程安全的,尤其是发送消息的方法也是线程安全的,这样,我们就可以在多个线程中使用同一个ResponseBodyEmitter实例的 send 方法推送消息到客户端。

ResponseBodyEmitter的使用方式如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@RequestMapping(value="/stream", method=RequestMethod.GET)
public ResponseBodyEmitter handle() {
    ResponseBodyEmitter emitter = new ResponseBodyEmitter();
    // Pass the emitter to another component...
    return emitter;
}

// in another thread
emitter.send(foo1);

// and again
emitter.send(foo2);

// and done
emitter.complete();

SseEmitter - 重点,常用

SseEmitter继承了ResponseBodyEmitter,是 Spring 专门为 Server-Sent Events 实现的异步请求处理的控制器方法返回值类型,

SseEmitter的主要作用,就是简化了ResponseBodyEmitter的使用,将所有异步推送的 HTTP 信息的消息体的格式都统一成UTF-8编码的文本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);

protected void extendResponse(ServerHttpResponse outputMessage) {
    super.extendResponse(outputMessage);

    HttpHeaders headers = outputMessage.getHeaders();
    if (headers.getContentType() == null) {
        // 设置响应消息头中的内容格式为文本流 text/event-stream
        headers.setContentType(MediaType.TEXT_EVENT_STREAM);
    }
}

这样,不管SseEmitter#send方法的SseEmitter.SseEventBuilder的 data 方法参数是什么类型,最终都会被转化为字符串推送到客户端,而 SpringBoot 刚好会自动注入MappingJackson2HttpMessageConverter,可以将任意类型的对象格式化为 JSON 字符串,因此,用户不需要再自定义SseEmitter.SseEventBuilder的 data 方法参数对应类型的HttpMessageConverter的实现类,简化了异步推送消息的过程。

当然,如果你对字符串化的结果有定制化的需求,你也可以自定义SseEmitter.SseEventBuilder的 data 方法参数对应类型的HttpMessageConverter的实现类,然后将其注册为 bean 即可,这个我们在后面都会有实践

关于为什么 SpringBoot 会自动注册MappingJackson2HttpMessageConverter,以及如何手动注册自定义的HttpMessageConverter,具体请看《HttpMessageConverters 自动配置类源码解析》

此外,SseEmitter#send方法也加了锁。保证了线程安全。这样,我们就可以在多个线程中使用同一个SseEmitter实例的SseEmitter#send方法推送消息到客户端,这在后面的本文后面的实践部分也有提到。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public void send(SseEventBuilder builder) throws IOException {
    Set<DataWithMediaType> dataToSend = builder.build();
    // 加锁,线程安全
    this.writeLock.lock();
    try {
        // super 就是 ResponseBodyEmitter
        // 其实 ResponseBodyEmitter 的 send 已经加了所了,这里再加一次,感觉有点多余
        super.send(dataToSend);
    }
    finally {
        this.writeLock.unlock();
    }
}

实际上,SseEmitter#send方法最终调用的是到ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandlersendInternal方法,调用栈如下:

ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandlersendInternal方法的具体细节请看ResponseBodyEmitterReturnValueHandler小节。

SseEmitter#send方法的参数是SseEmitter$SseEventBuilder,这个构造器接口的主要作用是构造一个 SSE 也就是一个服务端发送的事件,这个接口默认只有一个实现类,也在 SseEmitter 内部SseEmitter$SseEventBuilderImpl,我们通过SseEmitter#event方法可以轻松获取这个构造器实现。

这个构造器可以指定一个 SSE 的以下几个字段

  • id:事件 ID

  • name:事件类型,前端 EventSource 对象默认监听三种类型的事件:openerrormessage,我们可以通过在后端指定 SSE 的 name 为这三个类型,来往这三个监听方法发信息。我们也可以通过指定其他 name 来自定义 SSE 的类型。

    具体请看EventSource 对象小节和自定义关闭事件小节

  • reconnectTime:重试时间

  • comment:事件评论

  • data:事件数据,我们可以传入任意类型的对象

    具体请看自定义传递对象小节

这个与HTTP 消息小节提到的 SSE 对应的 HTTP 消息的格式是对应的。

此外,调用SseEmitter实例的 complete 方法可以结束请求,关闭连接,连接一旦关闭,就会触发前端的 error 事件,而前端的EventSource 对象是存在断线重连机制的,如果不调用 EventSource 对象的 close 方法,EventSource 对象自动进行断线重连,持续尝试对后端发起连接。因此,我们可以在监听到 error 事件的时候调用EventSource.close();来关闭前端 EventSource 对象,阻止重试,不过这样就无法在网络出现波动的意外情况下自动重连了,因此,我们还是建议后端发送自定义的关闭事件,来通知客户端关闭连接,这个在后面实践环节也会提到。

SseEmitter实例的 complete 方法实际上继承自ResponseBodyEmitter#complete,最终调用ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandlercomplete方法,这个到ResponseBodyEmitterReturnValueHandler小节再看

ResponseBodyEmitterReturnValueHandler

ResponseBodyEmitterReturnValueHandler的作用就是处理控制器方法返回值类型为ResponseBodyEmitter的情况,所以啊,源码其实应该从这里开始看。

其实ResponseBodyEmitterReturnValueHandler我们之前接触过,在《RequestMappingHandlerAdapter 源码解析》的返回值处理器小节中,提到过这个类,也看到过其他的返回值处理类

返回值处理类的重点是handleReturnValue方法,这里我们简单分析一下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Override
@SuppressWarnings("resource")
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

    if (returnValue == null) {
        mavContainer.setRequestHandled(true);
        return;
    }

    // 获取原始的响应对象,并封装为 outputMessage
    HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
    Assert.state(response != null, "No HttpServletResponse");
    ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);

    // 如果是 ResponseEntity
    if (returnValue instanceof ResponseEntity<?> responseEntity) {
        response.setStatus(responseEntity.getStatusCode().value());
        outputMessage.getHeaders().putAll(responseEntity.getHeaders());
        returnValue = responseEntity.getBody();
        returnType = returnType.nested();
        if (returnValue == null) {
            mavContainer.setRequestHandled(true);
            outputMessage.flush();
            return;
        }
    }

    ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);
    Assert.state(request != null, "No ServletRequest");

    ResponseBodyEmitter emitter;
    if (returnValue instanceof ResponseBodyEmitter responseBodyEmitter) {
        // 如果是 ResponseBodyEmitter 这个类或者其子类,说明还是传统的 servelet API 处理
        emitter = responseBodyEmitter;
    } else {
        // 如果不是 ResponseBodyEmitter 这个类或者其子类,那么就是响应式 API
        emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
        if (emitter == null) {
            // Not streaming: write headers without committing response..
            outputMessage.getHeaders().forEach((headerName, headerValues) -> {
                for (String headerValue : headerValues) {
                    response.addHeader(headerName, headerValue);
                }
            });
            return;
        }
    }
    // 对 HTTP 响应消息进行拓展,如果是 SseEmitter,此时就会设置响应消息体的格式为文本流 text/event-stream
    emitter.extendResponse(outputMessage);

    // At this point we know we're streaming..
    ShallowEtagHeaderFilter.disableContentCaching(request);

    // 包装响应对象,开始以流的形式进行消息传递
    // Wrap the response to ignore further header changes
    // Headers will be flushed at the first write
    outputMessage = new StreamingServletServerHttpResponse(outputMessage);

    HttpMessageConvertingHandler handler;
    try {
        DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
        // 本质上还是基于 deferredResult 进行异步消息传递
        // getAsyncManager 获取的是 WebAsyncManager,这是异步请求处理的核心
        // startDeferredResultProcessing 方法的作用是,监听异步请求的各种事件,比如请求超时,请求报错,请求完成,并在这些事件发生的时候对 deferredResult 执行相应的操作
        // 这里就不深入理解了
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
        handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
    }
    catch (Throwable ex) {
        emitter.initializeWithError(ex);
        throw ex;
    }

    // 初始化 ResponseBodyEmitter
    emitter.initialize(handler);
}

然后再看内部类ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler,这个内部类实现了ResponseBodyEmitter.Handler

我们重点看这个内部类的sendInternal方法,简单的源码分析如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// data 就是 SseEmitter.event().id().name().data() 的 data 方法传入的值,也就是我们推送到客户端的对象
// mediaType 是我们希望以什么样的格式来传递 data
private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
    // ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters 保存的是当前容器内的所有的 HttpMessageConverter
    for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {
        // 判断当前 converter 能否将 data 的类型的数据转化为指定的 MediaType 格式
        if (converter.canWrite(data.getClass(), mediaType)) {
            // 如果可以,直接转化,写入到响应中
            ((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
            // 然后直接返回,不会再继续查找,所以 converter 的顺序很重要,这话都要说烂了。
            return;
        }
    }
    throw new IllegalArgumentException("No suitable converter for " + data.getClass());
}

类似的分析请看《SpringMVC-ContentNegotiation 内容协商》的简单分析RequestResponseBodyMethodProcessor小节

这个内部方法sendInternal方法,实际上就是将异步推送的数据推送到客户端的地方。

假设我们调用SseEmitter#send方法向客户端推送数据,那么从SseEmitter#send方法到ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandlersendInternal方法的调用栈如下:

此外还有complete方法,简单看一下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
public void complete() {
    try {
        // outputMessage 的类型为 ServerHttpResponse 就是 HTTP 响应对象
        // 执行刷新
        this.outputMessage.flush();
        // setResult 会标志着异步请求完成
        this.deferredResult.setResult(null);
    }
    catch (IOException ex) {
        this.deferredResult.setErrorResult(ex);
    }
}

HTTP 消息

数据格式

服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息。

1
2
3
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

上面三行之中,第一行的Content-Type必须指定 MIME 类型为event-steam

每一次发送的信息,由若干个message组成,每个message之间用\n\n分隔。每个message内部由若干行组成,每一行都是如下格式。

1
[field]: value\n

上面的field可以取四个值。

  • data
  • event
  • id
  • retry

此外,还可以有冒号开头的行,表示注释。通常,服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。

1
: This is a comment

下面是一个例子。

1
2
3
4
5
6
: this is a test stream\n\n

data: some text\n\n

data: another message\n
data: with two lines \n\n

data 字段

数据内容用data字段表示。

1
data:  message\n\n

如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。

1
2
data: begin message\n
data: continue message\n\n

下面是一个发送 JSON 数据的例子。

1
2
3
4
data: {\n
data: "foo": "bar",\n
data: "baz", 555\n
data: }\n\n

id 字段

数据标识符用id字段表示,相当于每一条数据的编号。

1
2
id: msg1\n
data: message\n\n

浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。

event 字段

event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。

1
2
3
4
5
6
7
event: foo\n
data: a foo event\n\n

data: an unnamed event\n\n

event: bar\n
data: a bar event\n\n

上面的代码创造了三条信息。第一条的名字是foo,触发浏览器的foo事件;第二条未取名,表示默认类型,触发浏览器的message事件;第三条是bar,触发浏览器的bar事件。

下面是另一个例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}

event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}

retry 字段

服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。

1
retry: 10000\n

两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。

前端

官方文档: Server-Sent Events

EventSource 对象

SSE 的客户端 API 部署在EventSource对象上。下面的代码可以检测浏览器是否支持 SSE。

1
2
3
if ('EventSource' in window) {
  // ...
}

使用 SSE 时,浏览器首先生成一个EventSource实例,向服务器发起连接。

1
var source = new EventSource(url);

上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。

1
var source = new EventSource(url, { withCredentials: true });

EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。

  • 0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。

  • 1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。

  • 2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。

基本使用流程

SSE 默认的事件如下:

通道建立事件,连接一旦建立,就会触发open事件,可以在onopen属性定义回调函数。

1
2
3
4
5
6
7
8
source.onopen = function (event) {
  // ...
};

// 另一种写法
source.addEventListener('open', function (event) {
  // ...
}, false);

客户端收到服务器发来的数据,就会触发message事件,可以在onmessage属性的回调函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
source.onmessage = function (event) {
  var data = event.data;
  // handle message
};

// 另一种写法
source.addEventListener('message', function (event) {
  var data = event.data;
  // handle message
}, false);

上面代码中,事件对象的data属性就是服务器端传回的数据(Content-Typetext/event-stream,文本流格式)。

如果发生通信错误(比如连接中断),或者服务端调用 SseEmitter.complete的时候(此方法会结束连接),就会触发error事件,可以在onerror属性定义回调函数。此时如果不调用EventSource对象的 close 方法,这个对象自动进行断线重连,会持续对其 url 发起连接。因此,我们一般是建议通过自定义事件来通知前端关闭EventSource

1
2
3
4
5
6
7
8
source.onerror = function (event) {
  // handle error event
};

// 另一种写法
source.addEventListener('error', function (event) {
  // handle error event
}, false);

close方法用于关闭 SSE 连接。

1
source.close();

默认情况下,服务器发来的数据,总是触发浏览器EventSource实例的message事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发message事件。

总体上来说需要自定义两类事件,用来监听 SSE 的通信阶段

  • 传输业务数据

    我们可以根据业务情况,给一个页面中需要推送的每一种类型的数据都给一个专门的事件名称,这样一个页面只用一个 SSE 通道即可,且不同类型的数据走不同的通道,相互不干扰。

  • 关闭连接

    通知前端关闭 SSE 通道,不要再重试

简单实践

基本使用

首先搭建一个简单的 SpringBoot Web 程序

pom 文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<project>
    ......
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

配置文件

1
2
3
4
5
6
7
8
server:
  # 当前应用所在的 Web 服务器监听的本地端口
  port: 8080
  servlet:
    # 应用的上下文路径
    context-path: /SSESimple
    # 启用默认的 Servlet
    register-default-servlet: true

在项目的src\main\resources\static目录下创建index.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>首页</title>
</head>
<body>
<h1>Hello World</h1>
SSE 测试
<br/>
<a href="./ssepage">测试 SSE </a>
</body>
</html>

然后注册控制器SSEController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@Controller
public class SSEController {

    @RequestMapping("/ssepage")
    public String ssepage() {
        //设置视图名称
        return "ssepage.html";
    }

}

添加视图页面ssepage.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>首页</title>
</head>
<body>
<div>动态刷新:<span id="dynamic_content">还未接收到动态数据!</span></div>
</body>
<script>
    let source = null;
    if (window.EventSource) {

        // 建立连接
        source = new EventSource('http://localhost:8080/SSESimple/ssedata');
        setMessageInnerHTML("开始连接 SSE 服务端");

        /**
         * 连接一旦建立,就会触发 open 事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("成功建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });

        // 异常处理
        source.addEventListener('error', function (event) {
            // handle error event
            console.log("SSE 发生错误:", event)
            // 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
            // 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
            source.close();
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持 SSE");
    }

    function setMessageInnerHTML(message) {
        document.getElementById('dynamic_content').innerHTML = message;
    }

</script>
</html>

同时添加 SSE 的后台处理器SSEDataController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@RestController
public class SSEDataController {

    @GetMapping("/ssedata")
    public SseEmitter handleSSE() {
        // 每一次请求都返回一个新的 SseEmitter 对象
        SseEmitter emitter = new SseEmitter();
        // 注册回调
        // 当连接完成时,会触发 onCompletion 回调
        emitter.onCompletion(() -> System.out.println("link complete!!"));
        // 当连接报错时,会触发 onError 回调
        emitter.onError(Throwable::printStackTrace);
        // 当连接超时时,会触发 onTimeout 回调
        emitter.onTimeout(() -> System.out.println("connection timeout!!"));

        // 注意,不要使用 Executors.newSingleThreadExecutor 这样,每次用户请到达服务端都会创建一个核心线程数为 1 的线程池,执行完请求之后,核心线程会一直存在,这样的话,会造成线程的浪费
        // 单线程写入
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data("SSE event #" + i);
                    emitter.send(event);
                    Thread.sleep(1000);
                }
                // 开始关闭
                // 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
                emitter.complete();
            } catch (IOException e) {
                // 检测到报错,关闭连接,当然如果报错原因就是连接断开了,则啥都不做直接返回
                emitter.completeWithError(e);
            } catch (InterruptedException e) {
                // 线程中断
                // throw new RuntimeException(e);
            }
        }).start();

        return emitter;
    }

}

启动之后,访问http://localhost:8080/SSESimple/:进入首页之后点击测试 SSE,即可看到页面内容在刷新

输出到 9 之后,因为服务端关闭了异步请求,因此数字不再增加,同时服务端控制台输出

1
link complete!!

打开浏览器控制台,我们可以看到请求的具体情况

以及EventStream事件流,可以很清楚地看到服务端给客户端推送的事件流

关闭事件

有一点需要注意的是异步请求的关闭,如果连接关闭了,而一方不是主动关闭连接的那一方,那这一方就会报错

在上面的示例代码中,是客户端主动结束异步请求的处理(SseEmitter实例调用 complete 方法),此时因为连接关闭,客户端监听到连接断开之后,会触发 error 事件,输出错误如下。

其实如果是因为其他原因导致连接关闭,比如断网,也会触发客户端的 error 事件。

前端的EventSource对象是存在断线重连机制的,如果不调用 EventSource 对象的 close 方法

1
2
3
4
5
6
7
8
// 异常处理
source.addEventListener('error', function (event) {
    // handle error event
    console.log("SSE 发生错误:", event)
    // 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
    // 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
    // source.close();
}, false);

EventSource 对象自动进行断线重连,持续尝试对后端发起连接。如果网络是畅通的,而且后端接口也依然可用,那么就前端会再次与后端建立连接一个新的连接

,效果就是前端页面显示到 9 之后,会再次从 0 开始刷新:

如果我们想不进行自动重连,那我们可以在监听到 error 事件的时候调用EventSource.close();来关闭前端 EventSource 对象,阻止重试,

1
2
3
4
5
6
7
8
// 异常处理
source.addEventListener('error', function (event) {
    // handle error event
    console.log("SSE 发生错误:", event)
    // 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
    // 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
    source.close();
}, false);

不过这样就无法在网络出现波动的意外情况下自动重连了,因此,我们还是建议后端发送自定义的关闭事件,来通知客户端关闭连接,这个在自定义关闭事件小节会提到。

我们再来看看如果服务端还未关闭连接(SseEmitter实例调用 complete 方法),客户端主动关闭连接会怎么样。

经过实验我们发现,客户端在服务端还未调用 complete 方法之前,标签页关闭或者 JavaScript 脚本中主动调用EventSource.close方法,甚至页面刷新,服务端都报错,提示连接已关闭。输出错误信息如下:

可以这样在前端中主动关闭连接:

1
2
3
setTimeout(function () {
    source.close();
}, 3000)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java.io.IOException: 你的主机中的软件中止了一个已建立的连接。
    at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
    at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
    at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
    at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
    at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:118)
    at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1381)
    at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:764)
    at org.apache.tomcat.util.net.SocketWrapperBase.flushBlocking(SocketWrapperBase.java:728)
    at org.apache.tomcat.util.net.SocketWrapperBase.flush(SocketWrapperBase.java:712)
    at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.flush(Http11OutputBuffer.java:566)
    at org.apache.coyote.http11.filters.ChunkedOutputFilter.flush(ChunkedOutputFilter.java:157)
    at org.apache.coyote.http11.Http11OutputBuffer.flush(Http11OutputBuffer.java:220)
    at org.apache.coyote.http11.Http11Processor.flush(Http11Processor.java:1245)
    at org.apache.coyote.AbstractProcessor.action(AbstractProcessor.java:400)
    at org.apache.coyote.Response.action(Response.java:208)
    at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:301)
    at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:267)
    at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:120)
    at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
    at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:130)
    at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
    at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
    at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
    at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
    at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
    at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
    at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)
    at xyz.xiashuo.ssesimple.controller.SSEDataController.lambda$handleSSE$2(SSEDataController.java:34)
    at java.base/java.lang.Thread.run(Thread.java:833)
link complete!!

日志的输出是因为我们在代码中注册了错误处理动作

1
emitter.onError(Throwable::printStackTrace);

而且我们发现即使是发生了错误,连接关闭的时候依然会触发emitter.onCompletion事件。

因此,我们可以在onCompletion或者onError的处理方法中,关闭服务端的相关资源,比如线程,I/O 接口之类的。

值得注意的是,如果是因为连接断开而报的错,是不需要调用SseEmitter.complete()来关闭连接的,如果是因为其他的错,比如服务端内部的,跟连接状态无关的错,可以调用的SseEmitter.complete()来关闭连接。

为什么不需要?因为SseEmitter.complete()方法适用于服务端主动关闭连接的时候使用,而不应该在 Servlet 容器相关事件之后使用,这在SseEmitter.complete()的注释中也有提到。

自定义关闭事件

一般在监听到 error 事件的时候,我们不建议调用EventSource.close();来关闭前端 EventSource 对象,因为这样会跟自动重试相冲突,因此,我们可以自定义关闭事件。

在后端调用 complete 方法前,先发送一个类型为 complete 的事件,当然你也可以起别的名字。

1
2
3
4
5
// 开始关闭
// 建议发送自定义的关闭事件,来通知客户端关闭连接
emitter.send(SseEmitter.event().name("complete").data("close"));
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();

在前端中,在 error 事件中,不再调用EventSource.close();,此外,再额外监听 complete

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 异常处理
source.addEventListener('error', function (event) {
    // handle error event
    console.log("SSE 发生错误:", event)
    // 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
    // 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
    // source.close();
}, false);

// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
    source.close();
});

这样,就很完美了。此时客户端收到的 EventStream 为。PS:不知道最后两个事件 id 都为 9 会不会有影响。

自定义传递对象

在前面的例子中,我们往客户端推送的事件的数据类型都是字符串,那可以推送其他自定义的类型吗?答案是可以的。

首先在index.html中添加连接

1
<a href="./ssepage_obj">测试 SSE 传输 Java 对象</a>

然后在SSEController中添加控制器方法

1
2
3
4
5
@RequestMapping("/ssepage_obj")
public String ssepage_obj() {
    //设置视图名称
    return "ssepageobj.html";
}

在静态资源路径src\main\resources\static下添加页面ssepageobj.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>首页</title>
</head>
<body>
<div>动态刷新:<span id="dynamic_content">还未接收到动态数据!</span></div>
</body>
<script>
    let source = null;
    if (window.EventSource) {

        // 建立连接
        source = new EventSource('http://localhost:8080/SSESimple/ssedata_obj');
        setMessageInnerHTML("开始连接 SSE 服务端");

        /**
         * 连接一旦建立,就会触发 open 事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("成功建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });

        // 异常处理
        source.addEventListener('error', function (event) {
            // handle error event
            console.log("SSE 发生错误:", event)
            // 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
            // 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
            // source.close();
        }, false);

        // 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
        // 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
        // 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
        source.addEventListener('complete', function (e) {
            source.close();
        });

        // setTimeout(function () {
        //     source.close();
        // }, 3000)

    } else {
        setMessageInnerHTML("你的浏览器不支持 SSE");
    }

    function setMessageInnerHTML(message) {
        document.getElementById('dynamic_content').innerHTML = message;
    }

</script>
</html>

然后添加自定义 SSE 消息对象类

@Data为 Lombok 的注解

1
2
3
4
5
6
@Data
public class SSEMessage {
    private String name;
    private int age;
    private LocalDateTime birthday = LocalDateTime.now();
}

再在SSEDataController中添加后台接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@GetMapping("/ssedata_obj")
public SseEmitter handleSSEObj() {
    // 每一次请求都返回一个新的 SseEmitter 对象
    SseEmitter emitter = new SseEmitter();
    // 注册回调
    // 当连接完成时,会触发 onCompletion 回调
    emitter.onCompletion(() -> System.out.println("link complete!!"));
    // 当连接报错时,会触发 onError 回调
    emitter.onError(Throwable::printStackTrace);
    // 当连接超时时,会触发 onTimeout 回调
    emitter.onTimeout(() -> System.out.println("connection timeout!!"));

    // 注意,不要使用 Executors.newSingleThreadExecutor 这样,每次用户请到达服务端都会创建一个核心线程数为 1 的线程池,执行完请求之后,核心线程会一直存在,这样的话,会造成线程的浪费
    // 单线程写入
    new Thread(() -> {
        try {
            for (int i = 0; i < 10; i++) {
                // 传递字符串
                // SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data("SSE event #" + i);
                // 传递自定义类型
                SSEMessage message = new SSEMessage();
                message.setName("SSE event #" + i);
                SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data(message);
                emitter.send(event);
                Thread.sleep(1000);
            }
            // 开始关闭
            // 建议发送自定义的关闭事件,来通知客户端关闭连接
            emitter.send(SseEmitter.event().name("complete").data("close"));
            // 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
            emitter.complete();
        } catch (IOException e) {
            // 检测到报错,关闭连接,当然如果报错原因就是连接断开了,则啥都不做直接返回
            emitter.completeWithError(e);
        } catch (InterruptedException e) {
            // 线程中断
            // throw new RuntimeException(e);
        }
    }).start();

    return emitter;
}

启动项目之后,在前台页面中可以看到

为什么可以正常传输呢?根据我们在SseEmitter的源码分析,我们可以知道,最终传递给客户端的 HTTP 消息的格式是MediaType.TEXT_EVENT_STREAM,而ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandlersendInternal方法要求系统中存在一个HttpMessageConverter,能够将我们传入的类型(可能是任意类型)转化成MediaType.TEXT_EVENT_STREAM类型的 HTTP 消息传给客户端,但是我们并没有注册这样一个HttpMessageConverter,那么实际是那个HttpMessageConverter完成的转化呢?经过简单调试,我们发现实际默认注册了 8 个HttpMessageConverter

而最终负责将我们传入的类型(可能是任意类型)转化成MediaType.TEXT_EVENT_STREAM类型的 HTTP 消息的是MappingJackson2HttpMessageConverter

关于为什么 SpringBoot 会自动注册MappingJackson2HttpMessageConverter,以及如何手动注册自定义的HttpMessageConverter,具体请看《HttpMessageConverters 自动配置类源码解析》

难怪看页面上的结果,那么像一个 JSON 字符串。

当然你也可以自定义一个HttpMessageConverter,用来处理特定格式到成MediaType.TEXT_EVENT_STREAM类型的 HTTP 消息的转化,例如SSEMessageConverter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * 参考 StringHttpMessageConverter
 */
@Component
public class SSEMessageConverter extends AbstractHttpMessageConverter<SSEMessage> {
    // 默认编码
    public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    public SSEMessageConverter() {
        // super(DEFAULT_CHARSET, null);
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        // 只支持处理 SSEMessage 类型的实例
        return SSEMessage.class.isAssignableFrom(clazz);
    }

    @Override
    protected SSEMessage readInternal(Class<? extends SSEMessage> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
        // 读操作有待实现
        return null;
    }

    @Override
    protected void writeInternal(SSEMessage sseMessage, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
        String content = myProtocolContent(sseMessage);
        // 输出到响应的消息体
        StreamUtils.copy(content, DEFAULT_CHARSET, outputMessage.getBody());
    }

    /**
     * 自定义消息输出内容格式
     *
     * @param sseMessage
     * @return
     */
    private String myProtocolContent(SSEMessage sseMessage) {
        // 当然你可以在这里选择其他的拼接方式,比如把所有的属性用英文逗号拼接,等等。
        // 或者序列化为 JSON 字符串
        return "自定义转换:"+sseMessage.toString();
    }

    @Override
    protected void addDefaultHeaders(HttpHeaders headers, SSEMessage s, @Nullable MediaType type) throws IOException {
        // 不重写 addDefaultHeaders 指定响应的 ContentType 消息头的话,默认的 addDefaultHeaders 会自动设置 ContentType 为自定义 MediaType,
        // 浏览器会处理不了,会变成下载,所以这里需要设置 ContentType 消息头,设置成 MediaType.TEXT_PLAIN 是为了方便展示
        // 指定类型的时候指定编码
        headers.setContentType(new MediaType(MediaType.TEXT_PLAIN, DEFAULT_CHARSET));
        super.addDefaultHeaders(headers, s, type);
    }

    @Override
    protected Long getContentLength(SSEMessage message, @Nullable MediaType contentType) {
        String str = myProtocolContent(message);
        return (long) str.getBytes(DEFAULT_CHARSET).length;
    }

}

再次调试,可以看到SSEMessageConverter确实出现在了转化器列表中,而且排在MappingJackson2HttpMessageConverter的前面,这意味着SSEMessageConverter会优先生效

实际的结果:

可以看到,转化结果确实是自定义的HttpMessageConverter转化的结果。

一个页面多个类型的事件使用同一个 SseEmitter 对象

在实际使用的过程中,一个页面中可能有多个需要异步处理的请求,比如订单信息的变更,比如聊天信息的条数,等等,这些信息属于不同的主题,而且SseEmitter是支持单个对象传输不同主题的 SSE 事件的,而且因为SseEmitter的 send 方法是线程安全的,因此,我们可以在多个线程中调用同一个SseEmitter实例的 send 方法来推送消息,方案也很简单,一个主题的事件用一个线程推送即可。

首先在index.html中添加连接

1
<a href="./ssepage_concurrent">测试 SSE 传输多类型事件</a>

然后在SSEController中添加控制器方法

1
2
3
4
5
@RequestMapping("/ssepage_concurrent")
public String ssepageConcurrent() {
    //设置视图名称
    return "ssepage_concurrent.html";
}

在静态资源路径src\main\resources\static下添加页面ssepage_concurrent.html,在这个页面中,我们手动监听三个类型的事件topic1topic2topic3

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>首页</title>
</head>
<body>
<div>主题 1 动态刷新:<span id="dynamic_content_1">还未接收到动态数据!</span></div>
<div>主题 2 动态刷新:<span id="dynamic_content_2">还未接收到动态数据!</span></div>
<div>主题 3 动态刷新:<span id="dynamic_content_3">还未接收到动态数据!</span></div>
</body>
<script>
    let source = null;
    if (window.EventSource) {

        // 建立连接
        source = new EventSource('http://localhost:8080/SSESimple/ssedata_concurrent');
        console.log("开始连接 SSE 服务端");

        /**
         * 连接一旦建立,就会触发 open 事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            console.log("成功建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            // 后端指定的 ID 通过 e.lastEventId 获取
            setMessageInnerHTML(e.lastEventId, e.data);
        });

        source.addEventListener('topic1', function (e) {
            // 后端指定的 ID 通过 e.lastEventId 获取
            setMessageInnerHTML(e.lastEventId, e.data);
        });
        source.addEventListener('topic2', function (e) {
            // 后端指定的 ID 通过 e.lastEventId 获取
            setMessageInnerHTML(e.lastEventId, e.data);
        });
        source.addEventListener('topic3', function (e) {
            // 后端指定的 ID 通过 e.lastEventId 获取
            setMessageInnerHTML(e.lastEventId, e.data);
        });

        // 异常处理
        source.addEventListener('error', function (event) {
            // handle error event
            console.log("SSE 发生错误:", event)
            // 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
            // 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
            // source.close();
        }, false);

        // 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
        // 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
        // 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
        source.addEventListener('complete', function (e) {
            source.close();
        });

    } else {
        setMessageInnerHTML("你的浏览器不支持 SSE");
    }

    function setMessageInnerHTML(id, message) {
        document.getElementById('dynamic_content_' + id).innerHTML = message;
    }

</script>
</html>

再在SSEDataController中添加后台接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@GetMapping("/ssedata_concurrent")
public SseEmitter handleSSEConcurrent() {
    SseEmitter emitter = new SseEmitter();
    // 注册回调
    emitter.onCompletion(() -> System.out.println("link complete!!"));
    emitter.onError(Throwable::printStackTrace);
    emitter.onTimeout(() -> System.out.println("connection timeout!!"));

    // 在实际业务种,我们可以给每一种业务分配一个主题,并且用一个专门的线程来推送数据
    for (int i = 1; i <= 3; i++) {
        int finalI = i;
        new Thread(() -> {
            String topic = "topic" + finalI;
            // 测试每个主题发送 10 条数据
            for (int j = 0; j < 10; j++) {
                String msg = " Topic " + finalI + " : SSE event #" + j;
                SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(finalI)).name(topic).data(msg);
                try {
                    emitter.send(event);
                    // 延时
                    Thread.sleep(800 + 50 * finalI * j);
                } catch (IOException e) {
                    // 客户端主动关闭
                    emitter.completeWithError(e);
                } catch (InterruptedException e) {
                    // 线程中断
                    // throw new RuntimeException(e);
                }
            }
        }).start();
    }

    return emitter;
}

启动项目之后,在前台页面中可以看到下图这样的变换,为了让效果更直观,我们还给每个主题添加了不一样的延时

效果还是不错的。

在实际的生产实践中,应该是一个用户一个SseEmitter对象,而不是一个请求一个SseEmitter对象,最好是用一个Map,来保存所有的SseEmitter,key 为SessionID,在Session过期的时候清除对应的SseEmitter,这个就懒得写了,后面用到了再说。


0%