使用stomp实现服务器端向web客户端推送数据

1. 前言

在上篇文章java实现websocket的五种方式, 我详细讲述了使用java实现websocket的五种方式.
五种方式都集中在服务器端的实现, 客户端我们使用的是一个在线测试工具进行测试的. 只有针对stomp这种方式我自己写了一个javascript版本的客户端. 但是随着项目的推进, 发现整个体系就像魔方或拼图一样, 在完整体系中还缺少了一块, 那就是java客户端去连接stomp websocket服务. 其作用是,在服务器端主动向客户端发送消息时, 通过stomp客户端连接stomp websocket服务, 将消息发布到某个topic, 最终被订阅消息的前端页面接收到. 三者之间的关系 java client -> stomp websocket服务 -> 前端页面. 采用的是消息订阅模式, 消息订阅并不是pull, 实际是push.

2. 目标

编写并配置一个stomp java client, 实现在服务器端程序任何一个需要向前端发送消息位置通过客户端, 而非与stomp websocket服务器端程序耦合的方式向前端发送消息.

3. 实现思路

  1. 首先需要stomp client相关依赖
  2. 构建stomp java client
  3. 发送消息到特定通道
  4. 利用stomp本身的消息传输机制传递到客户端

4. 引入相关依赖

1
2
3
4
5
6
7
8
plugins {
id 'java'
id 'org.springframework.boot' version '2.7.7'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
id 'application'
}

implementation 'org.springframework.boot:spring-boot-starter-websocket'

5. 构建stomp java client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

import java.net.URI;
import java.util.Scanner;

import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;

public class StompClient {
public static void main(String[] args) throws Exception {

WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
stompClient.setMessageConverter(new MappingJackson2MessageConverter());
// 接收大小限制
stompClient.setInboundMessageSizeLimit(1024 * 1024);
// 处理心跳
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
// for heartbeats
stompClient.setTaskScheduler(taskScheduler);
StompSessionHandler customHandler = new CustomStompSessionHandler();
// 可以发送请求头
StompHeaders stompHeaders = new StompHeaders();
stompHeaders.add("username", "admin");
stompHeaders.add("password", "admin1");
URI uri = URI.create("ws://localhost:81/ws");
StompSession session = stompClient.connect(uri, null, stompHeaders, customHandler).get();
// Don't close immediately.
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print( "userXyz >> ");
System.out.flush();
String userinput = scanner.nextLine();
ClientMessage message = new ClientMessage("userXyz", userinput);
session.send("/app/shout", message);
}
}
}

实现思路, 首先需要通过连接websocket服务器建立连接, 建立连接之前需要构建stompclient, 而构建stomclient又要依赖StandardWebSocketClient去连接webscoket server
建立连接时需要连接websocket的Endpoint即ws://localhost:81/ws. 连接建立成功后会返回session, 通过session对象就可以与服务端通讯了.
在连接服务器时我们还有传递一个StompSessionHandler负责处理连接异常, 传输异常, 以及连接建立后subsrible相应的channel.

本例中我们构建了一个简单的StompSessionHandler, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43


import org.springframework.messaging.simp.stomp.*;

import java.lang.reflect.Type;

public class CustomStompSessionHandler extends StompSessionHandlerAdapter {

public CustomStompSessionHandler(){
}

@Override
public void afterConnected(final StompSession session, StompHeaders connectedHeaders) {
System.out.println("StompHeaders: " + connectedHeaders.toString());
//订阅地址,发送端前面没有/user
String destination = "/queue/notifications";
//订阅消息
session.subscribe(destination, new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return byte[].class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
//todo 只能接收到byte[]数组,没时间研究原因
System.out.println(new String((byte[])payload));
}
});
}

@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload,
Throwable exception) {
System.out.println(exception.getMessage());
}

@Override
public void handleTransportError(StompSession session, Throwable exception) {
exception.printStackTrace();
System.out.println("transport error.");
}
}

6. 通过客户端向前端发送消息

上一节我们通过stompclient与服务器端建立了连接, 获得了session对象, 通过session对象, 我们就可以向服务器端相应的channel发送消息,
服务器端可以将消息转发消息到相应的web客户端, 这样就实现了端到端的webscoket通信. 完整代码可以参考上一节中main方法的最后几行.
这里重点摘抄其中的核心代码进行讲解.

1
2
3
String userinput = scanner.nextLine();
ClientMessage message = new ClientMessage("userXyz", userinput);
session.send("/app/shout", message);

在发送对象的使用还需要做序列化处理, 此处我们使用一个通用的message converter即MappingJackson2MessageConverter.

1
stompClient.setMessageConverter(new MappingJackson2MessageConverter());

此处的ClientMessage是一个自定义对象, 为了节省大家动手的时间, 这里将代码附在下面. 这里将消息发送到了一个广播channel.
当然如果要向特定客户端发送消息, 还要在服务端设计相应的转发机制, 然后发送到特定的channel. 由于本篇文章的主要目的是讲解stomp java客户端发送消息, 为了使得简化目的, 没有使得案例更复杂, 有兴趣的读者可以在此基础上进行改进.

ClientMessage.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31


public class ClientMessage {
private String from;
private String message;

public ClientMessage() {
}

public ClientMessage(String from, String text) {
this.from = from;
this.message = text;
}

public String getFrom() {
return from;
}

public void setFrom(String from) {
this.from = from;
}

public String getMessage() {
return message;
}

public void setMessage(String text) {
this.message = text;
}
}

7. 参考文章

使用stomp的java客户端接收websocket数据

Spring Boot + WebSocket With STOMP Tutorial

Spring Websocket 中文文档

Java WebSocketStompClient类代码示例