1. 前言
在上篇文章java实现websocket的五种方式, 我详细讲述了使用java实现websocket的五种方式.
五种方式都集中在服务器端的实现, 客户端我们使用的是一个在线测试工具进行测试的. 只有针对stomp这种方式我自己写了一个javascript版本的客户端. 但是随着项目的推进, 发现整个体系就像魔方或拼图一样, 在完整体系中还缺少了一块, 那就是java客户端去连接stomp websocket服务. 其作用是,在服务器端主动向客户端发送消息时, 通过stomp客户端连接stomp websocket服务, 将消息发布到某个topic, 最终被订阅消息的前端页面接收到. 三者之间的关系 java client -> stomp websocket服务 -> 前端页面. 采用的是消息订阅模式, 消息订阅并不是pull, 实际是push.
2. 目标
编写并配置一个stomp java client, 实现在服务器端程序任何一个需要向前端发送消息位置通过客户端, 而非与stomp websocket服务器端程序耦合的方式向前端发送消息.
3. 实现思路
- 首先需要stomp client相关依赖
- 构建stomp java client
- 发送消息到特定通道
- 利用stomp本身的消息传输机制传递到客户端
4. 引入相关依赖
1 2 3 4 5 6 7 8
| plugins { id 'java' id 'org.springframework.boot' version '2.7.7' id 'io.spring.dependency-management' version '1.0.15.RELEASE' id 'application' }
implementation 'org.springframework.boot:spring-boot-starter-websocket'
|
5. 构建stomp java client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import java.net.URI; import java.util.Scanner;
import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.simp.stomp.StompHeaders; import org.springframework.messaging.simp.stomp.StompSession; import org.springframework.messaging.simp.stomp.StompSessionHandler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.client.standard.StandardWebSocketClient; import org.springframework.web.socket.messaging.WebSocketStompClient;
public class StompClient { public static void main(String[] args) throws Exception {
WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient()); stompClient.setMessageConverter(new MappingJackson2MessageConverter()); stompClient.setInboundMessageSizeLimit(1024 * 1024); ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.afterPropertiesSet(); stompClient.setTaskScheduler(taskScheduler); StompSessionHandler customHandler = new CustomStompSessionHandler(); StompHeaders stompHeaders = new StompHeaders(); stompHeaders.add("username", "admin"); stompHeaders.add("password", "admin1"); URI uri = URI.create("ws://localhost:81/ws"); StompSession session = stompClient.connect(uri, null, stompHeaders, customHandler).get(); Scanner scanner = new Scanner(System.in); while (true) { System.out.print( "userXyz >> "); System.out.flush(); String userinput = scanner.nextLine(); ClientMessage message = new ClientMessage("userXyz", userinput); session.send("/app/shout", message); } } }
|
实现思路, 首先需要通过连接websocket服务器建立连接, 建立连接之前需要构建stompclient, 而构建stomclient又要依赖StandardWebSocketClient去连接webscoket server
建立连接时需要连接websocket的Endpoint即ws://localhost:81/ws. 连接建立成功后会返回session, 通过session对象就可以与服务端通讯了.
在连接服务器时我们还有传递一个StompSessionHandler负责处理连接异常, 传输异常, 以及连接建立后subsrible相应的channel.
本例中我们构建了一个简单的StompSessionHandler, 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
import org.springframework.messaging.simp.stomp.*;
import java.lang.reflect.Type;
public class CustomStompSessionHandler extends StompSessionHandlerAdapter { public CustomStompSessionHandler(){ }
@Override public void afterConnected(final StompSession session, StompHeaders connectedHeaders) { System.out.println("StompHeaders: " + connectedHeaders.toString()); String destination = "/queue/notifications"; session.subscribe(destination, new StompFrameHandler() { @Override public Type getPayloadType(StompHeaders headers) { return byte[].class; } @Override public void handleFrame(StompHeaders headers, Object payload) { System.out.println(new String((byte[])payload)); } }); }
@Override public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) { System.out.println(exception.getMessage()); }
@Override public void handleTransportError(StompSession session, Throwable exception) { exception.printStackTrace(); System.out.println("transport error."); } }
|
6. 通过客户端向前端发送消息
上一节我们通过stompclient与服务器端建立了连接, 获得了session对象, 通过session对象, 我们就可以向服务器端相应的channel发送消息,
服务器端可以将消息转发消息到相应的web客户端, 这样就实现了端到端的webscoket通信. 完整代码可以参考上一节中main方法的最后几行.
这里重点摘抄其中的核心代码进行讲解.
1 2 3
| String userinput = scanner.nextLine(); ClientMessage message = new ClientMessage("userXyz", userinput); session.send("/app/shout", message);
|
在发送对象的使用还需要做序列化处理, 此处我们使用一个通用的message converter即MappingJackson2MessageConverter.
1
| stompClient.setMessageConverter(new MappingJackson2MessageConverter());
|
此处的ClientMessage是一个自定义对象, 为了节省大家动手的时间, 这里将代码附在下面. 这里将消息发送到了一个广播channel.
当然如果要向特定客户端发送消息, 还要在服务端设计相应的转发机制, 然后发送到特定的channel. 由于本篇文章的主要目的是讲解stomp java客户端发送消息, 为了使得简化目的, 没有使得案例更复杂, 有兴趣的读者可以在此基础上进行改进.
ClientMessage.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public class ClientMessage { private String from; private String message;
public ClientMessage() { }
public ClientMessage(String from, String text) { this.from = from; this.message = text; }
public String getFrom() { return from; }
public void setFrom(String from) { this.from = from; }
public String getMessage() { return message; }
public void setMessage(String text) { this.message = text; } }
|
7. 参考文章
使用stomp的java客户端接收websocket数据
Spring Boot + WebSocket With STOMP Tutorial
Spring Websocket 中文文档
Java WebSocketStompClient类代码示例