消息队列方案选型
# 11.消息队列方案选型
本篇定位:消息队列是分布式系统的"快递员"——它解耦服务、削峰填谷、异步处理。但选错了 MQ 就像选错了快递公司,便宜的丢件,贵的小件也用,慢的耽误事。本文从一个消息丢失事故讲起,回答三个核心问题——MQ 解决什么本质问题?业界四大 MQ 怎么选?怎么保证消息不丢不重?
# 目录介绍
# 01.一条丢失的消息
# 1.1 消失的退款
某电商客服收到一个用户投诉:"已经退款 7 天还没到账"。客服查订单系统:已退款。查支付系统:未收到退款指令。两个系统数据对不上。
进一步排查发现:当天有 134 笔退款"消失了"——订单系统标记已退款,但支付系统从来没收到退款消息。
# 1.2 故障定位过程
flowchart TD
A[订单系统标记退款] --> B[发送退款消息到 MQ]
B --> C[消息进入 MQ Broker]
C --> D[支付系统消费消息]
D --> E[执行退款]
F[根因定位] --> F1[① 那天 MQ Broker 重启]
F --> F2[② 内存中的消息没刷盘就丢了]
F --> F3[③ 生产者用的是异步发送]
F --> F4[④ 没有失败回调处理]
F --> F5[⑤ 没有事务消息保证]
style F1 fill:#ffebee
style F2 fill:#ffebee
style F3 fill:#ffebee
style F4 fill:#ffebee
style F5 fill:#ffebee
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
5 道防线全部失守——任意一道做对都不会丢这 134 笔。
# 1.3 反思 MQ 设计
事后这个团队总结了三个最深刻的教训:
- MQ 不是"丢了无所谓"的轻量组件——业务依赖它就要把它当数据库一样可靠
- 可靠性不是 MQ 单方面保证——生产端、Broker、消费端三方都要做对
- 消息事务才是金融场景的兜底——没事务消息的退款链路本质上是赌博
带着这个事故,我们重新审视 MQ 解决的本质问题。
# 02.要解决的核心矛盾
# 2.1 同步耦合的代价
没有 MQ 时,A 服务调用 B 服务、B 调用 C,一个挂了全链路挂:
graph LR
subgraph "❌ 同步耦合"
A1[订单服务] -->|HTTP| B1[积分服务]
B1 -->|HTTP| C1[消息推送]
C1 -->|HTTP| D1[数据分析]
end
subgraph "✅ MQ 解耦"
A2[订单服务] -->|发消息| MQ
MQ -->|订阅| B2[积分服务]
MQ -->|订阅| C2[消息推送]
MQ -->|订阅| D2[数据分析]
end
style B1 fill:#ffebee
style C1 fill:#ffebee
style D1 fill:#ffebee
style MQ fill:#e8f5e8
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
同步链路的 3 个致命问题:
- 耦合:每加一个下游就要改 A 服务
- 扩散故障:D 挂了 A 也挂
- 响应慢:耗时 = 所有服务耗时之和
# 2.2 流量峰谷不均
某秒杀场景:平时 1000 QPS,秒杀瞬间 10w QPS。
graph LR
A[10w QPS 瞬时流量] --> B[MQ 缓冲]
B --> C[下游平稳消费 5w QPS]
A1[峰值流量] -.- A
B1[消峰填谷] -.- B
C1[平稳处理] -.- C
style B fill:#fff3e0
2
3
4
5
6
7
8
9
MQ 的削峰能力:把瞬时洪峰摊平成持续的稳定流量,下游只需要按平均流量设计容量。
# 2.3 一致性与解耦
解耦带来的副作用是不再实时一致:
| 维度 | 同步 | 异步(MQ) |
|---|---|---|
| 一致性 | 实时一致 | 最终一致 |
| 性能 | 慢 | 快 |
| 耦合 | 强 | 弱 |
| 故障扩散 | 易 | 隔离 |
关键认知:用 MQ 就是接受"最终一致"——业务方必须能容忍短暂的状态不一致。
# 2.4 MQ 的本质
MQ = 用一份数据存储 + 异步消费换"解耦"和"削峰"
它不是免费的——你换来了可扩展性 + 高性能,付出的代价是复杂度 + 最终一致性。
# 03.业界主流方案
# 3.1 四大 MQ 概览
Kafka(LinkedIn → Apache) 为大数据而生。日志、监控、数据管道首选。吞吐量极高,但功能相对简单。
RocketMQ(阿里 → Apache) 为业务而生。事务消息、延迟消息、消息回溯等业务特性丰富。国内电商场景占主导。
RabbitMQ(Erlang) 经典传统 MQ。AMQP 协议、丰富的路由能力(Exchange / Routing Key),中小型企业流行。
Pulsar(Yahoo → Apache) 新一代 MQ。云原生、计算与存储分离、多租户。后来者,社区上升中。
flowchart LR
A[RabbitMQ<br/>2007] --> B[Kafka<br/>2011]
B --> C[RocketMQ<br/>2012]
C --> D[Pulsar<br/>2016]
A1[传统 AMQP] -.- A
B1[大数据] -.- B
C1[业务特性丰富] -.- C
D1[云原生] -.- D
style A fill:#e3f2fd
style B fill:#e8f5e8
style C fill:#fff3e0
style D fill:#f3e5f5
2
3
4
5
6
7
8
9
10
11
12
13
14
# 3.2 横向对比矩阵
| 维度 | Kafka | RocketMQ | RabbitMQ | Pulsar |
|---|---|---|---|---|
| 吞吐量 | 极高(百万 TPS) | 高(10w TPS) | 中(万 TPS) | 极高 |
| 延迟 | 毫秒 | 毫秒 | 微秒(最低) | 毫秒 |
| 可靠性 | 高 | 高 | 高 | 高 |
| 事务消息 | 有但弱 | ✅ 强 | ❌ | ✅ |
| 延迟消息 | ❌ | ✅ | 插件 | ✅ |
| 消息回溯 | ✅ | ✅ | ❌ | ✅ |
| 顺序消息 | 分区内有序 | 严格有序 | 队列内有序 | 分区内有序 |
| 消息过滤 | ❌ | ✅ Tag/SQL | ✅ | ✅ |
| 多租户 | 弱 | 弱 | 中 | ✅ 强 |
| 学习曲线 | 中 | 中 | 平 | 陡 |
| 社区 | 顶级 | 顶级(中文) | 活跃 | 上升中 |
# 3.3 典型场景对照
| 场景 | 首选 MQ | 原因 |
|---|---|---|
| 日志收集 / 大数据 | Kafka | 高吞吐 + 持久化 |
| 业务消息(订单/支付) | RocketMQ | 事务消息 + 业务特性 |
| 传统企业 / 复杂路由 | RabbitMQ | AMQP + 丰富路由 |
| 云原生 / 多租户 | Pulsar | 计算存储分离 |
| 任务队列 / 简单异步 | RabbitMQ / Redis Stream | 轻量 |
| 实时计算上游 | Kafka | Flink / Spark 标配 |
| 金融级事务 | RocketMQ | 事务消息能力 |
实战建议:
- 不知道选什么 → Kafka(生态最好)
- 业务消息为主 → RocketMQ(特性最贴合业务)
- 传统企业 / 小流量 → RabbitMQ(最简单)
# 04.设计核心原则
# 4.1 不丢消息原则
端到端不丢消息需要三方协作:
graph LR
P[生产端<br/>不丢] --> B[Broker<br/>不丢]
B --> C[消费端<br/>不丢]
P1[确认机制 + 重试] -.- P
B1[多副本 + 持久化] -.- B
C1[手动 ACK + 失败重投] -.- C
style P fill:#fff3e0
style B fill:#e8f5e8
style C fill:#f3e5f5
2
3
4
5
6
7
8
9
10
11
任何一环掉链子,端到端可靠性就破功了。
# 4.2 幂等消费原则
幂等 = 同一条消息被消费多次,业务结果不变。
为什么必须幂等?因为重复消费几乎不可避免:
- 网络超时但消息其实送达了,生产者重发
- 消费者处理完没来得及 ACK 就重启了
- Broker 故障切主导致消息重发
实现幂等的 4 种典型模式:
| 模式 | 思路 | 适用 |
|---|---|---|
| 唯一索引 | 业务 ID 落 DB 唯一索引,重复直接报错 | 简单场景 |
| 状态机 | 业务流转有严格状态,重复操作无效 | 订单 / 支付 |
| 乐观锁 | version 字段,同 version 只生效一次 | 更新场景 |
| Token / 去重表 | 每条消息有唯一 ID,消费前先查去重表 | 通用 |
# 4.3 顺序保证原则
有些业务必须按顺序消费:
| 场景 | 顺序要求 |
|---|---|
| 订单状态变更 | 创建 → 支付 → 发货 → 完成 |
| 银行流水 | 必须按时间序 |
| MySQL binlog 同步 | 必须严格按写入顺序 |
实现顺序的关键:同一业务键的消息进同一队列。
// 生产端:相同 orderId 投递到同一分区
producer.send(message, partitionKey = orderId)
// 消费端:单线程消费每个分区
consumer.subscribe(partition) {
// 单线程处理
}
2
3
4
5
6
7
代价:吞吐量下降(不能并行),所以只在必要场景用顺序消息。
# 4.4 削峰填谷原则
graph TB
subgraph "请求流量"
Peak[峰值 10w QPS]
Avg[平均 1w QPS]
end
subgraph "MQ 缓冲"
Buffer[消息堆积]
end
subgraph "下游消费"
Consume[稳定 2w QPS]
end
Peak --> Buffer
Buffer --> Consume
Note["容量按 2x 平均设计<br/>峰值靠堆积消化"]
style Buffer fill:#fff3e0
style Consume fill:#e8f5e8
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
关键设计:
- 消费者容量按"平均流量 × 2"配置(不是峰值)
- MQ 容量按"峰值持续时间 × 流量差"配置
- 监控消息堆积,超过阈值告警或自动扩容
# 05.MQ 落地实战
# 5.1 整体架构设计
graph TB
subgraph "生产端"
App1[业务服务]
Producer[MQ Producer SDK]
end
subgraph "MQ 集群"
NameServer[NameServer/ZK]
B1[Broker A 主]
B2[Broker A 从]
B3[Broker B 主]
B4[Broker B 从]
end
subgraph "消费端"
C1[消费者集群1]
C2[消费者集群2]
DLQ[死信队列]
end
App1 --> Producer
Producer -->|查路由| NameServer
Producer -->|发消息| B1 & B3
B1 -.复制.-> B2
B3 -.复制.-> B4
B1 & B3 --> C1
B1 & B3 --> C2
C1 & C2 -.失败超限.-> DLQ
style B1 fill:#fff3e0
style B3 fill:#fff3e0
style DLQ fill:#ffebee
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 5.2 生产端可靠投递
三个关键动作:
// 1. 同步发送 + 确认(最可靠)
val result = producer.send(message)
if (result.status != SUCCESS) {
throw RuntimeException("发送失败")
}
// 2. 异步发送 + 回调(性能好但要处理失败)
producer.sendAsync(message, object : Callback {
override fun onSuccess(result: SendResult) { /* 记录成功 */ }
override fun onException(e: Throwable) {
// ❗ 必须有失败处理:重试 / 落库 / 告警
}
})
// 3. 事务消息(金融级一致性)
producer.sendInTransaction(message) {
// 本地事务
db.update(...)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
消息丢失最常见的原因:异步发送 + 没有失败处理 → 故障时静默丢消息。开篇那个 134 笔退款丢失就是这个原因。
# 5.3 消费端可靠消费
核心是 ACK 机制:
sequenceDiagram
participant MQ as MQ Broker
participant C as 消费者
participant DB as 业务库
MQ->>C: 推送消息
C->>DB: 业务处理
DB-->>C: 处理成功
C->>MQ: 手动 ACK
Note over MQ,C: 如果 C 没 ACK,MQ 会重新投递
alt 业务处理失败
C->>MQ: NACK / 不 ACK
MQ->>C: 一段时间后重试
end
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
铁律:业务处理成功后才 ACK——绝不能"先 ACK 后处理"。
# 5.4 死信与重试
重试策略:
flowchart TD
Msg[消息消费失败] --> R1[第1次重试 10秒后]
R1 -.失败.-> R2[第2次重试 30秒后]
R2 -.失败.-> R3[第3次重试 1分钟后]
R3 -.失败.-> R4[第4次重试 2分钟后]
R4 -.失败.-> R5[第5次重试 5分钟后]
R5 -.失败超限.-> DLQ[进入死信队列<br/>人工介入]
style DLQ fill:#ffebee
2
3
4
5
6
7
8
9
死信队列(DLQ)的价值:
- 失败消息不丢(可人工修复)
- 不阻塞正常消费
- 提供告警入口
# 5.5 消息事务方案
最严苛的场景:本地数据库操作和消息发送必须同时成功或同时失败。
sequenceDiagram
participant App as 业务
participant MQ as MQ
participant DB as 数据库
Note over App,MQ: 阶段1: 发送半消息(对消费者不可见)
App->>MQ: 半消息 (Prepare)
MQ-->>App: 半消息存储成功
Note over App,DB: 阶段2: 执行本地事务
App->>DB: 业务事务
DB-->>App: 事务结果
Note over App,MQ: 阶段3: 提交或回滚
alt 本地事务成功
App->>MQ: Commit (消息变为可见)
MQ->>MQ: 投递给消费者
else 本地事务失败
App->>MQ: Rollback (删除半消息)
end
Note over MQ,App: 异常: 如果阶段3 没收到指令
MQ->>App: 主动回查事务状态
App-->>MQ: 返回最终状态
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
这就是 RocketMQ 事务消息的核心机制——通过"半消息 + 回查"保证消息发送和本地事务原子性。
# 06.三大经典问题
# 6.1 消息丢失问题
全链路可能丢失的 5 个点 + 解决方案:
| 丢失点 | 原因 | 解决 |
|---|---|---|
| 生产端发送时 | 网络抖动 | 同步发送 + 重试 + 失败兜底 |
| Broker 内存 | 还没刷盘就重启 | 同步刷盘配置 |
| 主从切换时 | 主库消息没同步到从 | 多副本同步复制 |
| 消费端拉取后 | 没处理就 ACK 了 | 业务成功后再 ACK |
| 消费业务异常 | 处理失败但 ACK 了 | try-catch + 失败重投 |
# 6.2 消息重复问题
重复几乎不可避免,唯一对策是消费者保证幂等。
// 通用幂等模板
@MessageHandler
fun handle(msg: Message): Boolean {
val msgId = msg.id
// 1. 查去重表
if (idempotentRepo.exists(msgId)) {
return true // 已处理过,直接返回成功
}
// 2. 业务处理 + 写去重表(一个事务内)
transaction {
businessLogic(msg)
idempotentRepo.save(msgId)
}
return true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 6.3 消息堆积问题
消费速度跟不上生产速度,消息越堆越多:
flowchart TD
Detected[发现消息堆积] --> Q1{堆积量级?}
Q1 -->|< 1万| Wait[观察是否自然消化]
Q1 -->|1万-100万| Q2{消费者瓶颈?}
Q1 -->|> 100万| Emergency[紧急扩容 + 降级]
Q2 -->|是 CPU/IO| Scale[扩容消费者]
Q2 -->|是业务逻辑| Optimize[优化业务代码]
Q2 -->|是下游慢| Async[改异步或分批]
Emergency --> Skip[非核心消息丢弃]
Emergency --> NewTopic[新建临时 topic 分流]
style Detected fill:#fff3e0
style Emergency fill:#ffebee
2
3
4
5
6
7
8
9
10
11
12
13
14
15
预防比应对更重要:
- 监控消费 lag,预警阈值要低
- 消费者集群有自动扩容能力
- 关键 topic 单独限流隔离
# 07.常见陷阱与反例
# 7.1 误用 MQ 反例
反例:某团队用 Kafka 做"实时 RPC 调用"——A 发请求消息,等 B 处理完发响应消息回来。
问题:
- 延迟比 RPC 高 10 倍
- 实现复杂(要管理 correlation ID)
- 失去了同步调用的简洁性
教训:MQ 适合异步、解耦的场景,不适合替代同步 RPC。
# 7.2 顺序错位反例
反例:订单状态变更发到多个分区,消费时并发处理,结果"已支付"消息比"创建订单"消息先到。
正确:同一 orderId 进同一分区,消费端单线程。
# 7.3 消息风暴反例
反例:用户登录发一条 MQ → 触发积分服务 → 积分变更又发 MQ → 触发等级服务 → 等级变更又发 MQ → ……一次登录引发 10+ 条消息。
问题:消息风暴时整个系统瘫痪。
教训:
- MQ 不是"事件总线滥用器"
- 每条消息都要评估必要性
- 监控 MQ 总流量
mindmap
root((三大反例))
误用 MQ
当成 RPC 用
复杂度激增
性能下降
顺序错位
同 ID 进不同分区
并发消费乱序
业务状态异常
消息风暴
事件链式触发
一变多
系统瘫痪
2
3
4
5
6
7
8
9
10
11
12
13
14
# 08.演进路线
# 8.1 V1 进程内队列
特征:单服务、流量小。
做法:
- Java BlockingQueue / Disruptor
- 简单的异步任务
适用阶段:起步、单体应用
# 8.2 V2 引入 MQ
特征:服务拆分、需要解耦。
做法:
- 选一个 MQ(推荐 RocketMQ / Kafka)
- 标准的生产消费模式
- 基本的监控和死信队列
适用阶段:中型系统
# 8.3 V3 多 MQ 体系
特征:大型公司、多业务线、多场景。
做法:
- 分场景用不同 MQ(业务用 RocketMQ、日志用 Kafka)
- 统一的 MQ 管理平台
- Topic 治理、容量管控
- 全链路追踪
适用阶段:大型互联网公司
flowchart LR
V1[V1 进程内队列<br/>单体] --> V2[V2 引入 MQ<br/>分布式]
V2 --> V3[V3 多 MQ 体系<br/>大型组织]
style V1 fill:#e3f2fd
style V2 fill:#e8f5e8
style V3 fill:#fff3e0
2
3
4
5
6
7
# 09.总结与决策
# 9.1 MQ 上线检查表
新引入 MQ 或新增 Topic 前对照:
- [ ] 已确认场景适合 MQ(异步、解耦、削峰)
- [ ] 选定了 MQ 类型(Kafka / RocketMQ / RabbitMQ)
- [ ] Topic 命名遵循规范(业务域.对象.动作)
- [ ] 生产端有可靠投递机制(同步 / 异步带回调 / 事务)
- [ ] 消费端有幂等设计
- [ ] 顺序消费场景已用同分区策略
- [ ] 失败重试 + 死信队列已配置
- [ ] 监控告警就位(生产失败率、消费延迟、堆积量)
- [ ] 容量评估完成(峰值 × 持续时间)
- [ ] 关键消息有事务消息保护
- [ ] 死信队列有人值守
- [ ] 容灾预案已演练
# 9.2 选型决策树
flowchart TD
Start([我要用 MQ 吗?]) --> Q1{场景是什么?}
Q1 -->|实时 RPC 调用| NoMQ[不要用 MQ<br/>用 RPC]
Q1 -->|事件驱动 / 异步| Q2{流量级别?}
Q2 -->|单进程内| InQueue[进程内队列<br/>BlockingQueue]
Q2 -->|跨服务| Q3{什么类型业务?}
Q3 -->|大数据 / 日志| Kafka[Kafka<br/>生态最好]
Q3 -->|业务消息| Q4{需要事务消息?}
Q4 -->|是| RocketMQ[RocketMQ<br/>事务消息+延迟消息]
Q4 -->|否| Q5{流量大小?}
Q5 -->|大| Kafka2[Kafka]
Q5 -->|小且复杂路由| RabbitMQ[RabbitMQ<br/>AMQP 路由]
Q3 -->|云原生 / 多租户| Pulsar[Pulsar]
style NoMQ fill:#e3f2fd
style InQueue fill:#e8f5e8
style Kafka fill:#fff3e0
style RocketMQ fill:#ffebee
style RabbitMQ fill:#f3e5f5
style Pulsar fill:#fff3e0
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
最后一句话:MQ 是分布式系统最常用也最容易用错的组件——它的价值不在 MQ 本身,而在你怎么用。开篇那 134 笔退款消失,不是 MQ 错了,是用 MQ 的人少做了 5 道防线。
好的 MQ 设计 = 生产不丢、消费幂等、顺序可控、堆积可监、事务可保。