# scaffold 项目之消息队列基于 Redis 实现
# 简介
消息队列 (Message Queue) 是一种进程间通信或同一进程内不同线程间的通信方式,它提供了异步的通信协议,允许发送者和接收者不需要同时与消息队列交互。
# 使用场景
# 特点
- 解耦:生产者和消费者不需要知道彼此的存在
- 异步:生产者发送消息后可以立即返回,不需要等待消费者处理
- 削峰:在流量高峰时缓冲请求,避免系统过载
- 可靠性:消息可以持久化,确保不会丢失
- 扩展性:可以轻松增加消费者来提高处理能力
# 使用场景
- 订单处理系统:用户下单后,订单消息放入队列,由后台服务异步处理
- 日志收集:应用将日志发送到消息队列,由专门的日志处理服务消费
- 通知系统:用户行为触发通知,通过消息队列发送给通知服务
- 数据同步:不同系统间通过消息队列同步数据变更
- 任务调度:将定时任务放入队列,由工作节点消费执行
# 开始使用
首先本项目专门封装了一个消息队列的组件 scaffold-spring-boot-starter-mq 并且基于 redis 实现了分布式消息队列。
redis 实现核心点有一下两点:
- 使用 Stream 特性,提供【集群】消费的能力。
- 使用 Pub/Sub 特性,提供【广播】消费的能力。
# Stream 和 Pub/Sub 的区别和选择
# Stream vs Pub/Sub: 关键区别总结
| 特性 | Pub/Sub | Stream |
|---|---|---|
| 消息持久性 | ❌ 无 (瞬时,无订阅者即丢失) | ✅ 有 (持久存储在内存中) |
| 消息存储 | ❌ 服务器不存储消息 | ✅ 消息存储在 Stream 结构中 |
| 消费模型 | ✅ 广播 (所有订阅者收到所有消息) | ✅ 灵活:单消费者 / 消费者组 (负载均衡) |
| 消息回溯 | ❌ 无法读取历史消息 | ✅ 可以读取任意历史位置的消息 |
| 消费者状态 | ❌ 无状态 (服务器不跟踪消费者进度) | ✅ 有状态 (消费者组跟踪 > 和 PEL) |
| 可靠性保证 | ❌ 最多一次 (消息可能丢失) | ✅ 至少一次 (通过 PEL 和 XACK 机制) |
| 阻塞读取 | ✅ 支持 ( SUBSCRIBE 本身就是阻塞的) | ✅ 支持 ( XREAD/XREADGROUP with BLOCK ) |
| 容量控制 | ❌ 无 | ✅ 有 ( XTRIM , XADD ... MAXLEN ) |
| 主要用途 | 实时事件通知、广播 | 消息队列、事件溯源、可靠日志处理、数据管道 |
| 复杂度 | 简单 | 相对复杂 (尤其涉及消费者组管理) |
# 如何选择?
- 选择 Pub/Sub 当:
- 你需要瞬时广播通知。
- 消息丢失是可接受的(或者你知道总有订阅者在)。
- 不需要追踪消费者进度或重发消息。
- 场景非常简单,不需要持久化或历史数据。
- 选择 Stream 当:
- 你需要持久化消息。
- 你需要可靠的消息处理(“至少一次” 语义)。
- 你需要多个消费者分担负载(消费者组)。
- 你需要重放历史消息或从特定点开始消费。
- 你需要控制消息队列的长度。
- 你需要构建一个健壮的消息队列或事件溯源系统。
核心区别一句话总结:
- Pub/Sub: 是瞬时的广播 / 通知机制。消息如果没有订阅者,就永远丢失。它关注的是 “事件发生” 的通知。
- Stream: 是持久化的消息日志。消息被持久存储,消费者可以按需读取(首次读取、重读、回溯)。它关注的是 “发生了什么” 的记录和可靠处理。
# 封装 scaffold-spring-boot-starter-mq 组件
基于 Redis 实现的轻量级消息队列系统,支持两种消息模式:Pub/Sub(发布 / 订阅)和 Stream(流)。
# 主要功能
- 消息发送:
- 支持 Pub/Sub 模式的广播消息
- 支持 Stream 模式的持久化消息
- 自动序列化 / 反序列化消息
- 消息消费:
- Pub/Sub 监听器自动订阅指定 Channel
- Stream 监听器自动创建消费者组
- Stream 消息自动确认 (ack)
- 可靠性保障:
- Pending 消息重发机制
- 消息超时检查
- 消费者崩溃恢复
- 扩展性:
- 拦截器机制支持自定义处理逻辑
- 消息头支持附加元数据
# 消息模型
# Redis 消息抽象基类:
package com.tz.scaffold.framework.mq.redis.core.message; | |
/** | |
* <p> Project: scaffold - AbstractRedisMessage </p> | |
* | |
* Redis 消息抽象基类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Data | |
public abstract class AbstractRedisMessage { | |
/** | |
* 头 | |
*/ | |
private Map<String, String> headers = new HashMap<>(); | |
public String getHeader(String key) { | |
return headers.get(key); | |
} | |
public void addHeader(String key, String value) { | |
headers.put(key, value); | |
} | |
} |
- 消息抽象基类
- 包含 headers 属性用于存储消息头信息
- 提供基本的 header 操作方法
# AbstractRedisChannelMessage
package com.tz.scaffold.framework.mq.redis.core.pubsub; | |
/** | |
* <p> Project: scaffold - AbstractRedisChannelMessage </p> | |
* | |
* Redis Channel Message 抽象类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage { | |
/** | |
* 获得 Redis Channel,默认使用类名 | |
* <p> | |
* JsonIgnore: 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 | |
* @return Channel | |
*/ | |
@JsonIgnore | |
public String getChannel() { | |
return getClass().getSimpleName(); | |
} | |
} |
- Pub/Sub 消息抽象类
- 继承自
AbstractRedisMessage- 默认使用类名作为 Channel 名称
# AbstractRedisStreamMessage
package com.tz.scaffold.framework.mq.redis.core.stream; | |
/** | |
* <p> Project: scaffold - AbstractRedisStreamMessage </p> | |
* | |
* Redis Stream Message 抽象类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage { | |
/** | |
* 获得 Redis Stream Key,默认使用类名 | |
* <p> | |
* JsonIgnore: 避免序列化。 | |
* @return Channel | |
*/ | |
@JsonIgnore | |
public String getStreamKey() { | |
return getClass().getSimpleName(); | |
} | |
} |
- Stream 消息抽象类
- 继承自
AbstractRedisMessage- 默认使用类名作为 Stream Key
# 核心模板类
RedisMQTemplate
package com.tz.scaffold.framework.mq.redis.core; | |
/** | |
* <p> Project: scaffold - RedisMQTemplate </p> | |
* | |
* Redis MQ 操作模板类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@AllArgsConstructor | |
public class RedisMQTemplate { | |
@Getter | |
private final RedisTemplate<String, ?> redisTemplate; | |
/** | |
* 拦截器数组 | |
*/ | |
@Getter | |
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>(); | |
/** | |
* 发送 Redis 消息,基于 Redis pub/sub 实现 | |
* | |
* @param message 消息 | |
*/ | |
public <T extends AbstractRedisChannelMessage> void send(T message) { | |
try { | |
sendMessageBefore(message); | |
// 发送消息 | |
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); | |
} finally { | |
sendMessageAfter(message); | |
} | |
} | |
/** | |
* 发送 Redis 消息,基于 Redis Stream 实现 | |
* | |
* @param message 消息 | |
* @return 消息记录的编号对象 | |
*/ | |
public <T extends AbstractRedisStreamMessage> RecordId send(T message) { | |
try { | |
sendMessageBefore(message); | |
// 发送消息 | |
return redisTemplate.opsForStream().add(StreamRecords.newRecord() | |
// 设置内容 | |
.ofObject(JsonUtils.toJsonString(message)) | |
// 设置 stream key | |
.withStreamKey(message.getStreamKey())); | |
} finally { | |
sendMessageAfter(message); | |
} | |
} | |
/** | |
* 添加拦截器 | |
* | |
* @param interceptor 拦截器 | |
*/ | |
public void addInterceptor(RedisMessageInterceptor interceptor) { | |
interceptors.add(interceptor); | |
} | |
private void sendMessageBefore(AbstractRedisMessage message) { | |
// 正序 | |
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message)); | |
} | |
private void sendMessageAfter(AbstractRedisMessage message) { | |
// 倒序 | |
for (int i = interceptors.size() - 1; i >= 0; i--) { | |
interceptors.get(i).sendMessageAfter(message); | |
} | |
} | |
} |
- 消息队列操作模板类
- 提供两种消息发送方法:
send(T message)- 基于 Pub/Sub 的广播消息send(T message)- 基于 Stream 的持久化消息- 支持拦截器机制,可在消息发送前后和消费前后执行自定义逻辑
# 配置类
# ScaffoldRedisMQProducerAutoConfiguration
package com.tz.scaffold.framework.mq.redis.config; | |
import com.tz.scaffold.framework.mq.redis.core.RedisMQTemplate; | |
import com.tz.scaffold.framework.mq.redis.core.interceptor.RedisMessageInterceptor; | |
import com.tz.scaffold.framework.redis.config.ScaffoldRedisAutoConfiguration; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.boot.autoconfigure.AutoConfiguration; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.data.redis.core.StringRedisTemplate; | |
import java.util.List; | |
/** | |
* <p> Project: scaffold - ScaffoldRedisMQProducerAutoConfiguration </p> | |
* | |
* Redis 消息队列 Producer 配置类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
@AutoConfiguration(after = ScaffoldRedisAutoConfiguration.class) | |
public class ScaffoldRedisMQProducerAutoConfiguration { | |
@Bean | |
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, | |
List<RedisMessageInterceptor> interceptors) { | |
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); | |
// 添加拦截器 | |
interceptors.forEach(redisMQTemplate::addInterceptor); | |
return redisMQTemplate; | |
} | |
} |
- 生产者自动配置类
- 主要创建
RedisMQTemplatebean- 自动收集所有
RedisMessageInterceptor拦截器并注入到模板中
# ScaffoldRedisMQConsumerAutoConfiguration
package com.tz.scaffold.framework.mq.redis.config; | |
/** | |
* <p> Project: scaffold - ScaffoldRedisMQConsumerAutoConfiguration </p> | |
* | |
* Redis 消息队列 Consumer 配置类 | |
* <p> | |
* EnableScheduling: 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
@EnableScheduling | |
@AutoConfiguration(after = ScaffoldRedisAutoConfiguration.class) | |
public class ScaffoldRedisMQConsumerAutoConfiguration { | |
/** | |
* 创建 Redis Pub/Sub 广播消费的容器 | |
* <p> | |
* ConditionalOnBean (AbstractRedisChannelMessageListener.class) : | |
* 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 | |
*/ | |
@Bean | |
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) | |
public RedisMessageListenerContainer redisMessageListenerContainer( | |
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) { | |
// 创建 RedisMessageListenerContainer 对象 | |
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); | |
// 设置 RedisConnection 工厂。 | |
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); | |
// 添加监听器 | |
listeners.forEach(listener -> { | |
listener.setRedisMQTemplate(redisMQTemplate); | |
container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); | |
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", | |
listener.getChannel(), listener.getClass().getName()); | |
}); | |
return container; | |
} | |
/** | |
* 创建 Redis Stream 重新消费的任务 | |
* <p> | |
* ConditionalOnBean (AbstractRedisStreamMessageListener.class) : | |
* 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 | |
*/ | |
@Bean | |
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) | |
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, | |
RedisMQTemplate redisTemplate, | |
@Value("${spring.application.name}") String groupName, | |
RedissonClient redissonClient) { | |
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); | |
} | |
/** | |
* 创建 Redis Stream 集群消费的容器 | |
* <p> | |
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令 & lt;/a> | |
* <p> | |
* ConditionalOnBean (AbstractRedisStreamMessageListener.class) : | |
* 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 | |
*/ | |
@Bean(initMethod = "start", destroyMethod = "stop") | |
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) | |
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer( | |
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) { | |
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate(); | |
checkRedisVersion(redisTemplate); | |
// 第一步,创建 StreamMessageListenerContainer 容器 | |
// 创建 options 配置 | |
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions = | |
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() | |
// 一次性最多拉取多少条消息 | |
.batchSize(10) | |
// 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 | |
.targetType(String.class) | |
.build(); | |
// 创建 container 对象 | |
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = | |
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); | |
// 第二步,注册监听器,消费对应的 Stream 主题 | |
String consumerName = buildConsumerName(); | |
listeners.parallelStream().forEach(listener -> { | |
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", | |
listener.getStreamKey(), listener.getClass().getName()); | |
// 创建 listener 对应的消费者分组 | |
try { | |
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); | |
} catch (Exception ignore) { | |
} | |
// 设置 listener 对应的 redisTemplate | |
listener.setRedisMQTemplate(redisMQTemplate); | |
// 创建 Consumer 对象 | |
Consumer consumer = Consumer.from(listener.getGroup(), consumerName); | |
// 设置 Consumer 消费进度,以最小消费进度为准 | |
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); | |
// 设置 Consumer 监听 | |
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest | |
.builder(streamOffset).consumer(consumer) | |
// 不自动 ack | |
.autoAcknowledge(false) | |
// 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false | |
.cancelOnError(throwable -> false); | |
container.register(builder.build(), listener); | |
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", | |
listener.getStreamKey(), listener.getClass().getName()); | |
}); | |
return container; | |
} | |
/** | |
* 构建消费者名字,使用本地 IP + 进程编号的方式。 | |
* 参考自 RocketMQ clientId 的实现 | |
* | |
* @return 消费者名字 | |
*/ | |
private static String buildConsumerName() { | |
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); | |
} | |
/** | |
* 校验 Redis 版本号,是否满足最低的版本号要求! | |
*/ | |
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) { | |
// 获得 Redis 版本 | |
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info); | |
String version = MapUtil.getStr(info, "redis_version"); | |
// 校验最低版本必须大于等于 5.0.0 | |
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false)); | |
if (majorVersion < 5) { | |
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" + | |
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl())); | |
} | |
} | |
} |
- 消费者自动配置类
- 配置 Pub/Sub 监听容器 (
RedisMessageListenerContainer)- 配置 Stream 监听容器 (
StreamMessageListenerContainer)- 创建消息重发任务 (
RedisPendingMessageResendJob)
# 监听器
# AbstractRedisChannelMessageListener
package com.tz.scaffold.framework.mq.redis.core.pubsub; | |
/** | |
* <p> Project: scaffold - AbstractRedisChannelMessageListener </p> | |
* | |
* Redis Pub/Sub 监听器抽象类,用于实现广播消费 | |
* <p> | |
* @param <T> 消息类型。一定要填写噢,不然会报错 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener { | |
/** | |
* 消息类型 | |
*/ | |
private final Class<T> messageType; | |
/** | |
* Redis Channel | |
*/ | |
private final String channel; | |
/** | |
* RedisMQTemplate | |
*/ | |
@Setter | |
private RedisMQTemplate redisMQTemplate; | |
@SneakyThrows | |
protected AbstractRedisChannelMessageListener() { | |
this.messageType = getMessageClass(); | |
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel(); | |
} | |
/** | |
* 获得 Sub 订阅的 Redis Channel 通道 | |
* | |
* @return channel | |
*/ | |
public final String getChannel() { | |
return channel; | |
} | |
@Override | |
public final void onMessage(Message message, byte[] bytes) { | |
T messageObj = JsonUtils.parseObject(message.getBody(), messageType); | |
try { | |
consumeMessageBefore(messageObj); | |
// 消费消息 | |
this.onMessage(messageObj); | |
} finally { | |
consumeMessageAfter(messageObj); | |
} | |
} | |
/** | |
* 处理消息 | |
* | |
* @param message 消息 | |
*/ | |
public abstract void onMessage(T message); | |
/** | |
* 通过解析类上的泛型,获得消息类型 | |
* | |
* @return 消息类型 | |
*/ | |
@SuppressWarnings("unchecked") | |
private Class<T> getMessageClass() { | |
Type type = TypeUtil.getTypeArgument(getClass(), 0); | |
if (type == null) { | |
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); | |
} | |
return (Class<T>) type; | |
} | |
private void consumeMessageBefore(AbstractRedisMessage message) { | |
assert redisMQTemplate != null; | |
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors(); | |
// 正序 | |
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); | |
} | |
private void consumeMessageAfter(AbstractRedisMessage message) { | |
assert redisMQTemplate != null; | |
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors(); | |
// 倒序 | |
for (int i = interceptors.size() - 1; i >= 0; i--) { | |
interceptors.get(i).consumeMessageAfter(message); | |
} | |
} | |
} |
- Pub/Sub 消息监听器抽象类
- 实现
MessageListener接口- 自动解析泛型类型确定消息类型
- 支持拦截器机制
AbstractRedisStreamMessageListener
package com.tz.scaffold.framework.mq.redis.core.stream; | |
import cn.hutool.core.util.TypeUtil; | |
import com.tz.scaffold.framework.common.util.json.JsonUtils; | |
import com.tz.scaffold.framework.mq.redis.core.RedisMQTemplate; | |
import com.tz.scaffold.framework.mq.redis.core.interceptor.RedisMessageInterceptor; | |
import com.tz.scaffold.framework.mq.redis.core.message.AbstractRedisMessage; | |
import lombok.Getter; | |
import lombok.Setter; | |
import lombok.SneakyThrows; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.data.redis.connection.stream.ObjectRecord; | |
import org.springframework.data.redis.stream.StreamListener; | |
import java.lang.reflect.Type; | |
import java.util.List; | |
/** | |
* <p> Project: scaffold - AbstractRedisStreamMessageListener </p> | |
* | |
* Redis Stream 监听器抽象类,用于实现集群消费 | |
* <p> | |
* @param <T> 消息类型。一定要填写噢,不然会报错 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage> | |
implements StreamListener<String, ObjectRecord<String, String>> { | |
/** | |
* 消息类型 | |
*/ | |
private final Class<T> messageType; | |
/** | |
* Redis Channel | |
*/ | |
@Getter | |
private final String streamKey; | |
/** | |
* Redis 消费者分组,默认使用 spring.application.name 名字 | |
*/ | |
@Value("${spring.application.name}") | |
@Getter | |
private String group; | |
/** | |
* RedisMQTemplate | |
*/ | |
@Setter | |
private RedisMQTemplate redisMQTemplate; | |
@SneakyThrows | |
protected AbstractRedisStreamMessageListener() { | |
this.messageType = getMessageClass(); | |
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey(); | |
} | |
@Override | |
public void onMessage(ObjectRecord<String, String> message) { | |
// 消费消息 | |
T messageObj = JsonUtils.parseObject(message.getValue(), messageType); | |
try { | |
consumeMessageBefore(messageObj); | |
// 消费消息 | |
this.onMessage(messageObj); | |
//ack 消息消费完成 | |
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message); | |
// TODO 需要额外考虑以下几个点: | |
// 1. 处理异常的情况 | |
// 2. 发送日志;以及事务的结合 | |
// 3. 消费日志;以及通用的幂等性 | |
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638 | |
} finally { | |
consumeMessageAfter(messageObj); | |
} | |
} | |
/** | |
* 处理消息 | |
* | |
* @param message 消息 | |
*/ | |
public abstract void onMessage(T message); | |
/** | |
* 通过解析类上的泛型,获得消息类型 | |
* | |
* @return 消息类型 | |
*/ | |
@SuppressWarnings("unchecked") | |
private Class<T> getMessageClass() { | |
Type type = TypeUtil.getTypeArgument(getClass(), 0); | |
if (type == null) { | |
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); | |
} | |
return (Class<T>) type; | |
} | |
private void consumeMessageBefore(AbstractRedisMessage message) { | |
assert redisMQTemplate != null; | |
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors(); | |
// 正序 | |
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message)); | |
} | |
private void consumeMessageAfter(AbstractRedisMessage message) { | |
assert redisMQTemplate != null; | |
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors(); | |
// 倒序 | |
for (int i = interceptors.size() - 1; i >= 0; i--) { | |
interceptors.get(i).consumeMessageAfter(message); | |
} | |
} | |
} |
- Stream 消息监听器抽象类
- 实现
StreamListener接口- 自动解析泛型类型确定消息类型
- 支持消费者组概念
- 自动 ack 消息
- 支持拦截器机制
# 辅助组件
# RedisMessageInterceptor
package com.tz.scaffold.framework.mq.redis.core.interceptor; | |
import com.tz.scaffold.framework.mq.redis.core.message.AbstractRedisMessage; | |
/** | |
* <p> Project: scaffold - RedisMessageInterceptor </p> | |
* | |
* {@link AbstractRedisMessage} 消息拦截器 | |
* <p> | |
* 通过拦截器,作为插件机制,实现拓展。 | |
* <p> | |
* 例如说,多租户场景下的 MQ 消息处理 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public interface RedisMessageInterceptor { | |
default void sendMessageBefore(AbstractRedisMessage message) { | |
} | |
default void sendMessageAfter(AbstractRedisMessage message) { | |
} | |
default void consumeMessageBefore(AbstractRedisMessage message) { | |
} | |
default void consumeMessageAfter(AbstractRedisMessage message) { | |
} | |
} |
- 消息拦截器接口
- 定义四个拦截点:
sendMessageBefore/sendMessageAfter- 发送前后consumeMessageBefore/consumeMessageAfter- 消费前后
# RedisPendingMessageResendJob
package com.tz.scaffold.framework.mq.redis.core.job; | |
import cn.hutool.core.collection.CollUtil; | |
import com.tz.scaffold.framework.mq.redis.core.RedisMQTemplate; | |
import com.tz.scaffold.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; | |
import lombok.AllArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.redisson.api.RLock; | |
import org.redisson.api.RedissonClient; | |
import org.springframework.data.domain.Range; | |
import org.springframework.data.redis.connection.stream.*; | |
import org.springframework.data.redis.core.StreamOperations; | |
import org.springframework.scheduling.annotation.Scheduled; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Objects; | |
/** | |
* <p> Project: scaffold - RedisPendingMessageResendJob </p> | |
* | |
* 这个任务用于处理,crash 之后的消费者未消费完的消息 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
@AllArgsConstructor | |
public class RedisPendingMessageResendJob { | |
private static final String LOCK_KEY = "redis:pending:msg:lock"; | |
/** | |
* 消息超时时间,默认 5 分钟 | |
* <p> | |
* <li> | |
* 1. 超时的消息才会被重新投递 | |
* <li> | |
* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息 5 分钟过期后,再等 1 分钟才会被扫瞄到 | |
*/ | |
private static final int EXPIRE_TIME = 5 * 60; | |
private final List<AbstractRedisStreamMessageListener<?>> listeners; | |
private final RedisMQTemplate redisTemplate; | |
private final String groupName; | |
private final RedissonClient redissonClient; | |
/** | |
* 一分钟执行一次,这里选择每分钟的 35 秒执行,是为了避免整点任务过多的问题 | |
*/ | |
@Scheduled(cron = "35 * * * * ?") | |
public void messageResend() { | |
RLock lock = redissonClient.getLock(LOCK_KEY); | |
// 尝试加锁 | |
if (lock.tryLock()) { | |
try { | |
execute(); | |
} catch (Exception ex) { | |
log.error("[messageResend][执行异常]", ex); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
} | |
/** | |
* 执行清理逻辑 | |
* <p> | |
* @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files"> 讨论 & lt;/a> | |
*/ | |
private void execute() { | |
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream(); | |
listeners.forEach(listener -> { | |
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName)); | |
// 每个消费者的 pending 队列消息数量 | |
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); | |
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { | |
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); | |
// 每个消费者的 pending 消息的详情信息 | |
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount); | |
if (pendingMessages.isEmpty()) { | |
return; | |
} | |
pendingMessages.forEach(pendingMessage -> { | |
// 获取消息上一次传递到 consumer 的时间, | |
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds(); | |
if (lastDelivery < EXPIRE_TIME){ | |
return; | |
} | |
// 获取指定 id 的消息体 | |
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(), | |
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString()))); | |
if (CollUtil.isEmpty(records)) { | |
return; | |
} | |
// 重新投递消息 | |
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() | |
// 设置内容 | |
.ofObject(records.get(0).getValue()) | |
.withStreamKey(listener.getStreamKey())); | |
//ack 消息消费完成 | |
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0)); | |
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId()); | |
}); | |
}); | |
}); | |
} | |
} |
- 待处理消息重发任务
- 定时检查 Pending 状态的消息
- 对超时未处理的消息进行重新投递
- 使用分布式锁防止重复处理
# 集群消费
集群消费,是指消息发送到 Redis 时,有且只会被一个消费者 (应用 JM 实例) 收到,然后消费成功。
# 使用
需要引入 scaffold-spring-boot-starter-mq 组件
<!-- 消息队列相关 --> | |
<dependency> | |
<groupId>com.tz.boot</groupId> | |
<artifactId>scaffold-spring-boot-starter-mq</artifactId> | |
</dependency> |
实现消息模板
public SmsSendMessage extends AbstractRedisStreamMessage {
/*** 短信日志编号
*/
@NotNull(message = "短信日志编号不能为空")
private Long logId;
/*** 手机号
*/
@NotNull(message = "手机号不能为空")
private String mobile;
/*** 短信渠道编号
*/
@NotNull(message = "短信渠道编号不能为空")
private Long channelId;
/*** 短信 API 的模板编号
*/
@NotNull(message = "短信 API 的模板编号不能为空")
private String apiTemplateId;
/*** 短信模板参数
*/
private List<KeyValue<String, Object>> templateParams;
}SmsProducer 生产者
/*** <p> Project: scaffold - SmsProducer </p>
*
* Sms 短信相关消息的 Producer
* @author Tz
* @date 2024/01/09 23:45
* @version 1.0.0
* @since 1.0.0
*/
@Slf4j@Componentpublic class SmsProducer {
// 重点:注入 RedisMQTemplate 对象@Resourceprivate RedisMQTemplate redisMQTemplate;
/*** 发送 {@link SmsSendMessage} 消息*
* @param logId 短信日志编号
* @param mobile 手机号
* @param channelId 渠道编号
* @param apiTemplateId 短信模板编号
* @param templateParams 短信模板参数
*/
public void sendSmsSendMessage(Long logId, String mobile,
Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
// 重点: 使用 RedisMQTemplate 发送消息redisMQTemplate.send(message);
}}SmsSendConsumer 消费者
/*** <p> Project: scaffold - SmsSendConsumer </p>
*
* 针对 {@link SmsSendMessage} 的消费者* @author Tz
* @date 2024/01/09 23:45
* @version 1.0.0
* @since 1.0.0
*/
@Component@Slf4j// 重点:继承 AbstractRedisStreamMessageListener 类public class SmsSendConsumer extend AbstractRedisStreamMessageListener<SmsSendMessage> {
@Resourceprivate SmsSendService smsSendService;
// 重点:实现 onMessage 方法@Overridepublic void onMessage(SmsSendMessage message) {
log.info("[onMessage][消息内容({})]", message);
smsSendService.doSendSms(message);
}}
# 广播消费
广播消费,是指消息发送到 Redis 时,所有消费者(应用 JVM 实例)收到,然后消费成功。
# 使用场景
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Redis 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Redis 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
# 实现源码
广播消费基于 Redis Pub/Sub 实现:
- 实现 AbstractChannelMessage 抽象类,定义【广播】消息。
- 使用 RedisMQTemplate 的
#send(message)方法,发送消息。 - 实现 AbstractRedisChannelMessageListener 接口,消费消息。
最终使用 ScaffoldRedisMQConsumerAutoConfiguration 配置类,扫描所有的 AbstractRedisChannelMessageListener 监听器,初始化对应的消费者
代码:
/** | |
* 创建 Redis Pub/Sub 广播消费的容器 | |
*/ | |
@Bean | |
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 | |
public RedisMessageListenerContainer redisMessageListenerContainer( | |
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) { | |
// 创建 RedisMessageListenerContainer 对象 | |
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); | |
// 设置 RedisConnection 工厂。 | |
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); | |
// 添加监听器 | |
listeners.forEach(listener -> { | |
listener.setRedisMQTemplate(redisMQTemplate); | |
container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); | |
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", | |
listener.getChannel(), listener.getClass().getName()); | |
}); | |
return container; | |
} |