Appearance
死信、延迟队列案例:订单支付超时
- 用户下单 → 消息进入 延迟队列
- 订单创建后,系统发一条消息给延迟队列,设定 15 分钟的延迟。
- 这条消息“不急着”被消费,15 分钟后才“活过来”被消费者拿走。
- 15 分钟内用户没支付
- 延迟队列中这条消息时间一到,被“激活”送到它对应的消费者。
- 延迟队列的 TTL 到期消息自动转到 死信队列
- 在 RabbitMQ 里,延迟队列实际上就是一个带 TTL 的普通队列,消息过期后被转发到绑定的死信队列。
- 这条“超时未支付”的消息就变成了死信消息,被丢到死信队列里。
- 死信队列消费者负责后续操作
- 死信队列的消费者收到这条消息,执行“关闭订单”“释放库存”等补偿逻辑。
订单系统 → 延迟队列(消息带15分钟TTL) → 15分钟后消息过期 → 死信队列 → 补偿消费者执行关单释放库存延迟队列一般是用“消息过期+死信转发”来模拟的
具体实现
RabbitMQ 队列配置(延迟队列+死信队列)
go
package main
import (
"fmt"
"log"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 死信交换机和队列
deadLetterExchange := "order.dlx"
deadLetterQueue := "order.deadletter.queue"
deadLetterRoutingKey := "order.deadletter"
err = ch.ExchangeDeclare(deadLetterExchange, "direct", true, false, false, false, nil)
failOnError(err, "Failed to declare dead letter exchange")
_, err = ch.QueueDeclare(deadLetterQueue, true, false, false, false, nil)
failOnError(err, "Failed to declare dead letter queue")
err = ch.QueueBind(deadLetterQueue, deadLetterRoutingKey, deadLetterExchange, false, nil)
failOnError(err, "Failed to bind dead letter queue")
// 延迟队列,带死信配置和消息TTL(15分钟)
delayQueue := "order.delay.queue"
args := amqp.Table{
"x-message-ttl": int32(15 * 60 * 1000), // 15分钟过期(ms)
"x-dead-letter-exchange": deadLetterExchange,
"x-dead-letter-routing-key": deadLetterRoutingKey,
}
_, err = ch.QueueDeclare(delayQueue, true, false, false, false, args)
failOnError(err, "Failed to declare delay queue")
fmt.Println("Queues and exchanges declared successfully")
}发送延迟消息&死信队列消费者
go
// 发送延迟消息(模拟订单生成)
func publishDelayMessage(ch *amqp.Channel, orderID string) {
body := orderID
err := ch.Publish(
"", // exchange 为空,默认队列发送
"order.delay.queue", // routing key 指向延迟队列
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish delay message")
fmt.Println("Published delay message for order:", orderID)
}
// 死信队列消费者(收到消息后执行关单逻辑)
func consumeDeadLetter(ch *amqp.Channel) {
msgs, err := ch.Consume(
"order.deadletter.queue",
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
orderID := string(d.Body)
fmt.Println("Received dead letter message for order:", orderID)
// 模拟查订单状态(假设用 Redis/DB查询)
paid := checkIfPaid(orderID)
if paid {
fmt.Println("Order already paid, skip closing:", orderID)
continue
}
// 执行关单逻辑
fmt.Println("Closing unpaid order:", orderID)
closeOrder(orderID)
}
}()
fmt.Println("Waiting for dead letter messages...")
<-forever
}
func checkIfPaid(orderID string) bool {
// 模拟检查订单是否支付,实际要访问数据库/缓存
// 这里随便写个示例,假设所有订单ID为偶数的都已支付
return orderID[len(orderID)-1]%2 == 0
}
func closeOrder(orderID string) {
// 这里写真正的关单业务逻辑,比如修改数据库订单状态、释放库存等
fmt.Println("Order closed:", orderID)
}