Skip to content

RabbitMQ 必知必会

使用 Docker 启动 RabbitMQ

bash
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

RabbitMQ 的架构

生产者 -> 交换机 -> Binding(RoutingKey/BindingKey) -> 队列 -> 消费者

工作队列模式 Go 实现

工作队列模式(Work Queue)

生产者
go
package main

import (
	"log"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("创建频道失败: %v", err)
	}
	defer ch.Close()

	// 声明一个持久化队列
	_, err = ch.QueueDeclare(
		"task_queue",
		true, // durable
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("声明队列失败: %v", err)
	}

	body := "Hello Worker!" + " " + time.Now().String()
	for i := 0; i < 10; i++ {
		err = ch.Publish(
			"",           // default exchange
			"task_queue", // routing key
			false,
			false,
			amqp.Publishing{
				DeliveryMode: amqp.Persistent, // 持久化
				ContentType:  "text/plain",
				Body:         []byte(body),
			},
		)

		if err != nil {
			log.Fatalf("发送消息失败: %v", err)
		}

		log.Printf("发送消息: %s", body)
	}
}
消费者
go
package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/streadway/amqp"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("创建频道失败: %v", err)
	}
	defer ch.Close()

	// 设置 Qos,保证公平分发
	ch.Qos(3, 0, false)

	msgs, err := ch.Consume(
		"task_queue",
		"",
		false, // 手动 ack
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		log.Fatalf("注册消费者失败: %v", err)
	}

	log.Println("等待消息...")

	worker := 3 // 每个消费者的并发度

	wg := sync.WaitGroup{}
	wg.Add(worker)

	for i := 0; i < worker; i++ {
		go func(id int) {
			defer wg.Done()
			for msg := range msgs {
				fmt.Printf("worker %d 收到:%s\n", id, msg.Body)
				time.Sleep(5 * time.Second) // 模拟耗时任务
				msg.Ack(false)
				fmt.Printf("worker %d ack 完成\n", id)
			}
		}(i)
	}

	wg.Wait()
}

Qos 和并发数的区别:Qos 控制“最多未确认消息数”,并发由消费者自己决定,不由 Qos 决定

交换机和工作模式

类型路由方式工作模式特点
默认 Directrouting key 是队列名工作队列模式一条消息只给一个消费者(轮询分发)
Direct精确匹配 routing key路由模式用户自定义交换机,可绑定多个队列,routing key 可自定义
Topic模糊匹配 routing key路由模式支持 *(匹配一个单词)和 #(匹配多个单词)
Fanout广播,忽略 routing key发布订阅模式同一条消息会发送到绑定在该交换机的所有队列

业务场景参考

默认 / Direct

  1. 订单支付队列:队列 order_pay_queue,RoutingKey = 队列名,消费者处理支付逻辑,确认支付成功后 Ack。
  2. 用户注册邮件:队列 user_email_queue,消费者拉消息后调用邮件服务发送邮件,处理完 Ack。
  3. 商品库存扣减:队列 inventory_queue,单消费者处理库存扣减,保证每条消息只被消费一次。
  4. 秒杀活动订单:队列 seckill_queue,高并发消费者抢购任务,Qos=1 保证公平分发。
  5. 图片/视频转码任务:队列 media_transcode_queue,消息体包含文件路径,消费者拉消息后执行转码,完成 Ack。

Direct(显式)

  1. 新订单 vs 取消订单分流:Direct Exchange orders_direct,绑定 order_new_queue(bindingKey=new)和 order_cancel_queue(bindingKey=cancel),生产者发送 routingKey=new/cancel。
  2. 普通用户任务 vs VIP 用户任务:Exchange user_task_direct,普通用户队列 task_normal,VIP 用户队列 task_vip,routingKey=normal/vip。
  3. 不同业务模块任务分流:Exchange module_direct,绑定 payment_queuedelivery_queuenotification_queue,routingKey 对应模块。
  4. 不同优先级队列:Exchange priority_direct,绑定 high_queuelow_queue,routingKey=high/low,实现任务优先级处理。
  5. 多租户队列:Exchange tenant_direct,每个租户绑定自己的队列,routingKey=tenant_id,实现租户隔离。

Fanout

  1. 系统公告通知:Exchange broadcast_fanout,绑定队列 web_queueapp_queuesms_queue,生产者发送消息,无 routingKey,消息同步到所有端。
  2. 直播间消息广播:每条弹幕消息 Fanout 到直播相关队列,所有观看者队列都能收到。
  3. 配置同步:配置中心修改配置后,Fanout Exchange 广播到多个微服务队列,每个服务消费者更新本地配置。
  4. 监控告警广播:告警消息发送到 Fanout Exchange,绑定告警邮件队列、短信队列、钉钉队列,多个通道同时收到。
  5. 分布式任务通知:一个任务状态变化广播到所有相关微服务队列,实现多系统同步。

Topic

  1. 日志系统:Exchange logs_topic,队列 payment_error_queue(bindingKey=payment.error),user_info_queue(bindingKey=user.info),生产者发送 routingKey=payment.error
  2. 按地区分类消息:Exchange order_topic,队列 cn_queue(bindingKey=order.cn.#)、us_queue(bindingKey=order.us.#),消息 routingKey=order.cn.new
  3. 事件中心:Exchange event_topic,队列按事件类型绑定,如 login_queue(login.*)、purchase_queue(purchase.*)。
  4. 微服务间按模块主题通信:Exchange module_topic,队列绑定 inventory_queue(inventory.#)、shipping_queue(shipping.#)。
  5. 消息订阅按内容标签:Exchange news_topic,队列 sports_queue(news.sports)、finance_queue(news.finance),生产者发送 news.sports / news.finance

Headers

  1. 广告投放:Exchange ad_headers,消息 header {region=CN, vip=true},队列 vip_cn_queue匹配该 header。
  2. 促销活动按用户标签:消息 header {level=gold, interest=sports},队列 gold_sports_queue接收。
  3. 复杂审批流:消息 header {dept=finance, role=manager},队列 finance_manager_queue接收。
  4. 订单路由按商户类型+等级:header {merchant_type=premium, order_type=express},队列 premium_express_queue接收。
  5. 多条件事件处理:header {region=EU, vip=true, product=electronics},队列 eu_vip_electronics_queue接收,消费者执行对应业务逻辑。

生产者封装

线程安全、重试机制和异常恢复
go
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

type MQProducer struct {
    conn       *amqp.Connection
    chPool     chan *amqp.Channel
    poolSize   int
    maxRetries int
}

func NewMQProducer(url string, poolSize, maxRetries int) (*MQProducer, error) {
    conn, err := amqp.Dial(url)
    if err != nil {
        return nil, err
    }

    pool := make(chan *amqp.Channel, poolSize)
    for i := 0; i < poolSize; i++ {
        ch, err := conn.Channel()
        if err != nil {
            return nil, err
        }
        pool <- ch
    }

    return &MQProducer{
        conn:       conn,
        chPool:     pool,
        poolSize:   poolSize,
        maxRetries: maxRetries,
    }, nil
}

// 发布消息
func (p *MQProducer) Publish(exchange, routingKey string, body []byte) error {
    var err error
    for i := 0; i <= p.maxRetries; i++ {
        ch := <-p.chPool // 出池
        // 尝试 publish
        err = ch.Publish(
            exchange,
            routingKey,
            false,
            false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         body,
            },
        )
        // 如果失败,尝试重建 channel
        if err != nil {
            log.Printf("[MQProducer] publish 失败,第 %d 次重试: %v, 尝试重建 channel", i+1, err)
            ch.Close()
            ch, _ = p.conn.Channel()
        }
        p.chPool <- ch // 放回池
        if err == nil {
            return nil
        }

        // Sleep 防止 busy-loop,占用 CPU,同时给 broker 和网络恢复时间
        time.Sleep(time.Millisecond * 100)
    }

    return fmt.Errorf("消息发布失败,重试 %d 次仍失败: %v", p.maxRetries, err)
}

// 关闭 Producer
func (p *MQProducer) Close() {
    close(p.chPool)
    for ch := range p.chPool {
        ch.Close()
    }
    p.conn.Close()
}

其它

  1. 队列如何保证消息不丢?
  2. 消费者如何公平消费?
  3. 多租户数据隔离 vhost
  4. MQ 队列消息满了怎么办
  5. 如何监控 MQ 消息达 80%后发送钉钉告警:Shell 脚本+Cron
  6. RabbitMQ 的 Qos
  7. 幂等性设计:1.消息携带 msg_id 2.查 redis/mysql 任务状态