RocketMQ生产消息与消费消息超详细讲解
一个双子座的Java攻城狮 人气:01 RocketMQ简介
RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)
2 MQ的常见产品
ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强,扩展性强
kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
3 环境搭建
创建maven工程
引入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
4 单生产者单消费者模式
生产者:
//生产者,产生消息 public class Producer { public static void main(String[] args) throws Exception{ //1.创建一个发送消息的对象Producer DefaultMQProducer producer=new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("192.168.23.127:9876"); //3启动发送的服务 producer.start(); //4.1创建要发送的消息对象,指定topic,指定内容body Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8")); //4.2发送消息 SendResult result = producer.send(msg); System.out.println("返回结果:"+result); //5.关闭连接 producer.shutdown(); } }
消费者:
//消费者,消费消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* consumer.subscribe("topic1","*"); //4.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者 } }); //5.启动接收消息的服务 consumer.start();// 开启多线程 监控消息,持续运行 System.out.println("接收消息服务已运行"); } }
测试:
5 单生产者多消费者模式
5.1默认模式(负载均衡)
生产者:
//生产者,产生消息 public class Producer { public static void main(String[] args) throws Exception{ //1.创建一个发送消息的对象Producer DefaultMQProducer producer=new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("192.168.23.127:9876"); //3启动发送的服务 producer.start(); for (int i = 1; i <= 10; i++) { Message msg = new Message("topic1",("生产者2: hello rocketmq "+i).getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("返回结果:"+result); } //5.关闭连接 producer.shutdown(); } }
消费者:
//消费者,消费消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* consumer.subscribe("topic1","*"); //4.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者 } }); //5.启动接收消息的服务 consumer.start();// 开启多线程 监控消息,持续运行 System.out.println("接收消息服务已运行"); } }
测试:
5.2广播模式
生产者的代码不变,消费者的代码改动如下:
//设置当前消费者的消费模式(默认模式:负载均衡) consumer.setMessageModel(MessageModel.CLUSTERING); //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的 consumer.setMessageModel(MessageModel.BROADCASTING);
具体消费者代码:
//消费者,消费消息 public class Consumer { public static void main(String[] args) throws Exception{ //1.创建一个接收消息的对象Consumer DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); //2.设定接收的命名服务器地址 consumer.setNamesrvAddr("192.168.23.127:9876"); //3.设置接收消息对应的topic,对应的sub标签为任意* consumer.subscribe("topic1","*"); //设置当前消费者的消费模式(默认模式:负载均衡) //consumer.setMessageModel(MessageModel.CLUSTERING); //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的 consumer.setMessageModel(MessageModel.BROADCASTING); //4.开启监听,用于接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者 } }); //5.启动接收消息的服务 consumer.start();// 开启多线程 监控消息,持续运行 System.out.println("接收消息服务已运行"); } }
测试:
广播模式的现象
如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次
如果多个消费者先启动(广播模式),后发消息,才有广播的效果
结论: 必须先启动消费者再启动发送者才有广播的效果
6 多生产者多消费者模式
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
运行多个生产者,在启动消费者
测试:
加载全部内容