最近在做一个项目,涉及到前后端的消息同步、推送,进而我们选择使用webSocket的方案进行实现,但是当websocket服务端部署在多个实例下,会出现前端socket意外断开导致无法收到消息的情况。手下我们先说我们的实现方案:
1.客户端(通过websocket,与后端的wensocket服务端建立链接,)请注意因为后端为多实例使用nginx进行负载均衡,需要注意的是ngx按照如下配置,负责无法进行正常的链接
server {
listen 86;
server_name 127.0.0.1;
location /websocket/222 {
proxy_connect_timeout 20s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
proxy_redirect off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host:$server_port;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass http://websocket_backend;
}
proxy_set_header Host $host:$server_port;这个配置很重要,如果不配置会出现websocket无法链接。
2.注意服务端接收到客户端的心跳消息,不能使用异步推送的方式,这样会导致链接断开
@OnMessage public void onMessage(String message, Session session) {System.out.println("接收到消息:" + message);// 处理收到的消息// 可以通过session发送消息给客户端try {Map map = new HashMap();map.put("Received: ","8092");//session.getBasicRemote().sendText("Received: " + message);JSONObject jsonObject = new JSONObject(map);//session.getBasicRemote().sendText("Received: " + message);session.getBasicRemote().sendObject(jsonObject);//session.getBasicRemote().sendObject(map);} catch (Exception e) {e.printStackTrace();} }
3。因为使用的mq进行广播消息,使用exchange的机制,可以保证所有消费者只要是绑定到了对应的exchange上的队列都能消费,来解决多实例下的消息推送问题
@RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(),exchange = @Exchange(value = "com.jihaixiang.test",type = ExchangeTypes.FANOUT)), concurrency = "10") public void receiveMessage(String message) {System.out.println("Received message3: " + message); }
4.在往mq中发送消息的时候需要保持routekeying为空,这样所有消费者都能收到消息
rabbitTemplate.convertAndSend("com.jihaixiang.test", "", "测试消息");
5.创建exchange
package com.jihaixiang.bootmybatis.configure;import org.springframework.amqp.core.FanoutExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author jihaixiang* @date 2024-03-12 22:29*/ @Configuration public class RabbitConfig {/*** 定义交换机* @return*/@Beanpublic FanoutExchange orderWebsocketSendMessageExchange() {return new FanoutExchange("com.jihaixiang.test");}}