亲宝软件园·资讯

展开

GoLang RabbitMQ TTL与死信队列以及延迟队列详细讲解

Onemorelight95 人气:0

TTL

TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,就会被自动清除。RabbitMQ可以设置两种过期时间:

如何设置

如果两者都设置了过期时间,以时间短的为准。

在streadway/amqp库提供的API中设置TTL

设置队列过期时间:

QueueDeclare函数的最后一个参数是一个amqp.Table类型,它的声明是这样的: type Table map[string]interface{},其实是一个可以用于设置队列属性的map。

// 设置Queue ttl为5s
args := amqp.Table{"x-message-ttl": 5000}
q, e := ch.QueueDeclare(
		name,  //队列名
		false, 
		true, 
		false, 
		false,
		args,   //设置Queue ttl为5s
	)

设置消息过期时间:

e = q.channel.Publish(
		"",    
		queue, 
		false, 
		false, 
		amqp.Publishing{
			// 设置当前发送消息的过期时间为3s
			Expiration: "3000",
			ReplyTo:    q.Name,
			Body:       []byte(str),
})

死信队列

当一个队列中存在死信时,RabbitMQ会把消息发送给DLX(死信交换机),进而被路由到另一个队列中,这个队列就叫做死信队列。

死信就是指没有被消费者消费成功的消息,一条消息变成死信有三种情况:

如何将死信发送给DLX

为队列设置参数即可,将要发送死信的队列配置以下两个参数:

x-dead-letter-exchange: [DLX的名字]
x-dead-letter-routing-key: [DLX的routing key]

下面是死信队列的工作流程:

延迟队列

延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性操作的业务。

比如十分钟内未支付则取消订单,原先这个功能我们可以使用定时器来实现,即每隔一段时间去数据库对比未支付订单的当前时间与订单创建时间。但是定时器的时长难以确定,太长会导致订单失效时间出现误差,太短则会增大数据库压力。

实现

在RabbitMQ中没有提供延迟队列的功能,但是我们可以使用:TTL+死信队列组合的方式来实现延迟队列的效果。

下面是实现延迟队列的流程图:

Go实现延迟队列

创建一个死信交换机

再创建一个死信队列

将死信队列绑定至死信交换机

创建一个正常队列,并指定消息过期后被发往的死信交换机

生产者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05")
	fmt.Println(body)
	// 发送消息到queue.normal队列中
	ch.Publish("", "queue.normal", false, false, amqp.Publishing{
		Body:       []byte(body),
		Expiration: "10000", // 设置TTL为10秒
	})
	defer conn.Close()
	defer ch.Close()
}

消费者

func main() {
	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
	ch, _ := conn.Channel()
	//监听queue.dlx队列
	msgs, _ := ch.Consume(
		"queue.dlx",
		"",
		true,
		false,
		false,
		false,
		nil,
	)
	for d := range msgs {
		fmt.Printf("receive: %s\n", d.Body) // 收到消息,业务处理
	}
}

流程说明

生产者生产一条消息,然后指定消息的TTL为10s,接着将消息发给普通队列,消息在普通队列中过期后被发往死信交换机,死信交换机将这条消息路由给延迟队列。消费者一直在监听到延迟队列中的死信后,开始消费。

加载全部内容

相关教程
猜你喜欢
用户评论