# scaffold 项目之消息队列基于 Redis 实现

# 简介

消息队列 (Message Queue) 是一种进程间通信或同一进程内不同线程间的通信方式,它提供了异步的通信协议,允许发送者和接收者不需要同时与消息队列交互。

# 使用场景

# 特点

  • 解耦:生产者和消费者不需要知道彼此的存在
  • 异步:生产者发送消息后可以立即返回,不需要等待消费者处理
  • 削峰:在流量高峰时缓冲请求,避免系统过载
  • 可靠性:消息可以持久化,确保不会丢失
  • 扩展性:可以轻松增加消费者来提高处理能力

# 使用场景

  1. 订单处理系统:用户下单后,订单消息放入队列,由后台服务异步处理
  2. 日志收集:应用将日志发送到消息队列,由专门的日志处理服务消费
  3. 通知系统:用户行为触发通知,通过消息队列发送给通知服务
  4. 数据同步:不同系统间通过消息队列同步数据变更
  5. 任务调度:将定时任务放入队列,由工作节点消费执行

# 开始使用

首先本项目专门封装了一个消息队列的组件 scaffold-spring-boot-starter-mq 并且基于 redis 实现了分布式消息队列。

redis 实现核心点有一下两点:

  1. 使用 Stream 特性,提供【集群】消费的能力。
  2. 使用 Pub/Sub 特性,提供【广播】消费的能力。

# Stream 和 Pub/Sub 的区别和选择

# Stream vs Pub/Sub: 关键区别总结

特性Pub/SubStream
消息持久性 (瞬时,无订阅者即丢失) (持久存储在内存中)
消息存储❌ 服务器不存储消息✅ 消息存储在 Stream 结构中
消费模型✅ 广播 (所有订阅者收到所有消息)✅ 灵活:单消费者 / 消费者组 (负载均衡)
消息回溯❌ 无法读取历史消息✅ 可以读取任意历史位置的消息
消费者状态❌ 无状态 (服务器不跟踪消费者进度)有状态 (消费者组跟踪 > 和 PEL)
可靠性保证❌ 最多一次 (消息可能丢失)至少一次 (通过 PEL 和 XACK 机制)
阻塞读取✅ 支持 ( SUBSCRIBE 本身就是阻塞的)✅ 支持 ( XREAD/XREADGROUP with BLOCK )
容量控制❌ 无✅ 有 ( XTRIM , XADD ... MAXLEN )
主要用途实时事件通知、广播消息队列、事件溯源、可靠日志处理、数据管道
复杂度简单相对复杂 (尤其涉及消费者组管理)

# 如何选择?

  • 选择 Pub/Sub 当:
    • 你需要瞬时广播通知
    • 消息丢失是可接受的(或者你知道总有订阅者在)。
    • 不需要追踪消费者进度或重发消息。
    • 场景非常简单,不需要持久化或历史数据。
  • 选择 Stream 当:
    • 你需要持久化消息
    • 你需要可靠的消息处理(“至少一次” 语义)。
    • 你需要多个消费者分担负载(消费者组)。
    • 你需要重放历史消息或从特定点开始消费。
    • 你需要控制消息队列的长度。
    • 你需要构建一个健壮的消息队列事件溯源系统。

核心区别一句话总结:

  • Pub/Sub: 是瞬时的广播 / 通知机制。消息如果没有订阅者,就永远丢失。它关注的是 “事件发生” 的通知。
  • Stream: 是持久化的消息日志。消息被持久存储,消费者可以按需读取(首次读取、重读、回溯)。它关注的是 “发生了什么” 的记录和可靠处理。

# 封装 scaffold-spring-boot-starter-mq 组件

基于 Redis 实现的轻量级消息队列系统,支持两种消息模式:Pub/Sub(发布 / 订阅)和 Stream(流)。

# 主要功能

  1. 消息发送
    • 支持 Pub/Sub 模式的广播消息
    • 支持 Stream 模式的持久化消息
    • 自动序列化 / 反序列化消息
  2. 消息消费
    • Pub/Sub 监听器自动订阅指定 Channel
    • Stream 监听器自动创建消费者组
    • Stream 消息自动确认 (ack)
  3. 可靠性保障
    • Pending 消息重发机制
    • 消息超时检查
    • 消费者崩溃恢复
  4. 扩展性
    • 拦截器机制支持自定义处理逻辑
    • 消息头支持附加元数据

# 消息模型

# Redis 消息抽象基类:
package com.tz.scaffold.framework.mq.redis.core.message;
/**
 * <p> Project: scaffold - AbstractRedisMessage </p>
 *
 * Redis 消息抽象基类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Data
public abstract class AbstractRedisMessage {
    /**
     * 头
     */
    private Map<String, String> headers = new HashMap<>();
    public String getHeader(String key) {
        return headers.get(key);
    }
    public void addHeader(String key, String value) {
        headers.put(key, value);
    }
}
  • 消息抽象基类
  • 包含 headers 属性用于存储消息头信息
  • 提供基本的 header 操作方法
# AbstractRedisChannelMessage
package com.tz.scaffold.framework.mq.redis.core.pubsub;
/**
 * <p> Project: scaffold - AbstractRedisChannelMessage </p>
 *
 * Redis Channel Message 抽象类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
    /**
     * 获得 Redis Channel,默认使用类名
     * <p>
     * JsonIgnore: 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
     * @return Channel
     */
    @JsonIgnore
    public String getChannel() {
        return getClass().getSimpleName();
    }
}
  • Pub/Sub 消息抽象类
  • 继承自 AbstractRedisMessage
  • 默认使用类名作为 Channel 名称
# AbstractRedisStreamMessage
package com.tz.scaffold.framework.mq.redis.core.stream;
/**
 * <p> Project: scaffold - AbstractRedisStreamMessage </p>
 *
 * Redis Stream Message 抽象类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
    /**
     * 获得 Redis Stream Key,默认使用类名
     * <p>
     * JsonIgnore: 避免序列化。
     * @return Channel
     */
    @JsonIgnore
    public String getStreamKey() {
        return getClass().getSimpleName();
    }
}
  • Stream 消息抽象类
  • 继承自 AbstractRedisMessage
  • 默认使用类名作为 Stream Key

# 核心模板类

RedisMQTemplate

package com.tz.scaffold.framework.mq.redis.core;
/**
 * <p> Project: scaffold - RedisMQTemplate </p>
 *
 * Redis MQ 操作模板类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@AllArgsConstructor
public class RedisMQTemplate {
    @Getter
    private final RedisTemplate<String, ?> redisTemplate;
    /**
     * 拦截器数组
     */
    @Getter
    private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
    /**
     * 发送 Redis 消息,基于 Redis pub/sub 实现
     *
     * @param message 消息
     */
    public <T extends AbstractRedisChannelMessage> void send(T message) {
        try {
            sendMessageBefore(message);
            // 发送消息
            redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
        } finally {
            sendMessageAfter(message);
        }
    }
    /**
     * 发送 Redis 消息,基于 Redis Stream 实现
     *
     * @param message 消息
     * @return 消息记录的编号对象
     */
    public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
        try {
            sendMessageBefore(message);
            // 发送消息
            return redisTemplate.opsForStream().add(StreamRecords.newRecord()
                    // 设置内容
                    .ofObject(JsonUtils.toJsonString(message))
                    // 设置 stream key
                    .withStreamKey(message.getStreamKey()));
        } finally {
            sendMessageAfter(message);
        }
    }
    /**
     * 添加拦截器
     *
     * @param interceptor 拦截器
     */
    public void addInterceptor(RedisMessageInterceptor interceptor) {
        interceptors.add(interceptor);
    }
    private void sendMessageBefore(AbstractRedisMessage message) {
        // 正序
        interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
    }
    private void sendMessageAfter(AbstractRedisMessage message) {
        // 倒序
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            interceptors.get(i).sendMessageAfter(message);
        }
    }
}
  • 消息队列操作模板类
  • 提供两种消息发送方法:
    • send(T message) - 基于 Pub/Sub 的广播消息
    • send(T message) - 基于 Stream 的持久化消息
  • 支持拦截器机制,可在消息发送前后和消费前后执行自定义逻辑

# 配置类

# ScaffoldRedisMQProducerAutoConfiguration
package com.tz.scaffold.framework.mq.redis.config;
import com.tz.scaffold.framework.mq.redis.core.RedisMQTemplate;
import com.tz.scaffold.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.tz.scaffold.framework.redis.config.ScaffoldRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.List;
/**
 * <p> Project: scaffold - ScaffoldRedisMQProducerAutoConfiguration </p>
 *
 * Redis 消息队列 Producer 配置类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Slf4j
@AutoConfiguration(after = ScaffoldRedisAutoConfiguration.class)
public class ScaffoldRedisMQProducerAutoConfiguration {
    @Bean
    public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
                                           List<RedisMessageInterceptor> interceptors) {
        RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
        // 添加拦截器
        interceptors.forEach(redisMQTemplate::addInterceptor);
        return redisMQTemplate;
    }
}
  • 生产者自动配置类
  • 主要创建 RedisMQTemplate bean
  • 自动收集所有 RedisMessageInterceptor 拦截器并注入到模板中
# ScaffoldRedisMQConsumerAutoConfiguration
package com.tz.scaffold.framework.mq.redis.config;
/**
 * <p> Project: scaffold - ScaffoldRedisMQConsumerAutoConfiguration </p>
 *
 * Redis 消息队列 Consumer 配置类
 * <p>
 * EnableScheduling: 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Slf4j
@EnableScheduling
@AutoConfiguration(after = ScaffoldRedisAutoConfiguration.class)
public class ScaffoldRedisMQConsumerAutoConfiguration {
    /**
     * 创建 Redis Pub/Sub 广播消费的容器
     * <p>
     * ConditionalOnBean (AbstractRedisChannelMessageListener.class) :
     * 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
     */
    @Bean
    @ConditionalOnBean(AbstractRedisChannelMessageListener.class)
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
        // 创建 RedisMessageListenerContainer 对象
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置 RedisConnection 工厂。
        container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
        // 添加监听器
        listeners.forEach(listener -> {
            listener.setRedisMQTemplate(redisMQTemplate);
            container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
            log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
                    listener.getChannel(), listener.getClass().getName());
        });
        return container;
    }
    /**
     * 创建 Redis Stream 重新消费的任务
     * <p>
     * ConditionalOnBean (AbstractRedisStreamMessageListener.class) :
     * 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
     */
    @Bean
    @ConditionalOnBean(AbstractRedisStreamMessageListener.class)
    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
                                                                     RedisMQTemplate redisTemplate,
                                                                     @Value("${spring.application.name}") String groupName,
                                                                     RedissonClient redissonClient) {
        return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
    }
    /**
     * 创建 Redis Stream 集群消费的容器
     * <p>
     * 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令 & lt;/a>
     * <p>
     * ConditionalOnBean (AbstractRedisStreamMessageListener.class) :
     * 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    @ConditionalOnBean(AbstractRedisStreamMessageListener.class)
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
            RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
        RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
        checkRedisVersion(redisTemplate);
        // 第一步,创建 StreamMessageListenerContainer 容器
        // 创建 options 配置
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        // 一次性最多拉取多少条消息
                        .batchSize(10)
                        // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
                        .targetType(String.class)
                        .build();
        // 创建 container 对象
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
        // 第二步,注册监听器,消费对应的 Stream 主题
        String consumerName = buildConsumerName();
        listeners.parallelStream().forEach(listener -> {
            log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
                    listener.getStreamKey(), listener.getClass().getName());
            // 创建 listener 对应的消费者分组
            try {
                redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
            } catch (Exception ignore) {
            }
            // 设置 listener 对应的 redisTemplate
            listener.setRedisMQTemplate(redisMQTemplate);
            // 创建 Consumer 对象
            Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
            // 设置 Consumer 消费进度,以最小消费进度为准
            StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
            // 设置 Consumer 监听
            StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
                    .builder(streamOffset).consumer(consumer)
                    // 不自动 ack
                    .autoAcknowledge(false)
                    // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
                    .cancelOnError(throwable -> false);
            container.register(builder.build(), listener);
            log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
                    listener.getStreamKey(), listener.getClass().getName());
        });
        return container;
    }
    /**
     * 构建消费者名字,使用本地 IP + 进程编号的方式。
     * 参考自 RocketMQ clientId 的实现
     *
     * @return 消费者名字
     */
    private static String buildConsumerName() {
        return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
    }
    /**
     * 校验 Redis 版本号,是否满足最低的版本号要求!
     */
    private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
        // 获得 Redis 版本
        Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
        String version = MapUtil.getStr(info, "redis_version");
        // 校验最低版本必须大于等于 5.0.0
        int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
        if (majorVersion < 5) {
            throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
                    "请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
        }
    }
}
  • 消费者自动配置类
  • 配置 Pub/Sub 监听容器 ( RedisMessageListenerContainer )
  • 配置 Stream 监听容器 ( StreamMessageListenerContainer )
  • 创建消息重发任务 ( RedisPendingMessageResendJob )

# 监听器

# AbstractRedisChannelMessageListener
package com.tz.scaffold.framework.mq.redis.core.pubsub;
/**
 * <p> Project: scaffold - AbstractRedisChannelMessageListener </p>
 *
 * Redis Pub/Sub 监听器抽象类,用于实现广播消费
 * <p>
 * @param <T> 消息类型。一定要填写噢,不然会报错
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
    /**
     * 消息类型
     */
    private final Class<T> messageType;
    /**
     * Redis Channel
     */
    private final String channel;
    /**
     * RedisMQTemplate
     */
    @Setter
    private RedisMQTemplate redisMQTemplate;
    @SneakyThrows
    protected AbstractRedisChannelMessageListener() {
        this.messageType = getMessageClass();
        this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
    }
    /**
     * 获得 Sub 订阅的 Redis Channel 通道
     *
     * @return channel
     */
    public final String getChannel() {
        return channel;
    }
    @Override
    public final void onMessage(Message message, byte[] bytes) {
        T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
        try {
            consumeMessageBefore(messageObj);
            // 消费消息
            this.onMessage(messageObj);
        } finally {
            consumeMessageAfter(messageObj);
        }
    }
    /**
     * 处理消息
     *
     * @param message 消息
     */
    public abstract void onMessage(T message);
    /**
     * 通过解析类上的泛型,获得消息类型
     *
     * @return 消息类型
     */
    @SuppressWarnings("unchecked")
    private Class<T> getMessageClass() {
        Type type = TypeUtil.getTypeArgument(getClass(), 0);
        if (type == null) {
            throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
        }
        return (Class<T>) type;
    }
    private void consumeMessageBefore(AbstractRedisMessage message) {
        assert redisMQTemplate != null;
        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
        // 正序
        interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
    }
    private void consumeMessageAfter(AbstractRedisMessage message) {
        assert redisMQTemplate != null;
        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
        // 倒序
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            interceptors.get(i).consumeMessageAfter(message);
        }
    }
}
  • Pub/Sub 消息监听器抽象类
  • 实现 MessageListener 接口
  • 自动解析泛型类型确定消息类型
  • 支持拦截器机制

AbstractRedisStreamMessageListener

package com.tz.scaffold.framework.mq.redis.core.stream;
import cn.hutool.core.util.TypeUtil;
import com.tz.scaffold.framework.common.util.json.JsonUtils;
import com.tz.scaffold.framework.mq.redis.core.RedisMQTemplate;
import com.tz.scaffold.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import com.tz.scaffold.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import java.lang.reflect.Type;
import java.util.List;
/**
 * <p> Project: scaffold - AbstractRedisStreamMessageListener </p>
 *
 * Redis Stream 监听器抽象类,用于实现集群消费
 * <p>
 * @param <T> 消息类型。一定要填写噢,不然会报错
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
        implements StreamListener<String, ObjectRecord<String, String>> {
    /**
     * 消息类型
     */
    private final Class<T> messageType;
    /**
     * Redis Channel
     */
    @Getter
    private final String streamKey;
    /**
     * Redis 消费者分组,默认使用 spring.application.name 名字
     */
    @Value("${spring.application.name}")
    @Getter
    private String group;
    /**
     * RedisMQTemplate
     */
    @Setter
    private RedisMQTemplate redisMQTemplate;
    @SneakyThrows
    protected AbstractRedisStreamMessageListener() {
        this.messageType = getMessageClass();
        this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
    }
    @Override
    public void onMessage(ObjectRecord<String, String> message) {
        // 消费消息
        T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
        try {
            consumeMessageBefore(messageObj);
            // 消费消息
            this.onMessage(messageObj);
            //ack 消息消费完成
            redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
            // TODO 需要额外考虑以下几个点:
            // 1. 处理异常的情况
            // 2. 发送日志;以及事务的结合
            // 3. 消费日志;以及通用的幂等性
            // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
        } finally {
            consumeMessageAfter(messageObj);
        }
    }
    /**
     * 处理消息
     *
     * @param message 消息
     */
    public abstract void onMessage(T message);
    /**
     * 通过解析类上的泛型,获得消息类型
     *
     * @return 消息类型
     */
    @SuppressWarnings("unchecked")
    private Class<T> getMessageClass() {
        Type type = TypeUtil.getTypeArgument(getClass(), 0);
        if (type == null) {
            throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
        }
        return (Class<T>) type;
    }
    private void consumeMessageBefore(AbstractRedisMessage message) {
        assert redisMQTemplate != null;
        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
        // 正序
        interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
    }
    private void consumeMessageAfter(AbstractRedisMessage message) {
        assert redisMQTemplate != null;
        List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
        // 倒序
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            interceptors.get(i).consumeMessageAfter(message);
        }
    }
}
  • Stream 消息监听器抽象类
  • 实现 StreamListener 接口
  • 自动解析泛型类型确定消息类型
  • 支持消费者组概念
  • 自动 ack 消息
  • 支持拦截器机制

# 辅助组件

# RedisMessageInterceptor
package com.tz.scaffold.framework.mq.redis.core.interceptor;
import com.tz.scaffold.framework.mq.redis.core.message.AbstractRedisMessage;
/**
 * <p> Project: scaffold - RedisMessageInterceptor </p>
 *
 * {@link AbstractRedisMessage} 消息拦截器
 * <p>
 * 通过拦截器,作为插件机制,实现拓展。
 * <p>
 * 例如说,多租户场景下的 MQ 消息处理
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public interface RedisMessageInterceptor {
    default void sendMessageBefore(AbstractRedisMessage message) {
    }
    default void sendMessageAfter(AbstractRedisMessage message) {
    }
    default void consumeMessageBefore(AbstractRedisMessage message) {
    }
    default void consumeMessageAfter(AbstractRedisMessage message) {
    }
}
  • 消息拦截器接口
  • 定义四个拦截点:
    • sendMessageBefore / sendMessageAfter - 发送前后
    • consumeMessageBefore / consumeMessageAfter - 消费前后
# RedisPendingMessageResendJob
package com.tz.scaffold.framework.mq.redis.core.job;
import cn.hutool.core.collection.CollUtil;
import com.tz.scaffold.framework.mq.redis.core.RedisMQTemplate;
import com.tz.scaffold.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * <p> Project: scaffold - RedisPendingMessageResendJob </p>
 *
 * 这个任务用于处理,crash 之后的消费者未消费完的消息
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {
    private static final String LOCK_KEY = "redis:pending:msg:lock";
    /**
     * 消息超时时间,默认 5 分钟
     * <p>
     * <li>
     *     1. 超时的消息才会被重新投递
     * <li>
     *     2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息 5 分钟过期后,再等 1 分钟才会被扫瞄到
     */
    private static final int EXPIRE_TIME = 5 * 60;
    private final List<AbstractRedisStreamMessageListener<?>> listeners;
    private final RedisMQTemplate redisTemplate;
    private final String groupName;
    private final RedissonClient redissonClient;
    /**
     * 一分钟执行一次,这里选择每分钟的 35 秒执行,是为了避免整点任务过多的问题
     */
    @Scheduled(cron = "35 * * * * ?")
    public void messageResend() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        // 尝试加锁
        if (lock.tryLock()) {
            try {
                execute();
            } catch (Exception ex) {
                log.error("[messageResend][执行异常]", ex);
            } finally {
                lock.unlock();
            }
        }
    }
    /**
     * 执行清理逻辑
     * <p>
     * @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files"> 讨论 & lt;/a>
     */
    private void execute() {
        StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
        listeners.forEach(listener -> {
            PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
            // 每个消费者的 pending 队列消息数量
            Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
            pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
                log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
                // 每个消费者的 pending 消息的详情信息
                PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
                if (pendingMessages.isEmpty()) {
                    return;
                }
                pendingMessages.forEach(pendingMessage -> {
                    // 获取消息上一次传递到 consumer 的时间,
                    long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
                    if (lastDelivery < EXPIRE_TIME){
                        return;
                    }
                    // 获取指定 id 的消息体
                    List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
                            Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
                    if (CollUtil.isEmpty(records)) {
                        return;
                    }
                    // 重新投递消息
                    redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
                            // 设置内容
                            .ofObject(records.get(0).getValue())
                            .withStreamKey(listener.getStreamKey()));
                    //ack 消息消费完成
                    redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
                    log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
                });
            });
        });
    }
}
  • 待处理消息重发任务
  • 定时检查 Pending 状态的消息
  • 对超时未处理的消息进行重新投递
  • 使用分布式锁防止重复处理

# 集群消费

集群消费,是指消息发送到 Redis 时,有且只会被一个消费者 (应用 JM 实例) 收到,然后消费成功。

# 使用

需要引入 scaffold-spring-boot-starter-mq 组件

<!-- 消息队列相关 -->
<dependency>
    <groupId>com.tz.boot</groupId>
    <artifactId>scaffold-spring-boot-starter-mq</artifactId>
</dependency>
  1. 实现消息模板

    public SmsSendMessage extends AbstractRedisStreamMessage {
    	/**
         * 短信日志编号
         */
        @NotNull(message = "短信日志编号不能为空")
        private Long logId;
        /**
         * 手机号
         */
        @NotNull(message = "手机号不能为空")
        private String mobile;
        /**
         * 短信渠道编号
         */
        @NotNull(message = "短信渠道编号不能为空")
        private Long channelId;
        /**
         * 短信 API 的模板编号
         */
        @NotNull(message = "短信 API 的模板编号不能为空")
        private String apiTemplateId;
        /**
         * 短信模板参数
         */
        private List<KeyValue<String, Object>> templateParams;
    }
  2. SmsProducer 生产者

    /**
     * <p> Project: scaffold - SmsProducer </p>
     *
     * Sms 短信相关消息的 Producer
     * @author Tz
     * @date 2024/01/09 23:45
     * @version 1.0.0
     * @since 1.0.0
     */
    @Slf4j
    @Component
    public class SmsProducer {
        // 重点:注入 RedisMQTemplate 对象
        @Resource
        private RedisMQTemplate redisMQTemplate;
        /**
         * 发送 {@link SmsSendMessage} 消息
         *
         * @param logId 短信日志编号
         * @param mobile 手机号
         * @param channelId 渠道编号
         * @param apiTemplateId 短信模板编号
         * @param templateParams 短信模板参数
         */
        public void sendSmsSendMessage(Long logId, String mobile,
                                       Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
            SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
            message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
            // 重点: 使用 RedisMQTemplate 发送消息
            redisMQTemplate.send(message);
        }
    }
  3. SmsSendConsumer 消费者

    /**
     * <p> Project: scaffold - SmsSendConsumer </p>
     *
     * 针对 {@link SmsSendMessage} 的消费者
     * @author Tz
     * @date 2024/01/09 23:45
     * @version 1.0.0
     * @since 1.0.0
     */
    @Component
    @Slf4j
    // 重点:继承 AbstractRedisStreamMessageListener 类
    public class SmsSendConsumer extend AbstractRedisStreamMessageListener<SmsSendMessage> {
        @Resource
        private SmsSendService smsSendService;
        // 重点:实现 onMessage 方法
        @Override 
        public void onMessage(SmsSendMessage message) {
            log.info("[onMessage][消息内容({})]", message);
            smsSendService.doSendSms(message);
        }
    }

# 广播消费

广播消费,是指消息发送到 Redis 时,所有消费者(应用 JVM 实例)收到,然后消费成功。

# 使用场景

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Redis 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Redis 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

# 实现源码

广播消费基于 Redis Pub/Sub 实现:

  • 实现 AbstractChannelMessage 抽象类,定义【广播】消息。
  • 使用 RedisMQTemplate 的 #send(message) 方法,发送消息。
  • 实现 AbstractRedisChannelMessageListener 接口,消费消息。

最终使用 ScaffoldRedisMQConsumerAutoConfiguration 配置类,扫描所有的 AbstractRedisChannelMessageListener 监听器,初始化对应的消费者

代码:

/**
     * 创建 Redis Pub/Sub 广播消费的容器
     */
    @Bean
    @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
        // 创建 RedisMessageListenerContainer 对象
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        // 设置 RedisConnection 工厂。
        container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
        // 添加监听器
        listeners.forEach(listener -> {
            listener.setRedisMQTemplate(redisMQTemplate);
            container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
            log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
                    listener.getChannel(), listener.getClass().getName());
        });
        return container;
    }