RocketMQ生产者一个应用不能发送多个NameServer消息解决
梦想实现家_Z 人气:0前言
目前有两套RocketMQ集群,集群A包含topic
名称为cluster_A_topic
,集群B包含topic
名称为cluster_B_topic
,在应用服务OrderApp
上通过RocketMQ Client
创建两个DefaultMQProducer
实例发送消息给集群A和集群B
架构图如下:
根据上述架构图,我们给出的示例代码如下:
// 创建第一个DefaultMQProducer DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1"); // 设置nameServer地址 producer1.setNamesrvAddr("192.168.2.230:9876"); try { producer1.start(); // 发送消息 SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8))); switch (result1.getSendStatus()) { case SEND_OK: System.out.println("cluster_A_topic 发送成功!"); break; case FLUSH_DISK_TIMEOUT: System.out.println("cluster_A_topic 持久化失败!"); break; case FLUSH_SLAVE_TIMEOUT: System.out.println("cluster_A_topic 同步slave失败!"); break; case SLAVE_NOT_AVAILABLE: System.out.println("cluster_A_topic 副本不可用!"); } } catch (Exception e) { e.printStackTrace(); } // 创建第二个DefaultMQProducer DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2"); // 设置nameServer地址 producer2.setNamesrvAddr("192.168.2.231:9876"); try { producer2.start(); // 发送消息 SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8))); switch (result2.getSendStatus()) { case SEND_OK: System.out.println("cluster_B_topic 发送成功!"); break; case FLUSH_DISK_TIMEOUT: System.out.println("cluster_B_topic 持久化失败!"); break; case FLUSH_SLAVE_TIMEOUT: System.out.println("cluster_B_topic 同步slave失败!"); break; case SLAVE_NOT_AVAILABLE: System.out.println("cluster_B_topic 副本不可用!"); } return "ok"; } catch (Exception e) { e.printStackTrace(); } finally { producer1.shutdown(); producer2.shutdown(); }
结果竟然报错了,报错内容时cluster_B_topic
不存在:
经过不断的测试,发现只有放在最前面启动的DefaultMQProducer
会生效,后面启动的DefaultMQProducer
发送消息就报错说对应的topic
不存在,而且报错的broker
竟然是前面启动的DefaultMQProducer
对应的broker
。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?
问题定位
首先说明一下,当前使用的RocketMQ Client
版本是4.8.0
。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸
加载全部内容