springboot整合RabbitMQ
笑霸final 人气:0TTL简介
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。
下面就根据这个图片来验证代码
配置类代码
这里写一些配置,比如创建队列 交换机 和它们之间的绑定关系
- @Qualifier 注解与我们想要使用的特定 Spring bean 的名称一起进行装配,Spring 框架就能从多个相同类型并满足装配要求的 bean 中找到我们想要的,避免让Spring脑裂。我们需要做的是@Component或者@Bean注解中声明的value属性以确定名称
注意 包别导错了
package com.xbfinal.springbootrabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * ttl队列 配置文件类 * */ @Configuration public class TtlQueueConfig { //普通交换机名称 public static final String X_EXCHANGE="X"; //死信交换机名称 public static final String Y_DEAD_LETTER_EXCHANGE="Y"; //普通队列名称 public static final String QUEUE_A="QA"; public static final String QUEUE_B="QB"; //死信队列名称 public static final String DEAD_LETTER_QUEUE_D="QD"; /** * 声明x交换机 * @return */ @Bean("xExchange")//别名和方法名取一样 public DirectExchange xExchange(){ return new DirectExchange(X_EXCHANGE); } /** * 声明y交换机 * @return */ @Bean("yExchange")//别名和方法名取一样 public DirectExchange yExchange(){ return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } //声明队列A @Bean("queueA") public Queue queueA(){ final HashMap<String, Object> arguments = new HashMap<>(); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); //设置TTL设置10秒过期 arguments.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_A) .withArguments(arguments) .build(); } //声明队列B @Bean("queueB") public Queue queueB(){ HashMap<String, Object> arguments = new HashMap<>(); //设置死信交换机 arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE); //设置死信RoutingKey arguments.put("x-dead-letter-routing-key","YD"); //设置TTL设置40秒过期 arguments.put("x-message-ttl",40000); return QueueBuilder.durable(QUEUE_B) .withArguments(arguments) .build(); } @Bean("queueD") public Queue queueD(){ return QueueBuilder.durable(DEAD_LETTER_QUEUE_D) .build(); } /** * A队列绑定X交换机 * @param queueA * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA")Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } /** * B队列绑定X交换机 * @param queueB * @param xExchange * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB")Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){ return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } /** * D队列绑定死信y交换机 * @param queueD * @param yExchange * @return */ @Bean public Binding queueDBindingX(@Qualifier("queueD")Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){ return BindingBuilder.bind(queueD).to(yExchange).with("YD"); } }
生产者代码
我们用
Controller
写,通过网页提交的方式 生产消息
url:http://localhost:8080/ttl/sendMsg/message
package com.xbfinal.springbootrabbitmq.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; /** * 生产者 * 发送延迟消息 */ @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ log.info("当前时间:{},发送了一条消息({})给两个队列", new Date().toString(),message); //发送消息 rabbitTemplate.convertAndSend("X","XA","10秒"+message); rabbitTemplate.convertAndSend("X","XB","40秒"+message); } }
消息消费者代码
注意@RabbitListener注解
@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。
- @RabbitListener标注在方法上,直接监听指定的队列,此时接收的参数需要与发送市类型一致
- 3.@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
package com.xbfinal.springbootrabbitmq.consumer; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; /** * 队列TTL的消费者 */ @Slf4j @Component public class DeadLetterQueueConsumer { //接收消息 @RabbitListener(queues = "QD") public void receivedD(Message message, Channel channel)throws Exception{ String msg=new String(message.getBody()); log.info("当前时间:{},收到消息:{}",new Date().toString(),msg); } }
验证代码
先在历览器输入http://localhost:8080/ttl/sendMsg/%E7%AC%91%E9%9C%B8fianl
查看控制台:
加载全部内容