# scaffold 项目之 websocket 实时通信

# 功能简介

WebSocket 是一种网络通信协议,提供了在单个 TCP 连接上进行全双工通信的能力。它是 HTML5 规范的一部分,被设计用来替代轮询和长轮询等传统技术,以实现服务器与客户端之间的实时双向通信。WebSocket 的主要优势在于:

  1. 全双工通信:服务器和客户端可以在任何时刻互相发送数据,不需要像 HTTP 请求那样等待对方完成响应。
  2. 持久连接:一旦 WebSocket 连接建立,它会保持开放状态,直到客户端或服务器决定关闭连接。
  3. 减少开销:WebSocket 协议的头部较小,减少了数据传输的开销。
  4. 更好的实时性:相比于 HTTP 轮询,WebSocket 可以更快地推送数据到客户端。

WebSocket 的通信过程通常包括以下几个步骤:

  1. 握手:客户端通过发送一个 Upgrade 请求到服务器来初始化 WebSocket 连接。如果服务器支持 WebSocket,它将响应客户端并完成握手过程,之后通信协议将从 HTTP 切换到 WebSocket。
  2. 数据传输:一旦 WebSocket 连接建立,客户端和服务器就可以通过这个连接发送和接收数据帧。
  3. 心跳检测:为了保持连接的活跃并检测连接是否仍然有效,客户端和服务器可以定期发送心跳(ping/pong)消息。
  4. 关闭连接:当通信结束时,任何一方都可以发送一个关闭帧来关闭 WebSocket 连接。

WebSocket 在许多实时通信场景中都非常有用,例如:

  • 在线游戏:实时更新游戏状态。
  • 聊天应用:实时发送和接收消息。
  • 股票行情:实时更新股票价格。
  • 协作工具:实时同步文档编辑。

使用 WebSocket 时,开发者需要注意安全性问题,比如使用 wss(WebSocket Secure)来保证数据传输的加密,以及处理可能的拒绝服务攻击(DoS)。此外,WebSocket 也需要良好的错误处理和重连策略,以应对网络不稳定的情况。

使用网络框架 Netty 也可以实现 WebSocket, 但是 Netty 学习成本比较大,使用现有的 WebSocket 框架可以满足大部分需求

# 封装组件

在原有 WebSocket 的基础上二次封装,实现更加便捷的 WebSocket验证Session管理WebSocket集群的广播

# 基于 Token 身份认证

当页面向服务器建立连接的时候通过 token 认证,通信带上 token 参数,列如 ws://127.0.0.1:18081/ws?token=xxx

由于 websocket 是基于 http 建立连接的所以可以直接走原本系统的 身份认证过滤器 来校验 token。

为什么不和其他接口一样 在请求头携带 token 呢? 这是因为 websocket 不支持在 header 携带内容

工具类代码:

package com.tz.scaffold.framework.websocket.core.util;
import com.tz.scaffold.framework.security.core.LoginUser;
import org.springframework.web.socket.WebSocketSession;
import java.util.Map;
/**
 * <p> Project: scaffold - WebSocketFrameworkUtils </p>
 *
 * 专属于 web socket 包的工具类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public class WebSocketFrameworkUtils {
    public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER";
    /**
     * 设置当前用户
     *
     * @param loginUser 登录用户
     * @param attributes Session
     */
    public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) {
        attributes.put(ATTRIBUTE_LOGIN_USER, loginUser);
    }
    /**
     * 获取当前用户
     *
     * @return 当前用户
     */
    public static LoginUser getLoginUser(WebSocketSession session) {
        return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER);
    }
    /**
     * 获得当前用户的编号
     *
     * @return 用户编号
     */
    public static Long getLoginUserId(WebSocketSession session) {
        LoginUser loginUser = getLoginUser(session);
        return loginUser != null ? loginUser.getId() : null;
    }
    /**
     * 获得当前用户的类型
     *
     * @return 用户编号
     */
    public static Integer getLoginUserType(WebSocketSession session) {
        LoginUser loginUser = getLoginUser(session);
        return loginUser != null ? loginUser.getUserType() : null;
    }
    /**
     * 获得当前用户的租户编号
     *
     * @param session Session
     * @return 租户编号
     */
    public static Long getTenantId(WebSocketSession session) {
        LoginUser loginUser = getLoginUser(session);
        return loginUser != null ? loginUser.getTenantId() : null;
    }
}

认证流程代码

当通过 身份认证过滤器 后表示改 token 是正确的,成功后将登陆用户管理起来,在 session管理器

package com.tz.scaffold.framework.websocket.core.security;
import com.tz.scaffold.framework.security.core.LoginUser;
import com.tz.scaffold.framework.security.core.filter.TokenAuthenticationFilter;
import com.tz.scaffold.framework.security.core.util.SecurityFrameworkUtils;
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
 * <p> Project: scaffold - LoginUserHandshakeInterceptor </p>
 *
 * 登录用户的 {@link HandshakeInterceptor} 实现类
 * <p>
 * 流程如下:
 * <li>
 *     1. 前端连接 websocket 时,会通过拼接?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
 * <li>
 *     2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public class LoginUserHandshakeInterceptor implements HandshakeInterceptor {
    // 在处理连接调用的方法
    // 是继续握手(true)还是中止(false)
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Map<String, Object> attributes) {
        LoginUser loginUser = SecurityFrameworkUtils.getLoginUser();
        if (loginUser != null) {
            WebSocketFrameworkUtils.setLoginUser(loginUser, attributes);
        }
        return true;
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                               WebSocketHandler wsHandler, Exception exception) {
        // do nothing
    }
}

# Session 会话管理

每一个前端和后端建立连接后,对应后端一个 websocket session 会话对象。 由于后续需要向对应的 websocket session 发送消息,所以需要管理起来。

主要是通过 WebSocketHandlerDecorator 的建立连接和销毁连接前的执行添加和移除 session

代码:

package com.tz.scaffold.framework.websocket.core.session;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
/**
 * <p> Project: scaffold - WebSocketSessionHandlerDecorator </p>
 *
 * {@link WebSocketHandler} 的装饰类,实现了以下功能:
 * <p>
 * <li>
 *     1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理
 * <li>
 *     2. 封装 {@link WebSocketSession} 支持并发操作
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator {
    /**
     * 发送时间的限制,单位:毫秒
     */
    private static final Integer SEND_TIME_LIMIT = 1000 * 5;
    /**
     * 发送消息缓冲上线,单位:bytes
     */
    private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100;
    private final WebSocketSessionManager sessionManager;
    public WebSocketSessionHandlerDecorator(WebSocketHandler delegate,
                                            WebSocketSessionManager sessionManager) {
        super(delegate);
        this.sessionManager = sessionManager;
    }
    // 建立连接被调用
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149
        session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT);
        // 添加到 WebSocketSessionManager 中
        sessionManager.addSession(session);
    }
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
        sessionManager.removeSession(session);
    }
}
package com.tz.scaffold.framework.websocket.core.session;
import org.springframework.web.socket.WebSocketSession;
import java.util.Collection;
/**
 * <p> Project: scaffold - WebSocketSessionManager </p>
 *
 * {@link WebSocketSession} 管理器的接口
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public interface WebSocketSessionManager {
    /**
     * 添加 Session
     *
     * @param session Session
     */
    void addSession(WebSocketSession session);
    /**
     * 移除 Session
     *
     * @param session Session
     */
    void removeSession(WebSocketSession session);
    /**
     * 获得指定编号的 Session
     *
     * @param id Session 编号
     * @return Session
     */
    WebSocketSession getSession(String id);
    /**
     * 获得指定用户类型的 Session 列表
     *
     * @param userType 用户类型
     * @return Session 列表
     */
    Collection<WebSocketSession> getSessionList(Integer userType);
    /**
     * 获得指定用户编号的 Session 列表
     *
     * @param userType 用户类型
     * @param userId 用户编号
     * @return Session 列表
     */
    Collection<WebSocketSession> getSessionList(Integer userType, Long userId);
}
package com.tz.scaffold.framework.websocket.core.session;
import cn.hutool.core.collection.CollUtil;
import com.tz.scaffold.framework.security.core.LoginUser;
import com.tz.scaffold.framework.tenant.core.context.TenantContextHolder;
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils;
import org.springframework.web.socket.WebSocketSession;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
 * <p> Project: scaffold - WebSocketSessionManagerImpl </p>
 *
 * 默认的 {@link WebSocketSessionManager} 实现类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public class WebSocketSessionManagerImpl implements WebSocketSessionManager {
    /**
     * id 与 WebSocketSession 映射
     *
     * key:Session 编号
     */
    private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>();
    /**
     * user 与 WebSocketSession 映射
     *
     * key1:用户类型
     * key2:用户编号
     */
    private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions
            = new ConcurrentHashMap<>();
    @Override
    public void addSession(WebSocketSession session) {
        // 添加到 idSessions 中
        idSessions.put(session.getId(), session);
        // 添加到 userSessions 中
        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
        if (user == null) {
            return;
        }
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
        if (userSessionsMap == null) {
            userSessionsMap = new ConcurrentHashMap<>();
            if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) {
                userSessionsMap = userSessions.get(user.getUserType());
            }
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
        if (sessions == null) {
            sessions = new CopyOnWriteArrayList<>();
            if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) {
                sessions = userSessionsMap.get(user.getId());
            }
        }
        sessions.add(session);
    }
    @Override
    public void removeSession(WebSocketSession session) {
        // 移除从 idSessions 中
        idSessions.remove(session.getId(), session);
        // 移除从 idSessions 中
        LoginUser user = WebSocketFrameworkUtils.getLoginUser(session);
        if (user == null) {
            return;
        }
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType());
        if (userSessionsMap == null) {
            return;
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId());
        sessions.removeIf(session0 -> session0.getId().equals(session.getId()));
        if (CollUtil.isEmpty(sessions)) {
            userSessionsMap.remove(user.getId(), sessions);
        }
    }
    @Override
    public WebSocketSession getSession(String id) {
        return idSessions.get(id);
    }
    @Override
    public Collection<WebSocketSession> getSessionList(Integer userType) {
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
        if (CollUtil.isEmpty(userSessionsMap)) {
            return new ArrayList<>();
        }
        // 避免扩容
        LinkedList<WebSocketSession> result = new LinkedList<>();
        Long contextTenantId = TenantContextHolder.getTenantId();
        for (List<WebSocketSession> sessions : userSessionsMap.values()) {
            if (CollUtil.isEmpty(sessions)) {
                continue;
            }
            // 特殊:如果租户不匹配,则直接排除
            if (contextTenantId != null) {
                Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0));
                if (!contextTenantId.equals(userTenantId)) {
                    continue;
                }
            }
            result.addAll(sessions);
        }
        return result;
    }
    @Override
    public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) {
        ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType);
        if (CollUtil.isEmpty(userSessionsMap)) {
            return new ArrayList<>();
        }
        CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId);
        return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>();
    }
}

# Message 消息格式

主要是处理前端发送给后端的消息类型,对应不同的类型进行不同的处理,前端发送给后端的消息类型通常取决于具体的业务需求。以下是一些常见的业务场景及其对应的消息类型,例如:

  1. 聊天消息(文本消息)
    • 最常见的应用之一是聊天应用,前端发送文本消息给后端,后端处理后再转发给其他用户或存储聊天记录。
  2. 状态更新(文本消息)
    • 用户的状态更新,如在线、离线、忙碌等,通常以文本或 JSON 格式发送。
  3. 用户操作(文本消息 / JSON)
    • 用户在前端界面上执行的操作,比如点击按钮、提交表单等,这些操作可以封装成 JSON 对象发送给后端。
  4. 文件传输(二进制消息)
    • 在需要传输文件(如图片、视频、文档等)的场景中,前端会将文件转换为二进制数据并通过 WebSocket 发送。
  5. 实时游戏数据(文本消息 / JSON)
    • 对于实时多人游戏,玩家的位置、得分、游戏状态等信息需要实时同步,这些数据通常以 JSON 格式发送。
  6. 监控数据(文本消息 / JSON)
    • 在物联网或远程监控系统中,传感器数据或监控信息可以通过 WebSocket 实时发送给后端服务器。
  7. 交易信息(文本消息 / JSON)
    • 金融交易系统中,交易请求和确认信息可以通过 WebSocket 发送,以实现快速的交易处理。
  8. 实时分析数据(文本消息 / JSON)
    • 对于需要实时数据分析的应用,如股票市场分析,前端可以发送查询请求,后端返回分析结果。
  9. 控制命令(文本消息 / JSON)
    • 在智能家居或远程控制系统中,用户可以通过 WebSocket 发送控制命令来操作设备。
  10. 错误报告和日志(文本消息 / JSON)
    • 前端可以将错误信息和日志以文本或 JSON 格式发送给后端,以便于问题诊断和系统监控。
  11. 认证和授权(文本消息 / JSON)
    • 在需要用户登录的应用中,前端可能需要发送认证信息(如用户名和密码)给后端,以及处理授权相关的请求。
  12. 配置更新(文本消息 / JSON)
    • 前端应用可能需要从后端获取配置信息,或者发送配置更新请求给后端。

代码:

package com.tz.scaffold.framework.websocket.core.message;
import com.tz.scaffold.framework.websocket.core.listener.WebSocketMessageListener;
import lombok.Data;
import java.io.Serializable;
/**
 * <p> Project: scaffold - JsonWebSocketMessage </p>
 *
 * JSON 格式的 WebSocket 消息帧
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Data
public class JsonWebSocketMessage implements Serializable {
    /**
     * 消息类型
     *
     * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类
     */
    private String type;
    /**
     * 消息内容
     *
     * 要求 JSON 对象
     */
    private String content;
}

通过实现 WebSocketMessageListener 接口,自己定义对应类型的处理逻辑

package com.tz.scaffold.framework.websocket.core.listener;
import com.tz.scaffold.framework.websocket.core.message.JsonWebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
/**
 * <p> Project: scaffold - WebSocketMessageListener </p>
 *
 * WebSocket 消息监听器接口
 * <p>
 * 目的:前端发送消息给后端后,处理对应 {@link #getType ()} 类型的消息
 *
 * @param <T> 泛型,消息类型
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public interface WebSocketMessageListener<T> {
    /**
     * 处理消息
     *
     * @param session Session
     * @param message 消息
     */
    void onMessage(WebSocketSession session, T message);
    /**
     * 获得消息类型
     *
     * @see JsonWebSocketMessage#getType ()
     * @return 消息类型
     */
    String getType();
}
package com.tz.scaffold.framework.websocket.core.handler;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.TypeUtil;
import com.tz.scaffold.framework.common.util.json.JsonUtils;
import com.tz.scaffold.framework.tenant.core.util.TenantUtils;
import com.tz.scaffold.framework.websocket.core.listener.WebSocketMessageListener;
import com.tz.scaffold.framework.websocket.core.message.JsonWebSocketMessage;
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
/**
 * <p> Project: scaffold - JsonWebSocketMessageHandler </p>
 *
 * JSON 格式 {@link WebSocketHandler} 实现类
 * <p>
 * 基于 {@link JsonWebSocketMessage#getType ()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Slf4j
public class JsonWebSocketMessageHandler extends TextWebSocketHandler {
    /**
     * type 与 WebSocketMessageListener 的映射
     */
    private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>();
    @SuppressWarnings({"rawtypes", "unchecked"})
    public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) {
        listenersList.forEach((Consumer<WebSocketMessageListener>)
                listener -> listeners.put(listener.getType(), listener));
    }
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        // 1.1 空消息,跳过
        if (message.getPayloadLength() == 0) {
            return;
        }
        // 1.2 ping 心跳消息,直接返回 pong 消息。
        if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) {
            session.sendMessage(new TextMessage("pong"));
            return;
        }
        // 2.1 解析消息
        try {
            JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class);
            if (jsonMessage == null) {
                log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload());
                return;
            }
            if (StrUtil.isEmpty(jsonMessage.getType())) {
                log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload());
                return;
            }
            // 2.2 获得对应的 WebSocketMessageListener
            WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType());
            if (messageListener == null) {
                log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload());
                return;
            }
            // 2.3 处理消息
            Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0);
            Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type);
            Long tenantId = WebSocketFrameworkUtils.getTenantId(session);
            TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj));
        } catch (Throwable ex) {
            log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload());
        }
    }
}

# Message 消息接收

WebSocket 后端接收到消息后通过 JsonWebSocketMessageHandler 消息处理器,将消息解析成 JsonWebSocketMessage 对象。之后在根据该对象中的 type消息类型 获取 WebSocketMessageListener 监听器具体的实现类,并将解析出来的消息内容 content 交给它处理。

下面是一个简单的在线聊天的例子,客户端告诉服务端要给谁发送消息,再有服务端发送。

package com.tz.scaffold.module.infra.websocket;
import com.tz.scaffold.framework.common.enums.UserTypeEnum;
import com.tz.scaffold.framework.websocket.core.listener.WebSocketMessageListener;
import com.tz.scaffold.framework.websocket.core.sender.WebSocketMessageSender;
import com.tz.scaffold.framework.websocket.core.util.WebSocketFrameworkUtils;
import com.tz.scaffold.module.infra.websocket.message.DemoReceiveMessage;
import com.tz.scaffold.module.infra.websocket.message.DemoSendMessage;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
/**
 * <p> Project: scaffold - DemoWebSocketMessageListener </p>
 *
 * WebSocket 示例:单发消息
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Component
public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> {
    @Resource
    private WebSocketMessageSender webSocketMessageSender;
    @Override
    public void onMessage(WebSocketSession session, DemoSendMessage message) {
        Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session);
        // 情况一:单发
        if (message.getToUserId() != null) {
            DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
                    .setText(message.getText()).setSingle(true);
            // 给指定用户
            webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(),
                    "demo-message-receive", toMessage);
            return;
        }
        // 情况二:群发
        DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId)
                .setText(message.getText()).setSingle(false);
        // 给所有用户
        webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(),
                "demo-message-receive", toMessage);
    }
    @Override
    public String getType() {
        return "demo-message-send";
    }
}
package com.tz.scaffold.module.infra.websocket.message;
import lombok.Data;
/**
 * <p> Project: scaffold - DemoSendMessage </p>
 *
 * 示例:client -> server 发送消息
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Data
public class DemoSendMessage {
    /**
     * 发送给谁
     *
     * 如果为空,说明发送给所有人
     */
    private Long toUserId;
    /**
     * 内容
     */
    private String text;
}
package com.tz.scaffold.module.infra.websocket.message;
import lombok.Data;
/**
 * <p> Project: scaffold - DemoReceiveMessage </p>
 *
 * 示例:server -> client 同步消息
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Data
public class DemoReceiveMessage {
    /**
     * 接收人的编号
     */
    private Long fromUserId;
    /**
     * 内容
     */
    private String text;
    /**
     * 是否单聊
     */
    private Boolean single;
}

# Message 消息推送

就是后端向前端发送数据,定义接口 WebSocketMessageSender 发送者,提供发送方法给 session 发送消息,具体的发送方式由具体实现类实现。

本项目提供了多种发送方式

实现类支持集群说明
LocalWebSocketMessageSender本地发送,适用单机的情况下
RedisWebSocketMessageSender基于 Redis 消息队列发送消息(redis 有发布和订阅功能所以也可以做消息队列)
RabbitMQWebSocketMessageSender基于 RabbitMQ 消息队列发送消息

关于 WebSocket 的集群,当项目集群的情况下,有多个 java 进程比如 A,B 两个进程, 如果这时候 A 系统用户要发送消息给 B 系统用户怎么办?

解决,使用中间件来代替发送,因为集群情况下消息不能直接发送给用户 session,通过中间件发送的流程是:消息先发送给中间件(也就是 redis、MQ 等消息队列)在由每个 java 进程监听消息,两个进程都会消费 MQ 中的消息, 只有对应项目中的 SessionManager 中存在的 session 才能消费成功

代码:

package com.tz.scaffold.framework.websocket.core.sender;
import com.tz.scaffold.framework.common.util.json.JsonUtils;
/**
 * <p> Project: scaffold - WebSocketMessageSender </p>
 *
 * WebSocket 消息的发送器接口
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public interface WebSocketMessageSender {
    /**
     * 发送消息给指定用户
     *
     * @param userType 用户类型
     * @param userId 用户编号
     * @param messageType 消息类型
     * @param messageContent 消息内容,JSON 格式
     */
    void send(Integer userType, Long userId, String messageType, String messageContent);
    /**
     * 发送消息给指定用户类型
     *
     * @param userType 用户类型
     * @param messageType 消息类型
     * @param messageContent 消息内容,JSON 格式
     */
    void send(Integer userType, String messageType, String messageContent);
    /**
     * 发送消息给指定 Session
     *
     * @param sessionId Session 编号
     * @param messageType 消息类型
     * @param messageContent 消息内容,JSON 格式
     */
    void send(String sessionId, String messageType, String messageContent);
    default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) {
        send(userType, userId, messageType, JsonUtils.toJsonString(messageContent));
    }
    default void sendObject(Integer userType, String messageType, Object messageContent) {
        send(userType, messageType, JsonUtils.toJsonString(messageContent));
    }
    default void sendObject(String sessionId, String messageType, Object messageContent) {
        send(sessionId, messageType, JsonUtils.toJsonString(messageContent));
    }
}

默认抽象实现类

package com.tz.scaffold.framework.websocket.core.sender;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.tz.scaffold.framework.common.util.json.JsonUtils;
import com.tz.scaffold.framework.websocket.core.message.JsonWebSocketMessage;
import com.tz.scaffold.framework.websocket.core.session.WebSocketSessionManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
 * <p> Project: scaffold - AbstractWebSocketMessageSender </p>
 *
 * WebSocketMessageSender 实现类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender {
    private final WebSocketSessionManager sessionManager;
    @Override
    public void send(Integer userType, Long userId, String messageType, String messageContent) {
        send(null, userType, userId, messageType, messageContent);
    }
    @Override
    public void send(Integer userType, String messageType, String messageContent) {
        send(null, userType, null, messageType, messageContent);
    }
    @Override
    public void send(String sessionId, String messageType, String messageContent) {
        send(sessionId, null, null, messageType, messageContent);
    }
    /**
     * 发送消息
     *
     * @param sessionId Session 编号
     * @param userType 用户类型
     * @param userId 用户编号
     * @param messageType 消息类型
     * @param messageContent 消息内容
     */
    public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) {
        // 1. 获得 Session 列表
        List<WebSocketSession> sessions = Collections.emptyList();
        if (StrUtil.isNotEmpty(sessionId)) {
            WebSocketSession session = sessionManager.getSession(sessionId);
            if (session != null) {
                sessions = Collections.singletonList(session);
            }
        } else if (userType != null && userId != null) {
            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType, userId);
        } else if (userType != null) {
            sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType);
        }
        if (CollUtil.isEmpty(sessions)) {
            log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]",
                    sessionId, userType, userId, messageType, messageContent);
        }
        // 2. 执行发送
        doSend(sessions, messageType, messageContent);
    }
    /**
     * 发送消息的具体实现
     *
     * @param sessions Session 列表
     * @param messageType 消息类型
     * @param messageContent 消息内容
     */
    public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) {
        JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent);
        // 关键,使用 JSON 序列化
        String payload = JsonUtils.toJsonString(message);
        sessions.forEach(session -> {
            // 1. 各种校验,保证 Session 可以被发送
            if (session == null) {
                log.error("[doSend][session 为空, message({})]", message);
                return;
            }
            if (!session.isOpen()) {
                log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message);
                return;
            }
            // 2. 执行发送
            try {
                session.sendMessage(new TextMessage(payload));
                log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message);
            } catch (IOException ex) {
                log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex);
            }
        });
    }
}

注意:默认情况下是没有开启集群模式的,需要开启参考以下配置:

scaffold.websocket.path 是 websocket 的连接路径,可以修改

scaffold.websocket.sender-type 配置是选择消息类型

scaffold:
  websocket:
    enable: true # websocket 的开关
    path: /infra/ws # 路径
    sender-type: local # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq
    sender-rocketmq:
      topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic
      consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group
    sender-rabbitmq:
      exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange
      queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue
    sender-kafka:
      topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic
      consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group

# 使用方案

目前有两种方案,分别是:

方案名称请求响应
纯 WebSocketWebSocketWebSocket
Http+WebSocketHttpWebSocket

提示,这里的 请求前端 发送消息到 后端 的方式, 响应后端 响应信息到 前端 的方式

我个人是倾向于方案二的,使用 HTTP 发送消息,使用 WebSocket 响应消息。

原因如下:

  1. 封装了一个模块扮演一个 WebSocket 服务的角色,可以通过它来主动发送 (响应) 消息给前端。这样,未来如果使用 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议) 中间件 (例如说,EMOX、阿里云 MQTT、腾讯云 MQTT 等) 替换现有 WebSocket 也比较方便。
  2. HTTP 请求发送消息,相比 WebSocket 请求发送消息来说,更加方便,也比较符合我们的编码习惯。
  3. 在微服务架构下,多个服务是拆分开的,无法提供相同的 WebSocket 连接。例如说, 我有两个服务 A服务B服务 都需要 WebSocket 推送能力时,前端就需要分别连接这两个服务,所以还是子要一个专门提供响应消息的服务,比如说由 A 服务提供,那么 B 服务想要推送数据那么就可以通过 A 服务来响应消息, 前端还是通过 Http 发送,去各个模块请求发送对应的消息。

# 后续更新

可以引入 AI 客户来对帮助用户更加清晰的了解自己的产品。