RocketMQ事务消息图文示例讲解
一个双子座的Java攻城狮 人气:0RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息
MQ 的事务流程(本地代码正常执行)
MQ 的消息补偿过程(当本地代码执行失败时)
MQ 消息的三种状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态)
注意:事务消息仅与生产者有关,与消费者无关
生产者代码(提交状态、回滚状态):
public class Producer { public static void main(String[] args) throws Exception{ //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { // 此处写本地事务处理业务 // 如果成功,消息改为提交,如果失败改为 回滚,如果是多线程处理状态未知,就提交为未知等待事务补偿过程 //事务提交状态 return LocalTransactionState.COMMIT_MESSAGE;// 类似于msql 的 commit //return LocalTransactionState.ROLLBACK_MESSAGE;回滚状态 } //事务补偿过程 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { return null; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); producer.shutdown(); } }
生产者(中间状态):
public class Producer { public static void main(String[] args) throws Exception{ //事务消息使用的生产者是TransactionMQProducer TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); //添加本地事务对应的监听 producer.setTransactionListener(new TransactionListener() { //正常事务过程 @Override public LocalTransactionState executeLocalTransaction(Message message, Object o) { return LocalTransactionState.UNKNOW; } //事务补偿过程 @Override public LocalTransactionState checkLocalTransaction(MessageExt messageExt) { System.out.println("事务补偿过程执行"); return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); Message msg = new Message("topic8",("事务消息:hello rocketmq ").getBytes("UTF-8")); SendResult result = producer.sendMessageInTransaction(msg,null); System.out.println("返回结果:"+result); //事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿 //producer.shutdown(); } }
加载全部内容