概要
在项目开发中,使用Websocket订阅可以实现实时推送数据更新和变化。但是,这也存在一些痛点。首先,Websocket订阅需要对订阅对象进行管理,确保订阅的准确性和及时性。其次,订阅的消息需要经过充分过滤和处理,以避免无效消息和重复消息的出现。同时,在高并发的情况下,Websocket订阅也需要进行限流和队列管理,以保证系统的稳定性和可靠性。除此之外,Websocket订阅还需要支持多种协议和格式,以适应不同类型的数据推送和处理场景。因此,在项目中使用Websocket订阅,需要充分考虑这些问题,确保系统的稳定性、可靠性和及时性。
整体架构流程
在项目中,可以通过配置一个统一的Websocket入口来统一管理Websocket请求。这样,其他的Websocket请求就可以像API一样进行编写、调用和管理,使得项目的结构更加清晰和规范。同时,这种方式还能够实现Websocket请求的统一过滤和处理,提高系统的可靠性和稳定性。对于开发者来说,这也能够减少重复代码的编写,提高开发效率。因此,建议在项目中采用统一的Websocket入口,以优化系统的结构和性能。
技术细节
可以配置多个前缀的broker,根据需要映射到不同的路由,只是注解不一样,其他的都是和写api有一个逻辑
配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// registry.enableSimpleBroker("/topic");
// registry.setApplicationDestinationPrefixes("/app");
//基于内存的STOMP消息代理
registry.enableSimpleBroker("/queue", "/topic");
//基于RabbitMQ 的STOMP消息代理
/* registry.enableStompBrokerRelay("/queue", "/topic") .setRelayHost(host) .setRelayPort(port) .setClientLogin(userName) .setClientPasscode(password);*/
// registry.setApplicationDestinationPrefixes("/app", "/foo");
registry.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// registry.addEndpoint("/gs-guide-websocket").withSockJS();
// registry.addEndpoint("/webSocketServer").setAllowedOrigins("*").withSockJS();
registry.addEndpoint("/webSocketServer").withSockJS();
}
}
调用:
@MessageMapping("/app/sendTest")
// @SendTo("/topic/subscribeTest")
public ServerMessage sendDemo(ClientMessage message, StompHeaderAccessor stompHeaderAccessor) {
Principal principal = stompHeaderAccessor.getUser();
String name = principal.getName();
logger.info("认证的名字是:{},收到的消息是:{}", name, message);
logger.info("接收到了信息" + message.getName());
return new ServerMessage("你发送的消息为:" + message.getName());
}
@SubscribeMapping("/subscribeTest")
@SendTo("/topic/subscribeTest")
public ServerMessage sub() {
logger.info("XXX用户订阅了我。。。");
template.convertAndSend("/topic/subscribeTest", "XXX用户订阅了房间");
return new ServerMessage("感谢你订阅了我。。。");
}
前端:
// 建立连接对象(还未发起连接)
var socket = new SockJS("http://localhost:8080/webSocketServer");
// 获取 STOMP 子协议的客户端对象
var stompClient = Stomp.over(socket);
// 向服务器发起websocket连接并发送CONNECT帧
stompClient.connect(
{
},
function connectCallback(frame) {
var userId=1;
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
setMessageInnerHTML("连接成功");
stompClient.subscribe('/user/' + userId +'/queue/light', function (response) {
setMessageInnerHTML("已成功订阅 "+'/user/' + userId +'/queue/light');
var returnData = JSON.parse(response.body);
setMessageInnerHTML('/user/' + userId +'/queue/light'+" 你接收到的消息为:" + returnData.responseMessage);
});
},
function errorCallBack(error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
setMessageInnerHTML("连接失败");
}
);
//发送消息
function send() {
var message = document.getElementById('text').value;
var messageJson = JSON.stringify({
"name": message });
stompClient.send("/app/sendTest", {
}, messageJson);
setMessageInnerHTML("/app/sendTest 你发送的消息:" + message);
}
//订阅消息
function subscribe1() {
stompClient.subscribe('/topic/subscribeTest', function (response) {
// stompClient.subscribe('/ops/home/monitor/page', function (response) {
setMessageInnerHTML("已成功订阅/topic/subscribeTest");
console.log(response)
debugger
// var returnData = JSON.parse(response.body);
setMessageInnerHTML("/ops/home/monitor/page 你接收到的消息为:" + response);
});
}
这里面有几个注解:
@MessageMapping是Spring框架中的一个注解,它通常用于定义方法级别的消息处理程序,当消息到达时,该方法将被调用。@MessageMapping注解可以放置在方法上,指定该方法要处理哪些目的地的消息。
例如,当客户端发送一个消息到“/hello”目的地时,@MessageMapping(“/hello”)注解会告诉Spring框架,当有消息到达“/hello”目的地时,需要调用带有@MessageMapping(“/hello”)注解的方法。
@MessageMapping("/hello")
public void handle(HelloMessage message) {
// 处理消息
}
当带有@MessageMapping注解的方法被调用时,Spring框架会自动将消息体解码为方法参数,并将响应结果编码为消息体返回给客户端。对于一些需要对接收到的消息进行处理并返回响应的场景,@MessageMapping注解是非常有用的。
@SubscribeMapping
是 Spring 框架中用来处理 WebSocket 订阅请求的注解。使用该注解可以让开发者将 WebSocket 订阅请求与订阅后的消息处理方法直接绑定在一起,从而简化处理流程,降低代码复杂度。
在使用 @SubscribeMapping
注解时,开发者需要将其添加到某个 Controller 中的方法上,并指定订阅的目的地(destination)。例如:
@Controller
public class WebSocketController {
@MessageMapping("/chat/{roomId}")
public void handleChatMessage(@DestinationVariable String roomId, ChatMessage message) {
// 处理聊天消息
}
@SubscribeMapping("/chat/{roomId}")
public List<ChatMessage> subscribeChatMessages(@DestinationVariable String roomId) {
// 处理订阅请求,并返回历史消息列表
}
}
上述代码中,handleChatMessage
方法用来处理 WebSocket 发送到 /chat/{roomId}
目的地的消息,subscribeChatMessages
方法用来处理针对同样的目的地的订阅请求,并返回历史消息列表。
通过使用 @SubscribeMapping
注解,开发者可以将订阅请求处理和消息处理直接绑定在一起,从而降低代码复杂度,提高开发效率。
@SendTo是Spring框架中的一个注解,用于发送消息到指定的目的地。它通常用于定义方法级别的消息处理程序,当消息到达时,该方法将被调用。@SendTo注解可以放置在方法上,指定该方法要将处理结果发送到哪个目的地。例如:
@MessageMapping("/hello")
@SendTo("/topic/greetings")
public Greeting greeting(HelloMessage message) throws Exception {
Thread.sleep(1000); // simulated delay
return new Greeting("Hello, " + message.getName() + "!");
}
在上面的代码中,当客户端发送一个消息到“/hello”目的地时,greeting()方法将被调用。处理完消息后,该方法将会把处理结果发送到“/topic/greetings”目的地,通知所有已订阅该目的地的客户端。
小结
Spring STOMP是一种基于WebSocket协议的消息传输协议,它提供了一种简单的方式来实现实时Web应用程序。下面是Spring STOMP的使用总结:
- 配置WebSocket支持
在Spring配置文件中,需要通过@EnableWebSocketMessageBroker注解启用WebSocket消息代理。
- 配置STOMP端点
可以使用registerStompEndpoints()方法来配置STOMP端点,用于接收来自客户端的WebSocket连接请求。
这里注册了一个名为“/ws”的STOMP端点,并使用SockJS子协议。SockJS是一个WebSocket协议的后备协议,它可以在WebSocket不可用时提供WebSocket-like的体验。
- 配置消息处理程序
在Spring框架中,可以使用@MessageMapping注解来声明方法级别的消息处理程序。当有消息到达时,带有@MessageMapping注解的方法将被调用。
- 配置消息代理
Spring STOMP提供了一个内置的消息代理,可以使用配置@EnableWebSocketMessageBroker来启用。
这里启用了一个简单的消息代理,并配置了一个目的地为“/topic”。当有消息到达目的地时,代理将把消息广播给所有订阅者。
- 发送和接收消息
可以使用STOMP客户端来发送和接收消息。例如,在客户端代码中,可以使用StompJS库来发送消息:
在Spring应用程序中,可以使用@SendTo注解来实现在处理消息时发送响应消息。
这里带有@SendTo注解的方法将把结果发送到“/topic/greetings”目的地,可以通过在客户端订阅该目的地来接收结果。
可以通过它们构建实时Web应用程序。