# scaffold 项目之 websocket 实时通信
# 功能简介
WebSocket 是一种网络通信协议,提供了在单个 TCP 连接上进行全双工通信的能力。它是 HTML5 规范的一部分,被设计用来替代轮询和长轮询等传统技术,以实现服务器与客户端之间的实时双向通信。WebSocket 的主要优势在于:
- 全双工通信:服务器和客户端可以在任何时刻互相发送数据,不需要像 HTTP 请求那样等待对方完成响应。
- 持久连接:一旦 WebSocket 连接建立,它会保持开放状态,直到客户端或服务器决定关闭连接。
- 减少开销:WebSocket 协议的头部较小,减少了数据传输的开销。
- 更好的实时性:相比于 HTTP 轮询,WebSocket 可以更快地推送数据到客户端。
WebSocket 的通信过程通常包括以下几个步骤:
- 握手:客户端通过发送一个 Upgrade 请求到服务器来初始化 WebSocket 连接。如果服务器支持 WebSocket,它将响应客户端并完成握手过程,之后通信协议将从 HTTP 切换到 WebSocket。
- 数据传输:一旦 WebSocket 连接建立,客户端和服务器就可以通过这个连接发送和接收数据帧。
- 心跳检测:为了保持连接的活跃并检测连接是否仍然有效,客户端和服务器可以定期发送心跳(ping/pong)消息。
- 关闭连接:当通信结束时,任何一方都可以发送一个关闭帧来关闭 WebSocket 连接。
WebSocket 在许多实时通信场景中都非常有用,例如:
- 在线游戏:实时更新游戏状态。
- 聊天应用:实时发送和接收消息。
- 股票行情:实时更新股票价格。
- 协作工具:实时同步文档编辑。
使用 WebSocket 时,开发者需要注意安全性问题,比如使用 wss(WebSocket Secure)来保证数据传输的加密,以及处理可能的拒绝服务攻击(DoS)。此外,WebSocket 也需要良好的错误处理和重连策略,以应对网络不稳定的情况。
使用网络框架 Netty 也可以实现 WebSocket, 但是 Netty 学习成本比较大,使用现有的 WebSocket 框架可以满足大部分需求
# 封装组件
在原有 WebSocket 的基础上二次封装,实现更加便捷的 WebSocket验证
、 Session管理
、 WebSocket集群的广播
。
# 基于 Token 身份认证
当页面向服务器建立连接的时候通过 token 认证,通信带上 token 参数,列如 ws://127.0.0.1:18081/ws?token=xxx
。
由于 websocket 是基于 http 建立连接的所以可以直接走原本系统的 身份认证过滤器
来校验 token。
为什么不和其他接口一样 在请求头携带 token 呢? 这是因为 websocket 不支持在 header 携带内容
工具类代码:
package com.tz.scaffold.framework.websocket.core.util; | |
import com.tz.scaffold.framework.security.core.LoginUser; | |
import org.springframework.web.socket.WebSocketSession; | |
import java.util.Map; | |
/** | |
* <p> Project: scaffold - WebSocketFrameworkUtils </p> | |
* | |
* 专属于 web socket 包的工具类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public class WebSocketFrameworkUtils { | |
public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER"; | |
/** | |
* 设置当前用户 | |
* | |
* @param loginUser 登录用户 | |
* @param attributes Session | |
*/ | |
public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) { | |
attributes.put(ATTRIBUTE_LOGIN_USER, loginUser); | |
} | |
/** | |
* 获取当前用户 | |
* | |
* @return 当前用户 | |
*/ | |
public static LoginUser getLoginUser(WebSocketSession session) { | |
return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER); | |
} | |
/** | |
* 获得当前用户的编号 | |
* | |
* @return 用户编号 | |
*/ | |
public static Long getLoginUserId(WebSocketSession session) { | |
LoginUser loginUser = getLoginUser(session); | |
return loginUser != null ? loginUser.getId() : null; | |
} | |
/** | |
* 获得当前用户的类型 | |
* | |
* @return 用户编号 | |
*/ | |
public static Integer getLoginUserType(WebSocketSession session) { | |
LoginUser loginUser = getLoginUser(session); | |
return loginUser != null ? loginUser.getUserType() : null; | |
} | |
/** | |
* 获得当前用户的租户编号 | |
* | |
* @param session Session | |
* @return 租户编号 | |
*/ | |
public static Long getTenantId(WebSocketSession session) { | |
LoginUser loginUser = getLoginUser(session); | |
return loginUser != null ? loginUser.getTenantId() : null; | |
} | |
} |
认证流程代码
当通过
身份认证过滤器
后表示改 token 是正确的,成功后将登陆用户管理起来,在session管理器
中
package com.tz.scaffold.framework.websocket.core.security; | |
import com.tz.scaffold.framework.security.core.LoginUser; | |
import com.tz.scaffold.framework.security.core.filter.TokenAuthenticationFilter; | |
import com.tz.scaffold.framework.security.core.util.SecurityFrameworkUtils; | |
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils; | |
import org.springframework.http.server.ServerHttpRequest; | |
import org.springframework.http.server.ServerHttpResponse; | |
import org.springframework.web.socket.WebSocketHandler; | |
import org.springframework.web.socket.WebSocketSession; | |
import org.springframework.web.socket.server.HandshakeInterceptor; | |
import java.util.Map; | |
/** | |
* <p> Project: scaffold - LoginUserHandshakeInterceptor </p> | |
* | |
* 登录用户的 {@link HandshakeInterceptor} 实现类 | |
* <p> | |
* 流程如下: | |
* <li> | |
* 1. 前端连接 websocket 时,会通过拼接?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过 | |
* <li> | |
* 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor { | |
// 在处理连接调用的方法 | |
// 是继续握手(true)还是中止(false) | |
@Override | |
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, | |
WebSocketHandler wsHandler, Map<String, Object> attributes) { | |
LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); | |
if (loginUser != null) { | |
WebSocketFrameworkUtils.setLoginUser(loginUser, attributes); | |
} | |
return true; | |
} | |
@Override | |
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, | |
WebSocketHandler wsHandler, Exception exception) { | |
// do nothing | |
} | |
} |
# Session 会话管理
每一个前端和后端建立连接后,对应后端一个 websocket session 会话对象。 由于后续需要向对应的 websocket session 发送消息,所以需要管理起来。
主要是通过
WebSocketHandlerDecorator
的建立连接和销毁连接前的执行添加和移除 session
代码:
package com.tz.scaffold.framework.websocket.core.session; | |
import org.springframework.web.socket.CloseStatus; | |
import org.springframework.web.socket.WebSocketHandler; | |
import org.springframework.web.socket.WebSocketSession; | |
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; | |
import org.springframework.web.socket.handler.WebSocketHandlerDecorator; | |
/** | |
* <p> Project: scaffold - WebSocketSessionHandlerDecorator </p> | |
* | |
* {@link WebSocketHandler} 的装饰类,实现了以下功能: | |
* <p> | |
* <li> | |
* 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理 | |
* <li> | |
* 2. 封装 {@link WebSocketSession} 支持并发操作 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator { | |
/** | |
* 发送时间的限制,单位:毫秒 | |
*/ | |
private static final Integer SEND_TIME_LIMIT = 1000 * 5; | |
/** | |
* 发送消息缓冲上线,单位:bytes | |
*/ | |
private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100; | |
private final WebSocketSessionManager sessionManager; | |
public WebSocketSessionHandlerDecorator(WebSocketHandler delegate, | |
WebSocketSessionManager sessionManager) { | |
super(delegate); | |
this.sessionManager = sessionManager; | |
} | |
// 建立连接被调用 | |
@Override | |
public void afterConnectionEstablished(WebSocketSession session) { | |
// 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149 | |
session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT); | |
// 添加到 WebSocketSessionManager 中 | |
sessionManager.addSession(session); | |
} | |
@Override | |
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { | |
sessionManager.removeSession(session); | |
} | |
} |
package com.tz.scaffold.framework.websocket.core.session; | |
import org.springframework.web.socket.WebSocketSession; | |
import java.util.Collection; | |
/** | |
* <p> Project: scaffold - WebSocketSessionManager </p> | |
* | |
* {@link WebSocketSession} 管理器的接口 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public interface WebSocketSessionManager { | |
/** | |
* 添加 Session | |
* | |
* @param session Session | |
*/ | |
void addSession(WebSocketSession session); | |
/** | |
* 移除 Session | |
* | |
* @param session Session | |
*/ | |
void removeSession(WebSocketSession session); | |
/** | |
* 获得指定编号的 Session | |
* | |
* @param id Session 编号 | |
* @return Session | |
*/ | |
WebSocketSession getSession(String id); | |
/** | |
* 获得指定用户类型的 Session 列表 | |
* | |
* @param userType 用户类型 | |
* @return Session 列表 | |
*/ | |
Collection<WebSocketSession> getSessionList(Integer userType); | |
/** | |
* 获得指定用户编号的 Session 列表 | |
* | |
* @param userType 用户类型 | |
* @param userId 用户编号 | |
* @return Session 列表 | |
*/ | |
Collection<WebSocketSession> getSessionList(Integer userType, Long userId); | |
} |
package com.tz.scaffold.framework.websocket.core.session; | |
import cn.hutool.core.collection.CollUtil; | |
import com.tz.scaffold.framework.security.core.LoginUser; | |
import com.tz.scaffold.framework.tenant.core.context.TenantContextHolder; | |
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils; | |
import org.springframework.web.socket.WebSocketSession; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
/** | |
* <p> Project: scaffold - WebSocketSessionManagerImpl </p> | |
* | |
* 默认的 {@link WebSocketSessionManager} 实现类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public class WebSocketSessionManagerImpl implements WebSocketSessionManager { | |
/** | |
* id 与 WebSocketSession 映射 | |
* | |
* key:Session 编号 | |
*/ | |
private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>(); | |
/** | |
* user 与 WebSocketSession 映射 | |
* | |
* key1:用户类型 | |
* key2:用户编号 | |
*/ | |
private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions | |
= new ConcurrentHashMap<>(); | |
@Override | |
public void addSession(WebSocketSession session) { | |
// 添加到 idSessions 中 | |
idSessions.put(session.getId(), session); | |
// 添加到 userSessions 中 | |
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); | |
if (user == null) { | |
return; | |
} | |
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType()); | |
if (userSessionsMap == null) { | |
userSessionsMap = new ConcurrentHashMap<>(); | |
if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) { | |
userSessionsMap = userSessions.get(user.getUserType()); | |
} | |
} | |
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId()); | |
if (sessions == null) { | |
sessions = new CopyOnWriteArrayList<>(); | |
if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) { | |
sessions = userSessionsMap.get(user.getId()); | |
} | |
} | |
sessions.add(session); | |
} | |
@Override | |
public void removeSession(WebSocketSession session) { | |
// 移除从 idSessions 中 | |
idSessions.remove(session.getId(), session); | |
// 移除从 idSessions 中 | |
LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); | |
if (user == null) { | |
return; | |
} | |
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType()); | |
if (userSessionsMap == null) { | |
return; | |
} | |
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId()); | |
sessions.removeIf(session0 -> session0.getId().equals(session.getId())); | |
if (CollUtil.isEmpty(sessions)) { | |
userSessionsMap.remove(user.getId(), sessions); | |
} | |
} | |
@Override | |
public WebSocketSession getSession(String id) { | |
return idSessions.get(id); | |
} | |
@Override | |
public Collection<WebSocketSession> getSessionList(Integer userType) { | |
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType); | |
if (CollUtil.isEmpty(userSessionsMap)) { | |
return new ArrayList<>(); | |
} | |
// 避免扩容 | |
LinkedList<WebSocketSession> result = new LinkedList<>(); | |
Long contextTenantId = TenantContextHolder.getTenantId(); | |
for (List<WebSocketSession> sessions : userSessionsMap.values()) { | |
if (CollUtil.isEmpty(sessions)) { | |
continue; | |
} | |
// 特殊:如果租户不匹配,则直接排除 | |
if (contextTenantId != null) { | |
Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0)); | |
if (!contextTenantId.equals(userTenantId)) { | |
continue; | |
} | |
} | |
result.addAll(sessions); | |
} | |
return result; | |
} | |
@Override | |
public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) { | |
ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType); | |
if (CollUtil.isEmpty(userSessionsMap)) { | |
return new ArrayList<>(); | |
} | |
CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId); | |
return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>(); | |
} | |
} |
# Message 消息格式
主要是处理前端发送给后端的消息类型,对应不同的类型进行不同的处理,前端发送给后端的消息类型通常取决于具体的业务需求。以下是一些常见的业务场景及其对应的消息类型,例如:
- 聊天消息(文本消息):
- 最常见的应用之一是聊天应用,前端发送文本消息给后端,后端处理后再转发给其他用户或存储聊天记录。
- 状态更新(文本消息):
- 用户的状态更新,如在线、离线、忙碌等,通常以文本或 JSON 格式发送。
- 用户操作(文本消息 / JSON):
- 用户在前端界面上执行的操作,比如点击按钮、提交表单等,这些操作可以封装成 JSON 对象发送给后端。
- 文件传输(二进制消息):
- 在需要传输文件(如图片、视频、文档等)的场景中,前端会将文件转换为二进制数据并通过 WebSocket 发送。
- 实时游戏数据(文本消息 / JSON):
- 对于实时多人游戏,玩家的位置、得分、游戏状态等信息需要实时同步,这些数据通常以 JSON 格式发送。
- 监控数据(文本消息 / JSON):
- 在物联网或远程监控系统中,传感器数据或监控信息可以通过 WebSocket 实时发送给后端服务器。
- 交易信息(文本消息 / JSON):
- 金融交易系统中,交易请求和确认信息可以通过 WebSocket 发送,以实现快速的交易处理。
- 实时分析数据(文本消息 / JSON):
- 对于需要实时数据分析的应用,如股票市场分析,前端可以发送查询请求,后端返回分析结果。
- 控制命令(文本消息 / JSON):
- 在智能家居或远程控制系统中,用户可以通过 WebSocket 发送控制命令来操作设备。
- 错误报告和日志(文本消息 / JSON):
- 前端可以将错误信息和日志以文本或 JSON 格式发送给后端,以便于问题诊断和系统监控。
- 认证和授权(文本消息 / JSON):
- 在需要用户登录的应用中,前端可能需要发送认证信息(如用户名和密码)给后端,以及处理授权相关的请求。
- 配置更新(文本消息 / JSON):
- 前端应用可能需要从后端获取配置信息,或者发送配置更新请求给后端。
代码:
package com.tz.scaffold.framework.websocket.core.message; | |
import com.tz.scaffold.framework.websocket.core.listener.WebSocketMessageListener; | |
import lombok.Data; | |
import java.io.Serializable; | |
/** | |
* <p> Project: scaffold - JsonWebSocketMessage </p> | |
* | |
* JSON 格式的 WebSocket 消息帧 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Data | |
public class JsonWebSocketMessage implements Serializable { | |
/** | |
* 消息类型 | |
* | |
* 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类 | |
*/ | |
private String type; | |
/** | |
* 消息内容 | |
* | |
* 要求 JSON 对象 | |
*/ | |
private String content; | |
} |
通过实现
WebSocketMessageListener
接口,自己定义对应类型的处理逻辑
package com.tz.scaffold.framework.websocket.core.listener; | |
import com.tz.scaffold.framework.websocket.core.message.JsonWebSocketMessage; | |
import org.springframework.web.socket.WebSocketSession; | |
/** | |
* <p> Project: scaffold - WebSocketMessageListener </p> | |
* | |
* WebSocket 消息监听器接口 | |
* <p> | |
* 目的:前端发送消息给后端后,处理对应 {@link #getType ()} 类型的消息 | |
* | |
* @param <T> 泛型,消息类型 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public interface WebSocketMessageListener<T> { | |
/** | |
* 处理消息 | |
* | |
* @param session Session | |
* @param message 消息 | |
*/ | |
void onMessage(WebSocketSession session, T message); | |
/** | |
* 获得消息类型 | |
* | |
* @see JsonWebSocketMessage#getType () | |
* @return 消息类型 | |
*/ | |
String getType(); | |
} |
package com.tz.scaffold.framework.websocket.core.handler; | |
import cn.hutool.core.util.StrUtil; | |
import cn.hutool.core.util.TypeUtil; | |
import com.tz.scaffold.framework.common.util.json.JsonUtils; | |
import com.tz.scaffold.framework.tenant.core.util.TenantUtils; | |
import com.tz.scaffold.framework.websocket.core.listener.WebSocketMessageListener; | |
import com.tz.scaffold.framework.websocket.core.message.JsonWebSocketMessage; | |
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.web.socket.TextMessage; | |
import org.springframework.web.socket.WebSocketHandler; | |
import org.springframework.web.socket.WebSocketSession; | |
import org.springframework.web.socket.handler.TextWebSocketHandler; | |
import java.lang.reflect.Type; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.function.Consumer; | |
/** | |
* <p> Project: scaffold - JsonWebSocketMessageHandler </p> | |
* | |
* JSON 格式 {@link WebSocketHandler} 实现类 | |
* <p> | |
* 基于 {@link JsonWebSocketMessage#getType ()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
public class JsonWebSocketMessageHandler extends TextWebSocketHandler { | |
/** | |
* type 与 WebSocketMessageListener 的映射 | |
*/ | |
private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>(); | |
@SuppressWarnings({"rawtypes", "unchecked"}) | |
public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) { | |
listenersList.forEach((Consumer<WebSocketMessageListener>) | |
listener -> listeners.put(listener.getType(), listener)); | |
} | |
@Override | |
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { | |
// 1.1 空消息,跳过 | |
if (message.getPayloadLength() == 0) { | |
return; | |
} | |
// 1.2 ping 心跳消息,直接返回 pong 消息。 | |
if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) { | |
session.sendMessage(new TextMessage("pong")); | |
return; | |
} | |
// 2.1 解析消息 | |
try { | |
JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class); | |
if (jsonMessage == null) { | |
log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload()); | |
return; | |
} | |
if (StrUtil.isEmpty(jsonMessage.getType())) { | |
log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload()); | |
return; | |
} | |
// 2.2 获得对应的 WebSocketMessageListener | |
WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType()); | |
if (messageListener == null) { | |
log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload()); | |
return; | |
} | |
// 2.3 处理消息 | |
Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); | |
Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type); | |
Long tenantId = WebSocketFrameworkUtils.getTenantId(session); | |
TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj)); | |
} catch (Throwable ex) { | |
log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload()); | |
} | |
} | |
} |
# Message 消息接收
WebSocket 后端接收到消息后通过 JsonWebSocketMessageHandler
消息处理器,将消息解析成 JsonWebSocketMessage
对象。之后在根据该对象中的 type消息类型
获取 WebSocketMessageListener
监听器具体的实现类,并将解析出来的消息内容 content
交给它处理。
下面是一个简单的在线聊天的例子,客户端告诉服务端要给谁发送消息,再有服务端发送。
package com.tz.scaffold.module.infra.websocket; | |
import com.tz.scaffold.framework.common.enums.UserTypeEnum; | |
import com.tz.scaffold.framework.websocket.core.listener.WebSocketMessageListener; | |
import com.tz.scaffold.framework.websocket.core.sender.WebSocketMessageSender; | |
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils; | |
import com.tz.scaffold.module.infra.websocket.message.DemoReceiveMessage; | |
import com.tz.scaffold.module.infra.websocket.message.DemoSendMessage; | |
import org.springframework.stereotype.Component; | |
import org.springframework.web.socket.WebSocketSession; | |
import javax.annotation.Resource; | |
/** | |
* <p> Project: scaffold - DemoWebSocketMessageListener </p> | |
* | |
* WebSocket 示例:单发消息 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Component | |
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> { | |
@Resource | |
private WebSocketMessageSender webSocketMessageSender; | |
@Override | |
public void onMessage(WebSocketSession session, DemoSendMessage message) { | |
Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); | |
// 情况一:单发 | |
if (message.getToUserId() != null) { | |
DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) | |
.setText(message.getText()).setSingle(true); | |
// 给指定用户 | |
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), | |
"demo-message-receive", toMessage); | |
return; | |
} | |
// 情况二:群发 | |
DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) | |
.setText(message.getText()).setSingle(false); | |
// 给所有用户 | |
webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), | |
"demo-message-receive", toMessage); | |
} | |
@Override | |
public String getType() { | |
return "demo-message-send"; | |
} | |
} |
package com.tz.scaffold.module.infra.websocket.message; | |
import lombok.Data; | |
/** | |
* <p> Project: scaffold - DemoSendMessage </p> | |
* | |
* 示例:client -> server 发送消息 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Data | |
public class DemoSendMessage { | |
/** | |
* 发送给谁 | |
* | |
* 如果为空,说明发送给所有人 | |
*/ | |
private Long toUserId; | |
/** | |
* 内容 | |
*/ | |
private String text; | |
} |
package com.tz.scaffold.module.infra.websocket.message; | |
import lombok.Data; | |
/** | |
* <p> Project: scaffold - DemoReceiveMessage </p> | |
* | |
* 示例:server -> client 同步消息 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Data | |
public class DemoReceiveMessage { | |
/** | |
* 接收人的编号 | |
*/ | |
private Long fromUserId; | |
/** | |
* 内容 | |
*/ | |
private String text; | |
/** | |
* 是否单聊 | |
*/ | |
private Boolean single; | |
} |
# Message 消息推送
就是后端向前端发送数据,定义接口 WebSocketMessageSender
发送者,提供发送方法给 session 发送消息,具体的发送方式由具体实现类实现。
本项目提供了多种发送方式
实现类 | 支持集群 | 说明 |
---|---|---|
LocalWebSocketMessageSender | 否 | 本地发送,适用单机的情况下 |
RedisWebSocketMessageSender | 是 | 基于 Redis 消息队列发送消息(redis 有发布和订阅功能所以也可以做消息队列) |
RabbitMQWebSocketMessageSender | 是 | 基于 RabbitMQ 消息队列发送消息 |
关于 WebSocket 的集群,当项目集群的情况下,有多个 java 进程比如 A,B 两个进程, 如果这时候 A 系统用户要发送消息给 B 系统用户怎么办?
解决,使用中间件来代替发送,因为集群情况下消息不能直接发送给用户 session,通过中间件发送的流程是:消息先发送给中间件(也就是 redis、MQ 等消息队列)在由每个 java 进程监听消息,两个进程都会消费 MQ 中的消息, 只有对应项目中的 SessionManager 中存在的 session 才能消费成功
代码:
package com.tz.scaffold.framework.websocket.core.sender; | |
import com.tz.scaffold.framework.common.util.json.JsonUtils; | |
/** | |
* <p> Project: scaffold - WebSocketMessageSender </p> | |
* | |
* WebSocket 消息的发送器接口 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public interface WebSocketMessageSender { | |
/** | |
* 发送消息给指定用户 | |
* | |
* @param userType 用户类型 | |
* @param userId 用户编号 | |
* @param messageType 消息类型 | |
* @param messageContent 消息内容,JSON 格式 | |
*/ | |
void send(Integer userType, Long userId, String messageType, String messageContent); | |
/** | |
* 发送消息给指定用户类型 | |
* | |
* @param userType 用户类型 | |
* @param messageType 消息类型 | |
* @param messageContent 消息内容,JSON 格式 | |
*/ | |
void send(Integer userType, String messageType, String messageContent); | |
/** | |
* 发送消息给指定 Session | |
* | |
* @param sessionId Session 编号 | |
* @param messageType 消息类型 | |
* @param messageContent 消息内容,JSON 格式 | |
*/ | |
void send(String sessionId, String messageType, String messageContent); | |
default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { | |
send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); | |
} | |
default void sendObject(Integer userType, String messageType, Object messageContent) { | |
send(userType, messageType, JsonUtils.toJsonString(messageContent)); | |
} | |
default void sendObject(String sessionId, String messageType, Object messageContent) { | |
send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); | |
} | |
} |
默认抽象实现类
package com.tz.scaffold.framework.websocket.core.sender; | |
import cn.hutool.core.collection.CollUtil; | |
import cn.hutool.core.util.StrUtil; | |
import com.tz.scaffold.framework.common.util.json.JsonUtils; | |
import com.tz.scaffold.framework.websocket.core.message.JsonWebSocketMessage; | |
import com.tz.scaffold.framework.websocket.core.session.WebSocketSessionManager; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.web.socket.TextMessage; | |
import org.springframework.web.socket.WebSocketSession; | |
import java.io.IOException; | |
import java.util.Collection; | |
import java.util.Collections; | |
import java.util.List; | |
/** | |
* <p> Project: scaffold - AbstractWebSocketMessageSender </p> | |
* | |
* WebSocketMessageSender 实现类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
@RequiredArgsConstructor | |
public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender { | |
private final WebSocketSessionManager sessionManager; | |
@Override | |
public void send(Integer userType, Long userId, String messageType, String messageContent) { | |
send(null, userType, userId, messageType, messageContent); | |
} | |
@Override | |
public void send(Integer userType, String messageType, String messageContent) { | |
send(null, userType, null, messageType, messageContent); | |
} | |
@Override | |
public void send(String sessionId, String messageType, String messageContent) { | |
send(sessionId, null, null, messageType, messageContent); | |
} | |
/** | |
* 发送消息 | |
* | |
* @param sessionId Session 编号 | |
* @param userType 用户类型 | |
* @param userId 用户编号 | |
* @param messageType 消息类型 | |
* @param messageContent 消息内容 | |
*/ | |
public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) { | |
// 1. 获得 Session 列表 | |
List<WebSocketSession> sessions = Collections.emptyList(); | |
if (StrUtil.isNotEmpty(sessionId)) { | |
WebSocketSession session = sessionManager.getSession(sessionId); | |
if (session != null) { | |
sessions = Collections.singletonList(session); | |
} | |
} else if (userType != null && userId != null) { | |
sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType, userId); | |
} else if (userType != null) { | |
sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType); | |
} | |
if (CollUtil.isEmpty(sessions)) { | |
log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]", | |
sessionId, userType, userId, messageType, messageContent); | |
} | |
// 2. 执行发送 | |
doSend(sessions, messageType, messageContent); | |
} | |
/** | |
* 发送消息的具体实现 | |
* | |
* @param sessions Session 列表 | |
* @param messageType 消息类型 | |
* @param messageContent 消息内容 | |
*/ | |
public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) { | |
JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent); | |
// 关键,使用 JSON 序列化 | |
String payload = JsonUtils.toJsonString(message); | |
sessions.forEach(session -> { | |
// 1. 各种校验,保证 Session 可以被发送 | |
if (session == null) { | |
log.error("[doSend][session 为空, message({})]", message); | |
return; | |
} | |
if (!session.isOpen()) { | |
log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message); | |
return; | |
} | |
// 2. 执行发送 | |
try { | |
session.sendMessage(new TextMessage(payload)); | |
log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message); | |
} catch (IOException ex) { | |
log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex); | |
} | |
}); | |
} | |
} |
注意:默认情况下是没有开启集群模式的,需要开启参考以下配置:
scaffold.websocket.path 是 websocket 的连接路径,可以修改
scaffold.websocket.sender-type 配置是选择消息类型
scaffold: | |
websocket: | |
enable: true # websocket 的开关 | |
path: /infra/ws # 路径 | |
sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq | |
sender-rocketmq: | |
topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic | |
consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group | |
sender-rabbitmq: | |
exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange | |
queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue | |
sender-kafka: | |
topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic | |
consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group |
# 使用方案
目前有两种方案,分别是:
方案名称 | 请求 | 响应 |
---|---|---|
纯 WebSocket | WebSocket | WebSocket |
Http+WebSocket | Http | WebSocket |
提示,这里的
请求
是前端
发送消息到后端
的方式,响应
是后端
响应信息到前端
的方式
我个人是倾向于方案二的,使用 HTTP 发送消息,使用 WebSocket 响应消息。
原因如下:
- 封装了一个模块扮演一个 WebSocket 服务的角色,可以通过它来主动发送 (响应) 消息给前端。这样,未来如果使用
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)
中间件 (例如说,EMOX、阿里云 MQTT、腾讯云 MQTT 等) 替换现有 WebSocket 也比较方便。 - HTTP 请求发送消息,相比 WebSocket 请求发送消息来说,更加方便,也比较符合我们的编码习惯。
- 在微服务架构下,多个服务是拆分开的,无法提供相同的 WebSocket 连接。例如说, 我有两个服务
A服务
和B服务
都需要 WebSocket 推送能力时,前端就需要分别连接这两个服务,所以还是子要一个专门提供响应消息的服务,比如说由 A 服务提供,那么 B 服务想要推送数据那么就可以通过 A 服务来响应消息, 前端还是通过 Http 发送,去各个模块请求发送对应的消息。
# 后续更新
可以引入 AI 客户来对帮助用户更加清晰的了解自己的产品。