Skip to content

死信、延迟队列案例:订单支付超时

  1. 用户下单 → 消息进入 延迟队列
    • 订单创建后,系统发一条消息给延迟队列,设定 15 分钟的延迟。
    • 这条消息“不急着”被消费,15 分钟后才“活过来”被消费者拿走。
  2. 15 分钟内用户没支付
    • 延迟队列中这条消息时间一到,被“激活”送到它对应的消费者。
  3. 延迟队列的 TTL 到期消息自动转到 死信队列
    • 在 RabbitMQ 里,延迟队列实际上就是一个带 TTL 的普通队列,消息过期后被转发到绑定的死信队列。
    • 这条“超时未支付”的消息就变成了死信消息,被丢到死信队列里。
  4. 死信队列消费者负责后续操作
    • 死信队列的消费者收到这条消息,执行“关闭订单”“释放库存”等补偿逻辑。
订单系统 → 延迟队列(消息带15分钟TTL) → 15分钟后消息过期 → 死信队列 → 补偿消费者执行关单释放库存

延迟队列一般是用“消息过期+死信转发”来模拟的

具体实现

RabbitMQ 队列配置(延迟队列+死信队列)

go
package main

import (
    "fmt"
    "log"
    "time"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    // 死信交换机和队列
    deadLetterExchange := "order.dlx"
    deadLetterQueue := "order.deadletter.queue"
    deadLetterRoutingKey := "order.deadletter"

    err = ch.ExchangeDeclare(deadLetterExchange, "direct", true, false, false, false, nil)
    failOnError(err, "Failed to declare dead letter exchange")

    _, err = ch.QueueDeclare(deadLetterQueue, true, false, false, false, nil)
    failOnError(err, "Failed to declare dead letter queue")

    err = ch.QueueBind(deadLetterQueue, deadLetterRoutingKey, deadLetterExchange, false, nil)
    failOnError(err, "Failed to bind dead letter queue")

    // 延迟队列,带死信配置和消息TTL(15分钟)
    delayQueue := "order.delay.queue"
    args := amqp.Table{
        "x-message-ttl":             int32(15 * 60 * 1000), // 15分钟过期(ms)
        "x-dead-letter-exchange":    deadLetterExchange,
        "x-dead-letter-routing-key": deadLetterRoutingKey,
    }

    _, err = ch.QueueDeclare(delayQueue, true, false, false, false, args)
    failOnError(err, "Failed to declare delay queue")

    fmt.Println("Queues and exchanges declared successfully")
}

发送延迟消息&死信队列消费者

go
// 发送延迟消息(模拟订单生成)
func publishDelayMessage(ch *amqp.Channel, orderID string) {
    body := orderID
    err := ch.Publish(
        "",            // exchange 为空,默认队列发送
        "order.delay.queue", // routing key 指向延迟队列
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish delay message")
    fmt.Println("Published delay message for order:", orderID)
}

// 死信队列消费者(收到消息后执行关单逻辑)
func consumeDeadLetter(ch *amqp.Channel) {
    msgs, err := ch.Consume(
        "order.deadletter.queue",
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            orderID := string(d.Body)
            fmt.Println("Received dead letter message for order:", orderID)

            // 模拟查订单状态(假设用 Redis/DB查询)
            paid := checkIfPaid(orderID)
            if paid {
                fmt.Println("Order already paid, skip closing:", orderID)
                continue
            }

            // 执行关单逻辑
            fmt.Println("Closing unpaid order:", orderID)
            closeOrder(orderID)
        }
    }()

    fmt.Println("Waiting for dead letter messages...")
    <-forever
}

func checkIfPaid(orderID string) bool {
    // 模拟检查订单是否支付,实际要访问数据库/缓存
    // 这里随便写个示例,假设所有订单ID为偶数的都已支付
    return orderID[len(orderID)-1]%2 == 0
}

func closeOrder(orderID string) {
    // 这里写真正的关单业务逻辑,比如修改数据库订单状态、释放库存等
    fmt.Println("Order closed:", orderID)
}