警告
本文最后更新于 2023-12-28,文中内容可能已过时。
SSE 简单实践 - SpringMVC 异步消息处理
官方文档: Server-Sent Events
参考博客:Spring Boot 中的 SSE 异常处理:统一处理连接和事件异常
阮一峰大佬的博客:Server-Sent Events 教程 - 阮一峰的网络日志
理论部分
关于 SSE 的理论部分,请看《Web 实时消息推送的 7 种实现方案》的Server-sent events
小节
以后有时间了,再深入了解一下HTTP异步消息处理的在Servlet层面的原理。
后端
Spring 的控制器方法默认支持哪些返回值类型,看官方文档:Spring MVC Return Values,其中就专门提到了返回值类型为SseEmitter
或者ResponseBodyEmitter
的场景,这两个类的作用是发出一个异步对象传输流,用HttpMessageConverter
的具体实现将其写入 HTTP 响应。这两个类也支持作为ResponseEntity
的主体。参见异步请求和HTTP 流。
ResponseBodyEmitter
用于异步请求处理,其子类SseEmitter
专门处理异步消息中的一个特殊场景,即Server-sent events
。以后我们有异步请求处理的需求,都可以通过从控制器方法返回SseEmitter
来实现。
关于控制器方法不同的返回值,Spring 是如何处理的,请看《RequestMappingHandlerAdapter 源码解析》的返回值处理器
小节
SseEmitter
的结构也很简单,其父类为ResponseBodyEmitter
,我们直接从这个类开始看起。
ResponseBodyEmitter
ResponseBodyEmitter
是异步请求处理的控制器方法返回值类型,控制器方法返回ResponseBodyEmitter
后,我们可以将一个或多个对象被写入ResponseBodyEmitter
中,推送到客户端,这就是异步请求处理。
有同样作用的还有DeferredResult
,不过DeferredResult
用于推送单个对象到客户端,而ResponseBodyEmitter
可以用于发送多个对象,注意,这些对象不一定是同一种类型的对象,而且其中每个对象都应该注册相应的HttpMessageConverter
,因为最终需要通过HttpMessageConverter
将对象写入到响应消息体中。
关于如何创建HttpMessageConverter
,并将其注册到 Spring 容器中,请看《HttpMessageConverters 自动配置类源码解析》
ResponseBodyEmitter
方法的源码其实没什么可看的,值得注意的是,ResponseBodyEmitter
的初始化方法(ResponseBodyEmitter#initialize
,ResponseBodyEmitter#initializeWithError
)、发送消息的方法(ResponseBodyEmitter#send
)、关闭连接的方法(ResponseBodyEmitter#complete
,ResponseBodyEmitter#completeWithError
)、甚至异常事件捕获的方法(ResponseBodyEmitter#onTimeout
,ResponseBodyEmitter#onError
,ResponseBodyEmitter#onCompletion
),全都添加了synchronized
关键字,也就是说,ResponseBodyEmitter
对象实例的整个工作周期,都是线程安全的,尤其是发送消息的方法也是线程安全的,这样,我们就可以在多个线程中使用同一个ResponseBodyEmitter
实例的 send 方法推送消息到客户端。
ResponseBodyEmitter
的使用方式如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@RequestMapping(value="/stream", method=RequestMethod.GET)
public ResponseBodyEmitter handle() {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
// Pass the emitter to another component...
return emitter;
}
// in another thread
emitter.send(foo1);
// and again
emitter.send(foo2);
// and done
emitter.complete();
|
SseEmitter - 重点,常用
SseEmitter
继承了ResponseBodyEmitter
,是 Spring 专门为 Server-Sent Events 实现的异步请求处理的控制器方法返回值类型,
SseEmitter
的主要作用,就是简化了ResponseBodyEmitter
的使用,将所有异步推送的 HTTP 信息的消息体的格式都统一成UTF-8
编码的文本。
1
2
3
4
5
6
7
8
9
10
11
|
private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);
protected void extendResponse(ServerHttpResponse outputMessage) {
super.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null) {
// 设置响应消息头中的内容格式为文本流 text/event-stream
headers.setContentType(MediaType.TEXT_EVENT_STREAM);
}
}
|
这样,不管SseEmitter#send
方法的SseEmitter.SseEventBuilder
的 data 方法参数是什么类型,最终都会被转化为字符串推送到客户端,而 SpringBoot 刚好会自动注入MappingJackson2HttpMessageConverter
,可以将任意类型的对象格式化为 JSON 字符串,因此,用户不需要再自定义SseEmitter.SseEventBuilder
的 data 方法参数对应类型的HttpMessageConverter
的实现类,简化了异步推送消息的过程。
当然,如果你对字符串化的结果有定制化的需求,你也可以自定义SseEmitter.SseEventBuilder
的 data 方法参数对应类型的HttpMessageConverter
的实现类,然后将其注册为 bean 即可,这个我们在后面都会有实践
关于为什么 SpringBoot 会自动注册MappingJackson2HttpMessageConverter
,以及如何手动注册自定义的HttpMessageConverter
,具体请看《HttpMessageConverters 自动配置类源码解析》
此外,SseEmitter#send
方法也加了锁。保证了线程安全。这样,我们就可以在多个线程中使用同一个SseEmitter
实例的SseEmitter#send
方法推送消息到客户端,这在后面的本文后面的实践部分也有提到。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public void send(SseEventBuilder builder) throws IOException {
Set<DataWithMediaType> dataToSend = builder.build();
// 加锁,线程安全
this.writeLock.lock();
try {
// super 就是 ResponseBodyEmitter
// 其实 ResponseBodyEmitter 的 send 已经加了所了,这里再加一次,感觉有点多余
super.send(dataToSend);
}
finally {
this.writeLock.unlock();
}
}
|
实际上,SseEmitter#send
方法最终调用的是到ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的sendInternal
方法,调用栈如下:
ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的sendInternal
方法的具体细节请看ResponseBodyEmitterReturnValueHandler
小节。
SseEmitter#send
方法的参数是SseEmitter$SseEventBuilder
,这个构造器接口的主要作用是构造一个 SSE 也就是一个服务端发送的事件,这个接口默认只有一个实现类,也在 SseEmitter 内部SseEmitter$SseEventBuilderImpl
,我们通过SseEmitter#event
方法可以轻松获取这个构造器实现。
这个构造器可以指定一个 SSE 的以下几个字段
这个与HTTP 消息
小节提到的 SSE 对应的 HTTP 消息的格式是对应的。
此外,调用SseEmitter
实例的 complete 方法可以结束请求,关闭连接,连接一旦关闭,就会触发前端的 error 事件,而前端的EventSource
对象是存在断线重连机制的,如果不调用 EventSource
对象的 close 方法,EventSource
对象自动进行断线重连,持续尝试对后端发起连接。因此,我们可以在监听到 error 事件的时候调用EventSource.close();
来关闭前端 EventSource 对象,阻止重试,不过这样就无法在网络出现波动的意外情况下自动重连了,因此,我们还是建议后端发送自定义的关闭事件,来通知客户端关闭连接,这个在后面实践环节也会提到。
SseEmitter
实例的 complete 方法实际上继承自ResponseBodyEmitter#complete
,最终调用ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的complete
方法,这个到ResponseBodyEmitterReturnValueHandler
小节再看
ResponseBodyEmitterReturnValueHandler
ResponseBodyEmitterReturnValueHandler
的作用就是处理控制器方法返回值类型为ResponseBodyEmitter
的情况,所以啊,源码其实应该从这里开始看。
其实ResponseBodyEmitterReturnValueHandler
我们之前接触过,在《RequestMappingHandlerAdapter 源码解析》的返回值处理器
小节中,提到过这个类,也看到过其他的返回值处理类
返回值处理类的重点是handleReturnValue
方法,这里我们简单分析一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
@Override
@SuppressWarnings("resource")
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
// 获取原始的响应对象,并封装为 outputMessage
HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
Assert.state(response != null, "No HttpServletResponse");
ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
// 如果是 ResponseEntity
if (returnValue instanceof ResponseEntity<?> responseEntity) {
response.setStatus(responseEntity.getStatusCode().value());
outputMessage.getHeaders().putAll(responseEntity.getHeaders());
returnValue = responseEntity.getBody();
returnType = returnType.nested();
if (returnValue == null) {
mavContainer.setRequestHandled(true);
outputMessage.flush();
return;
}
}
ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);
Assert.state(request != null, "No ServletRequest");
ResponseBodyEmitter emitter;
if (returnValue instanceof ResponseBodyEmitter responseBodyEmitter) {
// 如果是 ResponseBodyEmitter 这个类或者其子类,说明还是传统的 servelet API 处理
emitter = responseBodyEmitter;
} else {
// 如果不是 ResponseBodyEmitter 这个类或者其子类,那么就是响应式 API
emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
if (emitter == null) {
// Not streaming: write headers without committing response..
outputMessage.getHeaders().forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
response.addHeader(headerName, headerValue);
}
});
return;
}
}
// 对 HTTP 响应消息进行拓展,如果是 SseEmitter,此时就会设置响应消息体的格式为文本流 text/event-stream
emitter.extendResponse(outputMessage);
// At this point we know we're streaming..
ShallowEtagHeaderFilter.disableContentCaching(request);
// 包装响应对象,开始以流的形式进行消息传递
// Wrap the response to ignore further header changes
// Headers will be flushed at the first write
outputMessage = new StreamingServletServerHttpResponse(outputMessage);
HttpMessageConvertingHandler handler;
try {
DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
// 本质上还是基于 deferredResult 进行异步消息传递
// getAsyncManager 获取的是 WebAsyncManager,这是异步请求处理的核心
// startDeferredResultProcessing 方法的作用是,监听异步请求的各种事件,比如请求超时,请求报错,请求完成,并在这些事件发生的时候对 deferredResult 执行相应的操作
// 这里就不深入理解了
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
}
catch (Throwable ex) {
emitter.initializeWithError(ex);
throw ex;
}
// 初始化 ResponseBodyEmitter
emitter.initialize(handler);
}
|
然后再看内部类ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
,这个内部类实现了ResponseBodyEmitter.Handler
。
我们重点看这个内部类的sendInternal
方法,简单的源码分析如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// data 就是 SseEmitter.event().id().name().data() 的 data 方法传入的值,也就是我们推送到客户端的对象
// mediaType 是我们希望以什么样的格式来传递 data
private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
// ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters 保存的是当前容器内的所有的 HttpMessageConverter
for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {
// 判断当前 converter 能否将 data 的类型的数据转化为指定的 MediaType 格式
if (converter.canWrite(data.getClass(), mediaType)) {
// 如果可以,直接转化,写入到响应中
((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
// 然后直接返回,不会再继续查找,所以 converter 的顺序很重要,这话都要说烂了。
return;
}
}
throw new IllegalArgumentException("No suitable converter for " + data.getClass());
}
|
类似的分析请看《SpringMVC-ContentNegotiation 内容协商》的简单分析RequestResponseBodyMethodProcessor
小节
这个内部方法sendInternal
方法,实际上就是将异步推送的数据推送到客户端的地方。
假设我们调用SseEmitter#send
方法向客户端推送数据,那么从SseEmitter#send
方法到ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的sendInternal
方法的调用栈如下:
此外还有complete
方法,简单看一下。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Override
public void complete() {
try {
// outputMessage 的类型为 ServerHttpResponse 就是 HTTP 响应对象
// 执行刷新
this.outputMessage.flush();
// setResult 会标志着异步请求完成
this.deferredResult.setResult(null);
}
catch (IOException ex) {
this.deferredResult.setErrorResult(ex);
}
}
|
HTTP 消息
数据格式
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息。
1
2
3
|
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
|
上面三行之中,第一行的Content-Type
必须指定 MIME 类型为event-steam
。
每一次发送的信息,由若干个message
组成,每个message
之间用\n\n
分隔。每个message
内部由若干行组成,每一行都是如下格式。
上面的field
可以取四个值。
此外,还可以有冒号开头的行,表示注释。通常,服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。
下面是一个例子。
1
2
3
4
5
6
|
: this is a test stream\n\n
data: some text\n\n
data: another message\n
data: with two lines \n\n
|
data 字段
数据内容用data
字段表示。
如果数据很长,可以分成多行,最后一行用\n\n
结尾,前面行都用\n
结尾。
1
2
|
data: begin message\n
data: continue message\n\n
|
下面是一个发送 JSON 数据的例子。
1
2
3
4
|
data: {\n
data: "foo": "bar",\n
data: "baz", 555\n
data: }\n\n
|
id 字段
数据标识符用id
字段表示,相当于每一条数据的编号。
1
2
|
id: msg1\n
data: message\n\n
|
浏览器用lastEventId
属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID
头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。
event 字段
event
字段表示自定义的事件类型,默认是message
事件。浏览器可以用addEventListener()
监听该事件。
1
2
3
4
5
6
7
|
event: foo\n
data: a foo event\n\n
data: an unnamed event\n\n
event: bar\n
data: a bar event\n\n
|
上面的代码创造了三条信息。第一条的名字是foo
,触发浏览器的foo
事件;第二条未取名,表示默认类型,触发浏览器的message
事件;第三条是bar
,触发浏览器的bar
事件。
下面是另一个例子。
1
2
3
4
5
6
7
8
9
10
11
|
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}
event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}
|
retry 字段
服务器可以用retry
字段,指定浏览器重新发起连接的时间间隔。
两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。
前端
官方文档: Server-Sent Events
EventSource 对象
SSE 的客户端 API 部署在EventSource
对象上。下面的代码可以检测浏览器是否支持 SSE。
1
2
3
|
if ('EventSource' in window) {
// ...
}
|
使用 SSE 时,浏览器首先生成一个EventSource
实例,向服务器发起连接。
1
|
var source = new EventSource(url);
|
上面的url
可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials
属性,表示是否一起发送 Cookie。
1
|
var source = new EventSource(url, { withCredentials: true });
|
EventSource
实例的readyState
属性,表明连接的当前状态。该属性只读,可以取以下值。
-
0:相当于常量EventSource.CONNECTING
,表示连接还未建立,或者断线正在重连。
-
1:相当于常量EventSource.OPEN
,表示连接已经建立,可以接受数据。
-
2:相当于常量EventSource.CLOSED
,表示连接已断,且不会重连。
基本使用流程
SSE 默认的事件如下:
通道建立事件,连接一旦建立,就会触发open
事件,可以在onopen
属性定义回调函数。
1
2
3
4
5
6
7
8
|
source.onopen = function (event) {
// ...
};
// 另一种写法
source.addEventListener('open', function (event) {
// ...
}, false);
|
客户端收到服务器发来的数据,就会触发message
事件,可以在onmessage
属性的回调函数。
1
2
3
4
5
6
7
8
9
10
|
source.onmessage = function (event) {
var data = event.data;
// handle message
};
// 另一种写法
source.addEventListener('message', function (event) {
var data = event.data;
// handle message
}, false);
|
上面代码中,事件对象的data
属性就是服务器端传回的数据(Content-Type
为text/event-stream
,文本流格式)。
如果发生通信错误(比如连接中断),或者服务端调用 SseEmitter.complete
的时候(此方法会结束连接),就会触发error
事件,可以在onerror
属性定义回调函数。此时如果不调用EventSource
对象的 close 方法,这个对象自动进行断线重连,会持续对其 url 发起连接。因此,我们一般是建议通过自定义事件来通知前端关闭EventSource
。
1
2
3
4
5
6
7
8
|
source.onerror = function (event) {
// handle error event
};
// 另一种写法
source.addEventListener('error', function (event) {
// handle error event
}, false);
|
close
方法用于关闭 SSE 连接。
默认情况下,服务器发来的数据,总是触发浏览器EventSource
实例的message
事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发message
事件。
总体上来说需要自定义两类事件,用来监听 SSE 的通信阶段
简单实践
基本使用
首先搭建一个简单的 SpringBoot Web 程序
pom 文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
<project>
......
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
|
配置文件
1
2
3
4
5
6
7
8
|
server:
# 当前应用所在的 Web 服务器监听的本地端口
port: 8080
servlet:
# 应用的上下文路径
context-path: /SSESimple
# 启用默认的 Servlet
register-default-servlet: true
|
在项目的src\main\resources\static
目录下创建index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
|
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<h1>Hello World</h1>
SSE 测试
<br/>
<a href="./ssepage">测试 SSE </a>
</body>
</html>
|
然后注册控制器SSEController
1
2
3
4
5
6
7
8
9
10
|
@Controller
public class SSEController {
@RequestMapping("/ssepage")
public String ssepage() {
//设置视图名称
return "ssepage.html";
}
}
|
添加视图页面ssepage.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<div>动态刷新:<span id="dynamic_content">还未接收到动态数据!</span></div>
</body>
<script>
let source = null;
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/SSESimple/ssedata');
setMessageInnerHTML("开始连接 SSE 服务端");
/**
* 连接一旦建立,就会触发 open 事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("成功建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
source.close();
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持 SSE");
}
function setMessageInnerHTML(message) {
document.getElementById('dynamic_content').innerHTML = message;
}
</script>
</html>
|
同时添加 SSE 的后台处理器SSEDataController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@RestController
public class SSEDataController {
@GetMapping("/ssedata")
public SseEmitter handleSSE() {
// 每一次请求都返回一个新的 SseEmitter 对象
SseEmitter emitter = new SseEmitter();
// 注册回调
// 当连接完成时,会触发 onCompletion 回调
emitter.onCompletion(() -> System.out.println("link complete!!"));
// 当连接报错时,会触发 onError 回调
emitter.onError(Throwable::printStackTrace);
// 当连接超时时,会触发 onTimeout 回调
emitter.onTimeout(() -> System.out.println("connection timeout!!"));
// 注意,不要使用 Executors.newSingleThreadExecutor 这样,每次用户请到达服务端都会创建一个核心线程数为 1 的线程池,执行完请求之后,核心线程会一直存在,这样的话,会造成线程的浪费
// 单线程写入
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data("SSE event #" + i);
emitter.send(event);
Thread.sleep(1000);
}
// 开始关闭
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();
} catch (IOException e) {
// 检测到报错,关闭连接,当然如果报错原因就是连接断开了,则啥都不做直接返回
emitter.completeWithError(e);
} catch (InterruptedException e) {
// 线程中断
// throw new RuntimeException(e);
}
}).start();
return emitter;
}
}
|
启动之后,访问http://localhost:8080/SSESimple/
:进入首页之后点击测试 SSE
,即可看到页面内容在刷新
输出到 9 之后,因为服务端关闭了异步请求,因此数字不再增加,同时服务端控制台输出
打开浏览器控制台,我们可以看到请求的具体情况
以及EventStream
事件流,可以很清楚地看到服务端给客户端推送的事件流
关闭事件
有一点需要注意的是异步请求的关闭,如果连接关闭了,而一方不是主动关闭连接的那一方,那这一方就会报错。
在上面的示例代码中,是客户端主动结束异步请求的处理(SseEmitter
实例调用 complete 方法),此时因为连接关闭,客户端监听到连接断开之后,会触发 error 事件,输出错误如下。
其实如果是因为其他原因导致连接关闭,比如断网,也会触发客户端的 error 事件。
而前端的EventSource
对象是存在断线重连机制的,如果不调用 EventSource
对象的 close 方法
1
2
3
4
5
6
7
8
|
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
|
EventSource
对象自动进行断线重连,持续尝试对后端发起连接。如果网络是畅通的,而且后端接口也依然可用,那么就前端会再次与后端建立连接一个新的连接
,效果就是前端页面显示到 9 之后,会再次从 0 开始刷新:
如果我们想不进行自动重连,那我们可以在监听到 error 事件的时候调用EventSource.close();
来关闭前端 EventSource 对象,阻止重试,
1
2
3
4
5
6
7
8
|
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
source.close();
}, false);
|
不过这样就无法在网络出现波动的意外情况下自动重连了,因此,我们还是建议后端发送自定义的关闭事件,来通知客户端关闭连接,这个在自定义关闭事件
小节会提到。
我们再来看看如果服务端还未关闭连接(SseEmitter
实例调用 complete 方法),客户端主动关闭连接会怎么样。
经过实验我们发现,客户端在服务端还未调用 complete 方法之前,标签页关闭或者 JavaScript 脚本中主动调用EventSource.close
方法,甚至页面刷新,服务端都报错,提示连接已关闭。输出错误信息如下:
可以这样在前端中主动关闭连接:
1
2
3
|
setTimeout(function () {
source.close();
}, 3000)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
java.io.IOException: 你的主机中的软件中止了一个已建立的连接。
at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:54)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:132)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:97)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:53)
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:532)
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:118)
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1381)
at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:764)
at org.apache.tomcat.util.net.SocketWrapperBase.flushBlocking(SocketWrapperBase.java:728)
at org.apache.tomcat.util.net.SocketWrapperBase.flush(SocketWrapperBase.java:712)
at org.apache.coyote.http11.Http11OutputBuffer$SocketOutputBuffer.flush(Http11OutputBuffer.java:566)
at org.apache.coyote.http11.filters.ChunkedOutputFilter.flush(ChunkedOutputFilter.java:157)
at org.apache.coyote.http11.Http11OutputBuffer.flush(Http11OutputBuffer.java:220)
at org.apache.coyote.http11.Http11Processor.flush(Http11Processor.java:1245)
at org.apache.coyote.AbstractProcessor.action(AbstractProcessor.java:400)
at org.apache.coyote.Response.action(Response.java:208)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:301)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:267)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:120)
at org.springframework.util.StreamUtils.copy(StreamUtils.java:135)
at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:130)
at org.springframework.http.converter.StringHttpMessageConverter.writeInternal(StringHttpMessageConverter.java:44)
at org.springframework.http.converter.AbstractHttpMessageConverter.write(AbstractHttpMessageConverter.java:236)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.sendInternal(ResponseBodyEmitterReturnValueHandler.java:221)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler$HttpMessageConvertingHandler.send(ResponseBodyEmitterReturnValueHandler.java:212)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.sendInternal(ResponseBodyEmitter.java:234)
at org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter.send(ResponseBodyEmitter.java:225)
at org.springframework.web.servlet.mvc.method.annotation.SseEmitter.send(SseEmitter.java:135)
at xyz.xiashuo.ssesimple.controller.SSEDataController.lambda$handleSSE$2(SSEDataController.java:34)
at java.base/java.lang.Thread.run(Thread.java:833)
link complete!!
|
日志的输出是因为我们在代码中注册了错误处理动作
1
|
emitter.onError(Throwable::printStackTrace);
|
而且我们发现即使是发生了错误,连接关闭的时候依然会触发emitter.onCompletion
事件。
因此,我们可以在onCompletion
或者onError
的处理方法中,关闭服务端的相关资源,比如线程,I/O 接口之类的。
值得注意的是,如果是因为连接断开而报的错,是不需要调用SseEmitter.complete()
来关闭连接的,如果是因为其他的错,比如服务端内部的,跟连接状态无关的错,可以调用的SseEmitter.complete()
来关闭连接。
为什么不需要?因为SseEmitter.complete()
方法适用于服务端主动关闭连接的时候使用,而不应该在 Servlet 容器相关事件之后使用,这在SseEmitter.complete()
的注释中也有提到。
自定义关闭事件
一般在监听到 error 事件的时候,我们不建议调用EventSource.close();
来关闭前端 EventSource 对象,因为这样会跟自动重试相冲突,因此,我们可以自定义关闭事件。
在后端调用 complete 方法前,先发送一个类型为 complete 的事件,当然你也可以起别的名字。
1
2
3
4
5
|
// 开始关闭
// 建议发送自定义的关闭事件,来通知客户端关闭连接
emitter.send(SseEmitter.event().name("complete").data("close"));
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();
|
在前端中,在 error 事件中,不再调用EventSource.close();
,此外,再额外监听 complete
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
source.close();
});
|
这样,就很完美了。此时客户端收到的 EventStream 为。PS:不知道最后两个事件 id 都为 9 会不会有影响。
自定义传递对象
在前面的例子中,我们往客户端推送的事件的数据类型都是字符串,那可以推送其他自定义的类型吗?答案是可以的。
首先在index.html
中添加连接
1
|
<a href="./ssepage_obj">测试 SSE 传输 Java 对象</a>
|
然后在SSEController
中添加控制器方法
1
2
3
4
5
|
@RequestMapping("/ssepage_obj")
public String ssepage_obj() {
//设置视图名称
return "ssepageobj.html";
}
|
在静态资源路径src\main\resources\static
下添加页面ssepageobj.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<div>动态刷新:<span id="dynamic_content">还未接收到动态数据!</span></div>
</body>
<script>
let source = null;
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/SSESimple/ssedata_obj');
setMessageInnerHTML("开始连接 SSE 服务端");
/**
* 连接一旦建立,就会触发 open 事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("成功建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
source.close();
});
// setTimeout(function () {
// source.close();
// }, 3000)
} else {
setMessageInnerHTML("你的浏览器不支持 SSE");
}
function setMessageInnerHTML(message) {
document.getElementById('dynamic_content').innerHTML = message;
}
</script>
</html>
|
然后添加自定义 SSE 消息对象类
@Data
为 Lombok 的注解
1
2
3
4
5
6
|
@Data
public class SSEMessage {
private String name;
private int age;
private LocalDateTime birthday = LocalDateTime.now();
}
|
再在SSEDataController
中添加后台接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
@GetMapping("/ssedata_obj")
public SseEmitter handleSSEObj() {
// 每一次请求都返回一个新的 SseEmitter 对象
SseEmitter emitter = new SseEmitter();
// 注册回调
// 当连接完成时,会触发 onCompletion 回调
emitter.onCompletion(() -> System.out.println("link complete!!"));
// 当连接报错时,会触发 onError 回调
emitter.onError(Throwable::printStackTrace);
// 当连接超时时,会触发 onTimeout 回调
emitter.onTimeout(() -> System.out.println("connection timeout!!"));
// 注意,不要使用 Executors.newSingleThreadExecutor 这样,每次用户请到达服务端都会创建一个核心线程数为 1 的线程池,执行完请求之后,核心线程会一直存在,这样的话,会造成线程的浪费
// 单线程写入
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
// 传递字符串
// SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data("SSE event #" + i);
// 传递自定义类型
SSEMessage message = new SSEMessage();
message.setName("SSE event #" + i);
SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(i)).name("message").data(message);
emitter.send(event);
Thread.sleep(1000);
}
// 开始关闭
// 建议发送自定义的关闭事件,来通知客户端关闭连接
emitter.send(SseEmitter.event().name("complete").data("close"));
// 标志 SSE 事件完成,关闭连接,连接会断开,这会触发前端的 error 事件
emitter.complete();
} catch (IOException e) {
// 检测到报错,关闭连接,当然如果报错原因就是连接断开了,则啥都不做直接返回
emitter.completeWithError(e);
} catch (InterruptedException e) {
// 线程中断
// throw new RuntimeException(e);
}
}).start();
return emitter;
}
|
启动项目之后,在前台页面中可以看到
为什么可以正常传输呢?根据我们在SseEmitter
的源码分析,我们可以知道,最终传递给客户端的 HTTP 消息的格式是MediaType.TEXT_EVENT_STREAM
,而ResponseBodyEmitterReturnValueHandler#HttpMessageConvertingHandler
的sendInternal
方法要求系统中存在一个HttpMessageConverter
,能够将我们传入的类型(可能是任意类型)转化成MediaType.TEXT_EVENT_STREAM
类型的 HTTP 消息传给客户端,但是我们并没有注册这样一个HttpMessageConverter
,那么实际是那个HttpMessageConverter
完成的转化呢?经过简单调试,我们发现实际默认注册了 8 个HttpMessageConverter
而最终负责将我们传入的类型(可能是任意类型)转化成MediaType.TEXT_EVENT_STREAM
类型的 HTTP 消息的是MappingJackson2HttpMessageConverter
关于为什么 SpringBoot 会自动注册MappingJackson2HttpMessageConverter
,以及如何手动注册自定义的HttpMessageConverter
,具体请看《HttpMessageConverters 自动配置类源码解析》
难怪看页面上的结果,那么像一个 JSON 字符串。
当然你也可以自定义一个HttpMessageConverter
,用来处理特定格式到成MediaType.TEXT_EVENT_STREAM
类型的 HTTP 消息的转化,例如SSEMessageConverter
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
/**
* 参考 StringHttpMessageConverter
*/
@Component
public class SSEMessageConverter extends AbstractHttpMessageConverter<SSEMessage> {
// 默认编码
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
public SSEMessageConverter() {
// super(DEFAULT_CHARSET, null);
}
@Override
protected boolean supports(Class<?> clazz) {
// 只支持处理 SSEMessage 类型的实例
return SSEMessage.class.isAssignableFrom(clazz);
}
@Override
protected SSEMessage readInternal(Class<? extends SSEMessage> clazz, HttpInputMessage inputMessage) throws IOException, HttpMessageNotReadableException {
// 读操作有待实现
return null;
}
@Override
protected void writeInternal(SSEMessage sseMessage, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
String content = myProtocolContent(sseMessage);
// 输出到响应的消息体
StreamUtils.copy(content, DEFAULT_CHARSET, outputMessage.getBody());
}
/**
* 自定义消息输出内容格式
*
* @param sseMessage
* @return
*/
private String myProtocolContent(SSEMessage sseMessage) {
// 当然你可以在这里选择其他的拼接方式,比如把所有的属性用英文逗号拼接,等等。
// 或者序列化为 JSON 字符串
return "自定义转换:"+sseMessage.toString();
}
@Override
protected void addDefaultHeaders(HttpHeaders headers, SSEMessage s, @Nullable MediaType type) throws IOException {
// 不重写 addDefaultHeaders 指定响应的 ContentType 消息头的话,默认的 addDefaultHeaders 会自动设置 ContentType 为自定义 MediaType,
// 浏览器会处理不了,会变成下载,所以这里需要设置 ContentType 消息头,设置成 MediaType.TEXT_PLAIN 是为了方便展示
// 指定类型的时候指定编码
headers.setContentType(new MediaType(MediaType.TEXT_PLAIN, DEFAULT_CHARSET));
super.addDefaultHeaders(headers, s, type);
}
@Override
protected Long getContentLength(SSEMessage message, @Nullable MediaType contentType) {
String str = myProtocolContent(message);
return (long) str.getBytes(DEFAULT_CHARSET).length;
}
}
|
再次调试,可以看到SSEMessageConverter
确实出现在了转化器列表中,而且排在MappingJackson2HttpMessageConverter
的前面,这意味着SSEMessageConverter
会优先生效
实际的结果:
可以看到,转化结果确实是自定义的HttpMessageConverter
转化的结果。
一个页面多个类型的事件使用同一个 SseEmitter 对象
在实际使用的过程中,一个页面中可能有多个需要异步处理的请求,比如订单信息的变更,比如聊天信息的条数,等等,这些信息属于不同的主题,而且SseEmitter
是支持单个对象传输不同主题的 SSE 事件的,而且因为SseEmitter
的 send 方法是线程安全的,因此,我们可以在多个线程中调用同一个SseEmitter
实例的 send 方法来推送消息,方案也很简单,一个主题的事件用一个线程推送即可。
首先在index.html
中添加连接
1
|
<a href="./ssepage_concurrent">测试 SSE 传输多类型事件</a>
|
然后在SSEController
中添加控制器方法
1
2
3
4
5
|
@RequestMapping("/ssepage_concurrent")
public String ssepageConcurrent() {
//设置视图名称
return "ssepage_concurrent.html";
}
|
在静态资源路径src\main\resources\static
下添加页面ssepage_concurrent.html
,在这个页面中,我们手动监听三个类型的事件topic1
、topic2
、topic3
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<div>主题 1 动态刷新:<span id="dynamic_content_1">还未接收到动态数据!</span></div>
<div>主题 2 动态刷新:<span id="dynamic_content_2">还未接收到动态数据!</span></div>
<div>主题 3 动态刷新:<span id="dynamic_content_3">还未接收到动态数据!</span></div>
</body>
<script>
let source = null;
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/SSESimple/ssedata_concurrent');
console.log("开始连接 SSE 服务端");
/**
* 连接一旦建立,就会触发 open 事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
console.log("成功建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
source.addEventListener('topic1', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
source.addEventListener('topic2', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
source.addEventListener('topic3', function (e) {
// 后端指定的 ID 通过 e.lastEventId 获取
setMessageInnerHTML(e.lastEventId, e.data);
});
// 异常处理
source.addEventListener('error', function (event) {
// handle error event
console.log("SSE 发生错误:", event)
// 服务端的 SseEmitter.complete; 方法,会触发这里的 error 事件
// 如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// source.close();
}, false);
// 注意,如果不调用 EventSource 对象的 close 方法,这个对象自动进行断线重连,持续对其 url 发起连接。
// 因此还是建议发送自定义的关闭事件,来通知客户端关闭连接。
// 默认情况下,服务器发来的数据,总是触发浏览器 EventSource 实例的 message 事件。开发者还可以自定义 SSE 事件,这种情况下,发送回来的数据不会触发 message 事件。
source.addEventListener('complete', function (e) {
source.close();
});
} else {
setMessageInnerHTML("你的浏览器不支持 SSE");
}
function setMessageInnerHTML(id, message) {
document.getElementById('dynamic_content_' + id).innerHTML = message;
}
</script>
</html>
|
再在SSEDataController
中添加后台接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
@GetMapping("/ssedata_concurrent")
public SseEmitter handleSSEConcurrent() {
SseEmitter emitter = new SseEmitter();
// 注册回调
emitter.onCompletion(() -> System.out.println("link complete!!"));
emitter.onError(Throwable::printStackTrace);
emitter.onTimeout(() -> System.out.println("connection timeout!!"));
// 在实际业务种,我们可以给每一种业务分配一个主题,并且用一个专门的线程来推送数据
for (int i = 1; i <= 3; i++) {
int finalI = i;
new Thread(() -> {
String topic = "topic" + finalI;
// 测试每个主题发送 10 条数据
for (int j = 0; j < 10; j++) {
String msg = " Topic " + finalI + " : SSE event #" + j;
SseEmitter.SseEventBuilder event = SseEmitter.event().id(String.valueOf(finalI)).name(topic).data(msg);
try {
emitter.send(event);
// 延时
Thread.sleep(800 + 50 * finalI * j);
} catch (IOException e) {
// 客户端主动关闭
emitter.completeWithError(e);
} catch (InterruptedException e) {
// 线程中断
// throw new RuntimeException(e);
}
}
}).start();
}
return emitter;
}
|
启动项目之后,在前台页面中可以看到下图这样的变换,为了让效果更直观,我们还给每个主题添加了不一样的延时
效果还是不错的。
在实际的生产实践中,应该是一个用户一个SseEmitter
对象,而不是一个请求一个SseEmitter
对象,最好是用一个Map
,来保存所有的SseEmitter
,key 为SessionID
,在Session
过期的时候清除对应的SseEmitter
,这个就懒得写了,后面用到了再说。