Java Exchange模式
剑圣无痕 人气:1前言:
上一章讲解RabbitMq的相关知识和如何使用Spring Boot集成Rabbitmq发送消息,本文将讲解RabbitMQ三种Exchange模式。
简介
RabbitMQ的Exchange通常有三种模式分别为:Direct模式、Fanout模式、Topic模式。
Direct模式
Rabbit的Direct Exchange模式是指消息发送导RouteKey中指定的Queue,Direct模式可以使用Rabbit自带的Exchange,所以不需要将Exchange进行任何绑定。消息传递时,RouteKey必须完全匹配,消息才会被队列接收,否则消息将会被抛弃。
原理如下图:
示例代码:
可以参考上一章的示例代码,本文就不在重复阐述。
Fanout Exchange
Fanout Exchange 模式不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。因此Fanout交换机转发消息是最快的。
原理如下图:
示例代码
配置类
@Configuration public class RabbmitFanoutConfig { private final static String FANOUT_QUEUE_1 = "fanoutQueue1"; //队列名称1 private final static String EXCHANGE_NAME = "fanoutExchange"; //交换器名称 @Bean public Queue queue1() { return new Queue(FANOUT_QUEUE_1); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } @Bean public Binding bindingExchange1() { return BindingBuilder.bind(queue1()).to(fanoutExchange()); } }
生产者
@Component public class FanOutMqProduce { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend("fanoutExchange",null,message); } }
说明:convertAndSend 第一个参数为Exchange的名称,第二参数为routekey的名称,由于Fanout不需要通过Routekey来绑定所以参数为空。
消费者
@Component @RabbitListener(queues = "fanoutQueue1") public class FanOutMqConsumer { private static final Logger logger = LoggerFactory.getLogger(FanOutMqConsumer.class); @RabbitHandler public void receive(String message) { logger.info("receive message content:{}",message); } }
说明:监听队列的名称与配置的队列名称一致。
测试代码
@RequestMapping("/sendFanOutMessage") public String sendFanOutMessage() { fanOutMqProduce.sendMessage("this is fanout message"); return "success"; }
运行结果:
c.s.f.r.consumer.FanOutMqConsumer - receive message content:this is fanout message
Topic模式
Topic模式是通过Routing Key来进行通配符匹配。
- 符号#代表可以匹配一个或多个词
- 符号*代表只能匹配一个词
原理如下图:
说明:例如:usa.#能够匹配导usa.news.huofu,而europe.*只能匹配europe.news或者europe.weather
示例代码
配置类
@Configuration public class RabbmitTopicConfig { private final static String TOPIC_QUEUE_A = "topic.QueueA"; //队列名称A private final static String TOPIC_QUEUE_B = "topic.QueueB"; //队列名称B private final static String EXCHANGE_NAME = "topicExchange"; //交换器名称 @Bean public Queue queueA() { return new Queue(TOPIC_QUEUE_A); } @Bean public Queue queueB() { return new Queue(TOPIC_QUEUE_B); } @Bean public TopicExchange topicExchange() { return new TopicExchange(EXCHANGE_NAME); } /** * * @return */ @Bean public Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(topicExchange()).with("topic.QueueA"); } @Bean public Binding bindingExchange() { return BindingBuilder.bind(queueB()).to(topicExchange()).with("topic.#"); } }
消息发送者
@Component public class TopicMqProduce { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message,String routeKey) { rabbitTemplate.convertAndSend("topicExchange",routeKey,message); } }
消息接收者
#监听A队列 @Component @RabbitListener(queues = "topic.QueueA") public class TopicMqConsumerA { private static final Logger logger = LoggerFactory.getLogger(TopicMqConsumerA.class); @RabbitHandler public void receive(String message) { logger.info("receive topic QueueA message content:{}",message); } } #监听B队列 @Component @RabbitListener(queues = "topic.QueueB") public class TopicMqConsumerB { private static final Logger logger = LoggerFactory.getLogger(TopicMqConsumerB.class); @RabbitHandler public void receive(String message) { logger.info("receive topic QueueB message content:{}",message); } }
测试代码
#测试只发送A队列 @RequestMapping("/sendTopicMessageA") public String sendTopicMessageA() { topicMqProduce.sendMessage("topic.QueueA","this is fanout message"); return "success"; } @RequestMapping("/sendTopicMessageB") public String sendTopicMessageB() { topicMqProduce.sendMessage("topic.QueueB","this is fanout message"); return "success"; }
总结
加载全部内容