手把手带你了解消息中间件(3)——RocketMQ
一切亦如昨日 人气:2一、RocketMQ简介
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
二、RocketMQ架构
如图所示为RocketMQ基本的部署结构,主要分为NameServer集群、Broker集群、Producer集群和Consumer集群四个部分。
Broker在启动的时候会去向NameServer注册并且定时发送心跳,Producer在启动的时候会到NameServer上去拉取Topic所属的Broker具体地址,然后向具体的Broker发送消息
1、NameServer
NameServer的作用是Broker的注册中心。
每个NameServer节点互相之间是独立的,没有任何信息交互,也就不存在任何的选主或者主从切换之类的问题,因此NameServer是很轻量级的。单个NameServer节点中存储了活跃的Broker列表(包括master和slave),这里活跃的定义是与NameServer保持有心跳。
2、Topic、Tag、Queue、GroupName
Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以理解为是二级分类
1) Topic(话题)
Topic是生产者在发送消息和消费者在拉取消息的类别。Topic与生产者和消费者之间的关系非常松散。一个生产者可以发送不同类型Topic的消息。消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。
我们可以理解为Topic是第一级消息类型,比如一个电商系统的消息可以分为:交易消息、物流消息等,一条消息必须有一个Topic。
2) Tag(标签)
意思就是子主题,为用户提供了额外的灵活性。有了标签,方便RocketMQ提供的查询功能。
可以理解为第二级消息类型,交易创建消息,交易完成消息..... 一条消息可以没有Tag
3) Queue(队列)
一个topic下,可以设置多个queue(消息队列),默认4个队列。当我们发送消息时,需要要指定该消息的topic。
RocketMQ会轮询该topic下的所有队列,将消息发送出去。
在 RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。
也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。
4) groupName(组名称)
RocketMQ中也有组的概念。代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。
作用是在集群HA的情况下,一个生产者down之后,本地事务回滚后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标。
有了GroupName,在集群下,动态扩展容量很方便。只需要在新加的机器中,配置相同的GroupName。启动后,就立即能加入到所在的群组中,参与消息生产或消费。
3、Broker-存放消息
Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。底层的通信和连接都是基于Netty实现的。
负载均衡:Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,会自动轮询当前所有可发送的broker ,尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。
高可用:Broker中分master和slave两种角色,每个master可以对应多个slave,但一个slave只能对应一个master,master和slave通过指定相同的Brokername组成,其中不同的BrokerId==0 是master,非0是slave。
高可靠并发读写服务:master和slave之间的同步方式分为同步双写和异步复制,异步复制方式master和slave之间虽然会存在少量的延迟,但性能较同步双写方式要高出10%左右。
Topic、Broker、queue三者间的关系
4、Producer-生产消息
1) 与nameserver的关系
单个Producer和一台NameServer节点(随机选择)保持长连接,定时查询topic配置信息,如果该NameServer挂掉,生产者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连。与NameServer之间没有心跳。
2) 与broker的关系
单个Producer和与其关联的所有broker保持长连接,并维持心跳。默认情况下消息发送采用轮询方式,会均匀发到对应Topic的所有queue中。
5、Consumer-消费消息
1) 与nameserver的关系
单个Consumer和一台NameServer保持长连接,定时查询topic配置信息,如果该NameServer挂掉,消费者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连。与NameServer之间没有心跳。
2) 与broker的关系
单个Consumer和与其关联的所有broker保持长连接,并维持心跳,失去心跳后,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
5.1 消费者类型
- 1) pull consume
Consumer 的一种,应用通常通过 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法,类似于activemq的方式 - 2) push consume
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制
5.2 消费模式
- 1) 集群模式
在默认情况下,就是集群消费,此时消息发出去后将只有一个消费者能获取消息。
- 2) 广播模式
广播消费,一条消息被多个Consumer消费。消息会发给Consume Group中的所有消费者进行消费。
三、RocketMQ的特性
1、消息顺序
消息的顺序指的是消息消费时,能按照发送的顺序来消费。
RocketMQ是通过将“相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理“来实现顺序消息
2、消息重复
1) 消息重复的原因
消息领域有一个对消息投递的QoS(服务质量)定义,分为:最多一次(At most once)、至少一次(At least once)、仅一次( Exactly once)。
MQ产品都声称自己做到了At least once。既然是至少一次,就有可能发生消息重复。
有很多原因导致,比如:网络原因闪断,ACK返回失败等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者
不同的消息队列发送的确认信息形式不同:RocketMQ返回一个CONSUME_SUCCESS成功标志,RabbitMQ是发送一个ACK确认消息
2) 消息去重
- 1) 去重原则:使用业务端逻辑保持幂等性
幂等性:就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用,数据库的结果都是唯一的,不可变的。
- 2) 只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样,需要业务端来实现。
去重策略:保证每条消息都有唯一编号(比如唯一流水号),且保证消息处理成功与去重表的日志同时出现。
四、RocketMQ的应用场景
1、削峰填谷
比如如秒杀等大型活动时会带来较高的流量脉冲,如果没做相应的保护,将导致系统超负荷甚至崩溃。如果因限制太过导致请求大量失败而影响用户体验,可以利用MQ 超高性能的消息处理能力来解决。
2、异步解耦
通过上、下游业务系统的松耦合设计,比如:交易系统的下游子系统(如积分等)出现不可用甚至宕机,都不会影响到核心交易系统的正常运转。
3、顺序消息
FIFO原理类似,MQ提供的顺序消息即保证消息的先进先出,可以应用于交易系统中的订单创建、支付、退款等流程。
4、分布式事务消息
比如阿里的交易系统、支付红包等场景需要确保数据的最终一致性,需要引入 MQ 的分布式事务,既实现了系统之间的解耦,又可以保证最终的数据一致性。
五、RocketMQ集群部署方式
1、单Mater模式
优点:配置简单,方便部署
缺点:风险较大,一旦Broker重启或者宕机,会导致整个服务不可用
2、多Master模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优点:配置简单,单个Master宕机重启对应用没有影响。消息不会丢失
缺点:单台机器宕机期间,这台机器上没有被消费的消息在恢复之前不可订阅,消息实时性会受到影响。
3、多Master多Slave模式(异步)
每个Master配置一个Slave,采用异步复制方式,主备有短暂消息延迟
优点:因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master宕机后,会丢失少量信息
4、多Master多Slave模式(同步)
每个Master配置一个Slave,采用同步双写方式,只有主和备都写成功,才返回成功
优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
六、RocketMQ的消息类型
消息发送步骤:
消息消费步骤:
创建一个maven工程,导入依赖
<dependencies>
<!--rocket-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
<https://img.qb5200.com/download-x/dependency>
<dependency>
<!--顺序消息中,模拟了一个消息集合,加入了lombok-->
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<https://img.qb5200.com/download-x/dependency>
<https://img.qb5200.com/download-x/dependencies>
1、普通消息
点击查看生产者代码
/**
* 普通消息生产者
*/
public class Producer {
public static void main(String[] args) throws Exception {
// 创建一个消息发送入口对象,主要用于消息发送,指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
// 设置NameServe地址,如果是集群环境,用分号隔开
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动并创建消息发送组件
producer.start();
// topic的名字
String topic = "rocketDemo1";
// 标签名
String taget = "tag";
// 要发送的数据
String body = "hello,RocketMq";
Message message = new Message(topic,taget,body.getBytes());
// 发送消息
SendResult result = producer.send(message);
System.out.println(result);
// 关闭消息发送对象
producer.shutdown();
}
}
点击查看消费者代码
/**
* 普通消息消费者
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 创建一个消费管理对象,并创建消费者组名字
DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("ConsumerGroup");
// 设置NameServer地址,如果是集群环境,用逗号分隔
consumerGroup.setNamesrvAddr("127.0.0.1:9876");
// 设置要读取的消息主题和标签
consumerGroup.subscribe("rocketDemo1", "*");
// 设置回调函数,处理消息
//注意:MessageListenerConcurrently -- 并行消费监听
consumerGroup.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
//读取消息记录
for (MessageExt messageExt : msgs) {
//获取消息主题
String topic = messageExt.getTopic();
//获取消息标签
String tags = messageExt.getTags();
//获取消息体内容
String body = new String(messageExt.getBody(), "UTF-8");
System.out.println("topic:" + topic + ",tags:" + tags + ",body:" + body);
}
} catch (Exception e) {
e.printStackTrace();
}
//返回消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 运行消息消费对象
consumerGroup.start();
}
}
2、顺序消息
消息有序指的是可以按照消息的发送顺序来消费。RocketMQ是通过将“相同ID的消息发送到同一个队列,而一个队列的消息只由一个消费者处理“来实现顺序消息 。
如何保证顺序
- 1) 消息被发送时保持顺序:发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。
- 2) 消息被存储时保持和发送的顺序一致:存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。
- 3) 消息被消费时保持和存储的顺序一致:消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
点击查看模拟消息代码
/**
* 模拟消息
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Long orderId;
private String desc;
public static List<Order> buildOrders(){
List<Order> list = new ArrayList<Order>();
Order order1001a = new Order(1001L,"创建");
Order order1004a = new Order(1004L,"创建");
Order order1006a = new Order(1006L,"创建");
Order order1009a = new Order(1009L,"创建");
list.add(order1001a);
list.add(order1004a);
list.add(order1006a);
list.add(order1009a);
Order order1001b = new Order(1001L,"付款");
Order order1004b = new Order(1004L,"付款");
Order order1006b = new Order(1006L,"付款");
Order order1009b = new Order(1009L,"付款");
list.add(order1001b);
list.add(order1004b);
list.add(order1006b);
list.add(order1009b);
Order order1001c = new Order(1001L,"完成");
Order order1006c = new Order(1006L,"完成");
list.add(order1001c);
list.add(order1006c);
return list;
}
}
点击查看生产者代码
/**
* Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的队列,
* 在RocketMQ中,通过MessageQueueSelector来实现分区的选择
*/
public class ProducerOrder {
//nameserver地址
private static String namesrvaddress="127.0.0.1:9876;";
public static void main(String[] args) throws Exception {
//创建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("order_producer_name");
//设置namesrv地址
producer.setNamesrvAddr(namesrvaddress);
//启动Producer
producer.start();
List<Order> orderList = Order.buildOrders();
for (Order order : orderList) {
String body = order.toString();
//创建消息
Message message = new Message("orderTopic","order",body.getBytes());
//发送消息
SendResult sendResult = producer.send(
message,
new MessageQueueSelector() {
/**
*
* @param mqs topic中的队列集合
* @param msg 消息对象
* @param arg 业务参数
* @return
*/
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//参数是订单id号
Long orderId = (Long) arg;
//确定选择的队列的索引
long index = orderId % mqs.size();
return mqs.get((int) index);
}
},
order.getOrderId());
System.out.println("发送结果="+sendResult);
}
//关闭Producer
producer.shutdown();
}
}
点击查看消费者代码
/**
* 消费者端实现MessageListenerOrderly介口监听消息来实现顺序消息
*/
public class ConsumerOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
//从第一个开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("orderTopic","*");
//MessageListenerOrderly 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("当前线程:"+Thread.currentThread().getName()+",接收消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer started.%n");
}
}
3、延迟消息
RocketMQ 支持定时(延迟)消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推。
延迟消息可以在生产者中直接设置,也可以在rocketmq的配置文件broker.conf中配置:messageDelayLevel=1s|5s|1m|2m|1h|2h......
点击查看生产者代码
/**
* 延迟消息 生产者
*/
public class ProducerDelay {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
//设置nameserver
producer.setNamesrvAddr("127.0.0.1:9876");
//生产者开启
producer.start();
//创建消息对象
Message message = new Message("delayTopic","delay","hello world".getBytes());
//设置延迟时间级别
message.setDelayTimeLevel(2);
//发送消息
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
//生产者关闭
producer.shutdown();
}
}
点击查看消费者代码
/**
* 延迟消息 消费者
*/
public class ConsumerDelay {
public static void main(String[] args) throws Exception {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
//设置nameserver
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置主题和tag
consumer.subscribe("delayTopic","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息ID:"+msg.getMsgId()+"发送时间:"+new Date(msg.getStoreTimestamp())+",延迟时间:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//开启消费者
consumer.start();
System.out.println("消费者启动");
}
}
4、批量发送消息
点击查看生产者代码
/**
* 批量 生产者
*/
public class ProducerBatch {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
//设置nameserver
producer.setNamesrvAddr("127.0.0.1:9876");
//生产者开启
producer.start();
//创建消息对象 集合
String topic = "batchTopic";
String tag = "batch";
List<Message> messageList = new ArrayList<Message>();
Message message1 = new Message(topic,tag,"hello world1".getBytes());
Message message2 = new Message(topic,tag,"hello world2".getBytes());
Message message3 = new Message(topic,tag,"hello world3".getBytes());
messageList.add(message1);
messageList.add(message2);
messageList.add(message3);
//发送消息
SendResult sendResult = producer.send(messageList);
System.out.println(sendResult);
//生产者关闭
producer.shutdown();
}
}
点击查看消费者代码
/**
* 批量消费者
*/
public class ConsumerBatch {
public static void main(String[] args) throws Exception {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
//设置nameserver
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置主题和tag
consumer.subscribe("batchTopic","*");
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息ID:"+msg.getMsgId());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//开启消费者
consumer.start();
System.out.println("消费者启动");
}
}
5、广播消息
rocketmq默认采用的是集群消费,我们想要使用广播消费,只需在消费者中加入consumer.setMessageModel(MessageModel.BROADCASTING)
这段配置,MessageModel.CLUSTERING
为集群模式,是默认的;
点击查看生产者代码
/**
* 生产者
*/
public class ProducerBroadcast {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
//设置nameserver
producer.setNamesrvAddr("127.0.0.1:9876");
//生产者开启
producer.start();
//创建消息对象 集合
String topic = "broadcastTopic";
String tag = "broad";
List<Message> messageList = new ArrayList<Message>();
Message message1 = new Message(topic,tag,"hello world1".getBytes());
Message message2 = new Message(topic,tag,"hello world2".getBytes());
Message message3 = new Message(topic,tag,"hello world3".getBytes());
messageList.add(message1);
messageList.add(message2);
messageList.add(message3);
//发送消息
SendResult sendResult = producer.send(messageList);
System.out.println(sendResult);
//生产者关闭
producer.shutdown();
}
}
点击查看消费者1代码
/**
* 消费者1
*/
public class ConsumerBroadcast1 {
public static void main(String[] args) throws Exception {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
//设置nameserver
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置主题和tag
consumer.subscribe("broadcastTopic","*");
//设置消息模式 为 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费者1:消息ID:"+msg.getMsgId()+",内容"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//开启消费者
consumer.start();
System.out.println("消费者1启动");
}
}
点击查看消费者2代码
/**
* 消费者2
*/
public class ConsumerBroadcast2 {
public static void main(String[] args) throws Exception {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
//设置nameserver
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置主题和tag
consumer.subscribe("broadcastTopic","*");
//设置消息模式 为 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//注册消息监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费者2:消息ID:"+msg.getMsgId()+",内容"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//开启消费者
consumer.start();
System.out.println("消费者2启动");
}
}
七、SpringBoot整合RocketMQ
创建一个maven工程,导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.1.RELEASE</version>
<https://img.qb5200.com/download-x/dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
<https://img.qb5200.com/download-x/dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.2.1.RELEASE</version>
<scope>test</scope>
<https://img.qb5200.com/download-x/dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
<https://img.qb5200.com/download-x/dependency>
<https://img.qb5200.com/download-x/dependencies>
点击查看模拟消息代码
/**
* 模拟消息
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Long orderId;
private String desc;
public static List<Order> buildOrders(){
List<Order> list = new ArrayList<Order>();
Order order1001a = new Order(1001L,"1001创建");
Order order1004a = new Order(1004L,"1004创建");
Order order1006a = new Order(1006L,"1006创建");
Order order1009a = new Order(1009L,"1009创建");
list.add(order1001a);
list.add(order1004a);
list.add(order1006a);
list.add(order1009a);
Order order1001b = new Order(1001L,"1001付款");
Order order1004b = new Order(1004L,"1004付款");
Order order1006b = new Order(1006L,"1006付款");
Order order1009b = new Order(1009L,"1009付款");
list.add(order1001b);
list.add(order1004b);
list.add(order1006b);
list.add(order1009b);
Order order1001c = new Order(1001L,"1001完成");
Order order1006c = new Order(1006L,"1006完成");
list.add(order1001c);
list.add(order1006c);
return list;
}
}
点击查看消息生产者代码
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 普通消息生产者
*/
@Test
public void testSend(){
rocketMQTemplate.convertAndSend("testTopic","这是测试消息!");
}
/**
* 延迟消息生产者
*/
@Test
public void testDelaySend(){
SendResult sendResult = rocketMQTemplate.syncSend("testTopic",
new GenericMessage("这是延迟测试消息!"+new Date()),
10000,
4);
log.info("sendResult=="+sendResult);
}
/**
* 顺序消息 生产者
*/
@Test
public void testOrderlySend(){
List<Order> orderList = Order.buildOrders();
for (Order order : orderList) {
//发送消息
rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//参数是订单id号
Long orderId = Long.valueOf((String)arg);
//确定选择的队列的索引
long index = orderId % mqs.size();
log.info("mqs is ::" + mqs.get((int) index));
return mqs.get((int) index);
}
});
SendResult sendOrderly = rocketMQTemplate.syncSendOrderly("testTopicOrderLy",
new GenericMessage<String>(order.toString()), order.getOrderId().toString());
log.info("发送结果="+sendOrderly+",orderid :"+order.getOrderId());
}
}
}
点击查看普通|延迟消费者代码
/**
* 普通、延迟消息 消费者代码
*/
@Component
@RocketMQMessageListener(consumerGroup = "myConsumer", topic = "testTopic")
public class RocketConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
System.out.println("接收到消息:="+message);
}
}
点击查看顺序消费者代码
/**
* 顺序消息 ,消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "myConsumerOrderly", topic = "testTopicOrderLy",consumeMode = ConsumeMode.ORDERLY)
public class RocketConsumerOrderly implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("当前线程:"+Thread.currentThread().getName()+",接收到消息:="+message);
}
}
八、RocketMQ的安装配置
1、配置系统环境变量;计算机/属性/高级系统设置/环境变量/系统变量,新建系统变量ROCKETMQ_HOME=RocketMQ安装路径
2、进入RocketMQ安装目录的bin目录下,右键用记事本打开修改runserver.cmd文件
3、修改runbroker.cmd文件
4、cmd进入到MQ/bin目录下启动
1.启动mqnamesrv.cmd
start mqnamesrv.cmd
成功的弹窗,此框勿关闭。
2.启动mqbroker.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
成功的弹窗,此框勿关闭。
注意:假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。保存并重新执行start语句。
5、下载RocketMQ的可视化插件
1) 下载地址: https://github.com/apache/rocketmq-externals/releases
2) 修改rocketmq-console\src\main\resources\application.properties,修改如下:
3) cmd窗口执行:mvn clean package -Dmaven.test.skip=true
4) jar包运行:java -jar rocketmq-console-ng-1.0.0.jar
5) 测试输入地址: http://127.0.0.1:8080/#/ops
加载全部内容