Appearance
RabbitMQ 必知必会
使用 Docker 启动 RabbitMQ
bash
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management- 后台管理界面:http://localhost:15672
- 账号密码(默认):guest / guest
RabbitMQ 的架构
生产者 -> 交换机 -> Binding(RoutingKey/BindingKey) -> 队列 -> 消费者
工作队列模式 Go 实现
工作队列模式(Work Queue)
生产者
go
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("创建频道失败: %v", err)
}
defer ch.Close()
// 声明一个持久化队列
_, err = ch.QueueDeclare(
"task_queue",
true, // durable
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("声明队列失败: %v", err)
}
body := "Hello Worker!" + " " + time.Now().String()
for i := 0; i < 10; i++ {
err = ch.Publish(
"", // default exchange
"task_queue", // routing key
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 持久化
ContentType: "text/plain",
Body: []byte(body),
},
)
if err != nil {
log.Fatalf("发送消息失败: %v", err)
}
log.Printf("发送消息: %s", body)
}
}消费者
go
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("连接失败: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("创建频道失败: %v", err)
}
defer ch.Close()
// 设置 Qos,保证公平分发
ch.Qos(3, 0, false)
msgs, err := ch.Consume(
"task_queue",
"",
false, // 手动 ack
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("注册消费者失败: %v", err)
}
log.Println("等待消息...")
worker := 3 // 每个消费者的并发度
wg := sync.WaitGroup{}
wg.Add(worker)
for i := 0; i < worker; i++ {
go func(id int) {
defer wg.Done()
for msg := range msgs {
fmt.Printf("worker %d 收到:%s\n", id, msg.Body)
time.Sleep(5 * time.Second) // 模拟耗时任务
msg.Ack(false)
fmt.Printf("worker %d ack 完成\n", id)
}
}(i)
}
wg.Wait()
}Qos 和并发数的区别:Qos 控制“最多未确认消息数”,并发由消费者自己决定,不由 Qos 决定
交换机和工作模式
| 类型 | 路由方式 | 工作模式 | 特点 |
|---|---|---|---|
| 默认 Direct | routing key 是队列名 | 工作队列模式 | 一条消息只给一个消费者(轮询分发) |
| Direct | 精确匹配 routing key | 路由模式 | 用户自定义交换机,可绑定多个队列,routing key 可自定义 |
| Topic | 模糊匹配 routing key | 路由模式 | 支持 *(匹配一个单词)和 #(匹配多个单词) |
| Fanout | 广播,忽略 routing key | 发布订阅模式 | 同一条消息会发送到绑定在该交换机的所有队列 |
业务场景参考
默认 / Direct
- 订单支付队列:队列
order_pay_queue,RoutingKey = 队列名,消费者处理支付逻辑,确认支付成功后 Ack。 - 用户注册邮件:队列
user_email_queue,消费者拉消息后调用邮件服务发送邮件,处理完 Ack。 - 商品库存扣减:队列
inventory_queue,单消费者处理库存扣减,保证每条消息只被消费一次。 - 秒杀活动订单:队列
seckill_queue,高并发消费者抢购任务,Qos=1 保证公平分发。 - 图片/视频转码任务:队列
media_transcode_queue,消息体包含文件路径,消费者拉消息后执行转码,完成 Ack。
Direct(显式)
- 新订单 vs 取消订单分流:Direct Exchange
orders_direct,绑定order_new_queue(bindingKey=new)和order_cancel_queue(bindingKey=cancel),生产者发送 routingKey=new/cancel。 - 普通用户任务 vs VIP 用户任务:Exchange
user_task_direct,普通用户队列task_normal,VIP 用户队列task_vip,routingKey=normal/vip。 - 不同业务模块任务分流:Exchange
module_direct,绑定payment_queue、delivery_queue、notification_queue,routingKey 对应模块。 - 不同优先级队列:Exchange
priority_direct,绑定high_queue、low_queue,routingKey=high/low,实现任务优先级处理。 - 多租户队列:Exchange
tenant_direct,每个租户绑定自己的队列,routingKey=tenant_id,实现租户隔离。
Fanout
- 系统公告通知:Exchange
broadcast_fanout,绑定队列web_queue、app_queue、sms_queue,生产者发送消息,无 routingKey,消息同步到所有端。 - 直播间消息广播:每条弹幕消息 Fanout 到直播相关队列,所有观看者队列都能收到。
- 配置同步:配置中心修改配置后,Fanout Exchange 广播到多个微服务队列,每个服务消费者更新本地配置。
- 监控告警广播:告警消息发送到 Fanout Exchange,绑定告警邮件队列、短信队列、钉钉队列,多个通道同时收到。
- 分布式任务通知:一个任务状态变化广播到所有相关微服务队列,实现多系统同步。
Topic
- 日志系统:Exchange
logs_topic,队列payment_error_queue(bindingKey=payment.error),user_info_queue(bindingKey=user.info),生产者发送 routingKey=payment.error。 - 按地区分类消息:Exchange
order_topic,队列cn_queue(bindingKey=order.cn.#)、us_queue(bindingKey=order.us.#),消息 routingKey=order.cn.new。 - 事件中心:Exchange
event_topic,队列按事件类型绑定,如login_queue(login.*)、purchase_queue(purchase.*)。 - 微服务间按模块主题通信:Exchange
module_topic,队列绑定inventory_queue(inventory.#)、shipping_queue(shipping.#)。 - 消息订阅按内容标签:Exchange
news_topic,队列sports_queue(news.sports)、finance_queue(news.finance),生产者发送news.sports/news.finance。
Headers
- 广告投放:Exchange
ad_headers,消息 header{region=CN, vip=true},队列vip_cn_queue匹配该 header。 - 促销活动按用户标签:消息 header
{level=gold, interest=sports},队列gold_sports_queue接收。 - 复杂审批流:消息 header
{dept=finance, role=manager},队列finance_manager_queue接收。 - 订单路由按商户类型+等级:header
{merchant_type=premium, order_type=express},队列premium_express_queue接收。 - 多条件事件处理:header
{region=EU, vip=true, product=electronics},队列eu_vip_electronics_queue接收,消费者执行对应业务逻辑。
生产者封装
线程安全、重试机制和异常恢复
go
package main
import (
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
type MQProducer struct {
conn *amqp.Connection
chPool chan *amqp.Channel
poolSize int
maxRetries int
}
func NewMQProducer(url string, poolSize, maxRetries int) (*MQProducer, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
pool := make(chan *amqp.Channel, poolSize)
for i := 0; i < poolSize; i++ {
ch, err := conn.Channel()
if err != nil {
return nil, err
}
pool <- ch
}
return &MQProducer{
conn: conn,
chPool: pool,
poolSize: poolSize,
maxRetries: maxRetries,
}, nil
}
// 发布消息
func (p *MQProducer) Publish(exchange, routingKey string, body []byte) error {
var err error
for i := 0; i <= p.maxRetries; i++ {
ch := <-p.chPool // 出池
// 尝试 publish
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
},
)
// 如果失败,尝试重建 channel
if err != nil {
log.Printf("[MQProducer] publish 失败,第 %d 次重试: %v, 尝试重建 channel", i+1, err)
ch.Close()
ch, _ = p.conn.Channel()
}
p.chPool <- ch // 放回池
if err == nil {
return nil
}
// Sleep 防止 busy-loop,占用 CPU,同时给 broker 和网络恢复时间
time.Sleep(time.Millisecond * 100)
}
return fmt.Errorf("消息发布失败,重试 %d 次仍失败: %v", p.maxRetries, err)
}
// 关闭 Producer
func (p *MQProducer) Close() {
close(p.chPool)
for ch := range p.chPool {
ch.Close()
}
p.conn.Close()
}其它
- 队列如何保证消息不丢?
- 消费者如何公平消费?
- 多租户数据隔离 vhost
- MQ 队列消息满了怎么办
- 如何监控 MQ 消息达 80%后发送钉钉告警:Shell 脚本+Cron
- RabbitMQ 的 Qos
- 幂等性设计:1.消息携带 msg_id 2.查 redis/mysql 任务状态