rabbitmq实现消息的收发 Java编程rabbitMQ实现消息的收发
sdyy321 人气:0 AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
本文基于spring-rabbit中间件来实现消息的发送和接收功能
see http://www.rabbitmq.com/tutorials/tutorial-one-java.html
see http://www.springsource.org/spring-amqp
Java编程通过操作rabbitMQ消息的收发实现代码如下:
<!-- RabbitMQ client, Spring AMQP / Spring Rabbit, and Hessian (used by the codec shown below) -->
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.1.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.caucho</groupId>
        <artifactId>hessian</artifactId>
        <version>4.0.7</version>
    </dependency>
</dependencies>
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
/**
 * Envelope carried between the application and RabbitMQ.
 *
 * <p>It records the queue and exchange the event was addressed to, together
 * with the already-serialized event payload.</p>
 *
 * <p>NOTE(review): the class has no explicit {@code serialVersionUID}; adding
 * one would change the serialized form, so it is deliberately left alone.</p>
 */
public class EventMessage implements Serializable {

    private String queueName;

    private String exchangeName;

    /** Serialized event content; may be {@code null} when no content was sent. */
    private byte[] eventData;

    /**
     * Creates a fully populated message.
     *
     * @param queueName    name of the target queue
     * @param exchangeName name of the target exchange
     * @param eventData    serialized event content, stored by reference (not copied)
     */
    public EventMessage(String queueName, String exchangeName, byte[] eventData) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
        this.eventData = eventData;
    }

    /** Creates an empty message; all fields are {@code null}. */
    public EventMessage() {
    }

    /** @return name of the target queue */
    public String getQueueName() {
        return queueName;
    }

    /** @return name of the target exchange */
    public String getExchangeName() {
        return exchangeName;
    }

    /** @return the serialized event content (the internal array, not a copy) */
    public byte[] getEventData() {
        return eventData;
    }

    @Override
    public String toString() {
        // The label now matches the actual class name (it used to say "EopEventMessage").
        return "EventMessage [queueName=" + queueName
                + ", exchangeName=" + exchangeName
                + ", eventData=" + Arrays.toString(eventData) + "]";
    }
}
为了可以发送和接收这个消息持有对象,我们还需要一个用来序列化和反序列化的工厂
/**
 * Strategy for turning event objects into bytes and back.
 */
public interface CodecFactory {

    /**
     * Encodes an object into its byte representation.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws IOException if encoding fails
     */
    byte[] serialize(Object obj) throws IOException;

    /**
     * Decodes bytes previously produced by {@link #serialize(Object)}.
     *
     * @param in the encoded bytes
     * @return the decoded object
     * @throws IOException if decoding fails
     */
    Object deSerialize(byte[] in) throws IOException;
}
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
/**
 * {@link CodecFactory} implementation that encodes objects with Hessian
 * ({@code HessianOutput} / {@code HessianInput}).
 */
public class HessionCodecFactory implements CodecFactory {

    // Static like the loggers of the other classes in this file (was a per-instance field).
    private static final Logger logger = Logger.getLogger(HessionCodecFactory.class);

    /** Initial capacity of the buffer the encoded bytes are written to. */
    private static final int INITIAL_BUFFER_SIZE = 1024;

    /**
     * Writes {@code obj} as a Hessian call envelope and returns the resulting bytes.
     *
     * @param obj the object to encode
     * @return the encoded bytes
     * @throws IOException if the Hessian writer fails
     */
    @Override
    public byte[] serialize(Object obj) throws IOException {
        // try-with-resources replaces the manual finally block, which guarded the
        // close of the byte stream on the wrong variable (the Hessian writer).
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE)) {
            HessianOutput output = new HessianOutput(baos);
            output.startCall();
            output.writeObject(obj);
            output.completeCall();
            return baos.toByteArray();
        }
    }

    /**
     * Reads one object from bytes produced by {@link #serialize(Object)}.
     *
     * <p>I/O failures are propagated. Any other failure (including a {@code null}
     * input array) is logged and whatever was decoded so far is returned, which is
     * {@code null} unless the failure happened after the object was read.</p>
     *
     * @param in the encoded bytes
     * @return the decoded object, or {@code null} if decoding failed with a non-I/O error
     * @throws IOException if the Hessian reader reports an I/O failure
     */
    @Override
    public Object deSerialize(byte[] in) throws IOException {
        Object obj = null;
        try (ByteArrayInputStream bais = new ByteArrayInputStream(in)) {
            HessianInput input = new HessianInput(bais);
            input.startReply();
            obj = input.readObject();
            input.completeReply();
        } catch (final IOException ex) {
            // Must stay ahead of the Throwable handler so I/O errors are not swallowed.
            throw ex;
        } catch (final Throwable e) {
            // NOTE(review): the broad catch is presumably required by the Hessian reply
            // API's declared exceptions — confirm before narrowing it.
            logger.error("Failed to decode object.", e);
        }
        return obj;
    }
}
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
/**
 * Client-side entry point for publishing events.
 */
public interface EventTemplate {

    /**
     * Publishes an event.
     *
     * @param queueName    name of the target queue
     * @param exchangeName name of the target exchange
     * @param eventContent the event payload
     * @throws SendRefuseException if the event cannot be sent
     */
    void send(String queueName, String exchangeName, Object eventContent)
            throws SendRefuseException;

    /**
     * Publishes an event, encoding the payload with the supplied codec.
     *
     * @param queueName    name of the target queue
     * @param exchangeName name of the target exchange
     * @param eventContent the event payload
     * @param codecFactory codec used to serialize {@code eventContent}
     * @throws SendRefuseException if the event cannot be sent
     */
    void send(String queueName, String exchangeName, Object eventContent, CodecFactory codecFactory)
            throws SendRefuseException;
}
SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage
public class DefaultEventTemplate implements EventTemplate { private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class); private AmqpTemplate eventAmqpTemplate; private CodecFactory defaultCodecFactory; // private DefaultEventController eec; // public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate, // CodecFactory defaultCodecFactory, DefaultEventController eec) { // this.eventAmqpTemplate = eopAmqpTemplate; // this.defaultCodecFactory = defaultCodecFactory; // this.eec = eec; // } public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) { this.eventAmqpTemplate = eopAmqpTemplate; this.defaultCodecFactory = defaultCodecFactory; } @Override public void send(String queueName, String exchangeName, Object eventContent) throws SendRefuseException { this.send(queueName, exchangeName, eventContent, defaultCodecFactory); } @Override public void send(String queueName, String exchangeName, Object eventContent, CodecFactory codecFactory) throws SendRefuseException { if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) { throw new SendRefuseException("queueName exchangeName can not be empty."); } // if (!eec.beBinded(exchangeName, queueName)) // eec.declareBinding(exchangeName, queueName); byte[] eventContentBytes = null; if (codecFactory == null) { if (eventContent == null) { logger.warn("Find eventContent is null,are you sure..."); } else { throw new SendRefuseException( "codecFactory must not be null ,unless eventContent is null"); } } else { try { eventContentBytes = codecFactory.serialize(eventContent); } catch (IOException e) { throw new SendRefuseException(e); } } // 构造成Message EventMessage msg = new EventMessage(queueName, exchangeName, eventContentBytes); try { eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg); } catch (AmqpException e) { logger.error("send event fail. Event Message : [" + eventContent + "]", e); throw new SendRefuseException("send event fail", e); } } }
注释的地方稍后会用到,主要是防止数据发送的目标(exchange和queue)没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个接口
/**
 * Contract implemented by every event consumer.
 */
public interface EventProcesser {

    /**
     * Consumes one decoded event.
     *
     * @param e the decoded event content
     */
    void process(Object e);
}
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
/** * MessageListenerAdapter的Pojo * <p>消息处理适配器,主要功能:</p> * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p> * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p> * */ public class MessageAdapterHandler { private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class); private ConcurrentMap<String, EventProcessorWrap> epwMap; public MessageAdapterHandler() { this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>(); } public void handleMessage(EventMessage eem) { logger.debug("Receive an EventMessage: [" + eem + "]"); // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值 if (eem == null) { logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled."); return; } if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) { logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled."); return; } // 解码,并交给对应的EventHandle执行 EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName()); if (eepw == null) { logger.warn("Receive an EopEventMessage, but no processor can do it."); return; } try { eepw.process(eem.getEventData()); } catch (IOException e) { logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e); return; } } protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) { if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) { throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. 
"); } EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor); EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw); if (oldProcessorWrap != null) { logger.warn("The processor of this queue and exchange exists, and the new one can't be add"); } } protected Set<String> getAllBinding() { Set<String> keySet = epwMap.keySet(); return keySet; } protected static class EventProcessorWrap { private CodecFactory codecFactory; private EventProcesser eep; protected EventProcessorWrap(CodecFactory codecFactory, EventProcesser eep) { this.codecFactory = codecFactory; this.eep = eep; } public void process(byte[] eventData) throws IOException{ Object obj = codecFactory.deSerialize(eventData); eep.process(obj); } } }
这是正常情况下的消息处理方式。如果rabbitmq消息接收发生异常,也需要能监控到,因此新增一个专门用来处理异常的类
/**
 * {@code ErrorHandler} that logs the failure it is given.
 */
public class MessageErrorHandler implements ErrorHandler {

    private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);

    /**
     * Logs the failure at error level, including its stack trace.
     *
     * @param t the failure passed to this handler
     */
    @Override
    public void handleError(Throwable t) {
        final String summary = "RabbitMQ happen a error:" + t.getMessage();
        logger.error(summary, t);
    }
}
接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
/**
 * Immutable settings for talking to RabbitMQ: server address, credentials,
 * virtual host, connection timeout, consumer thread count and prefetch size.
 */
public class EventControlConfig {

    private static final int DEFAULT_PORT = 5672;

    private static final String DEFAULT_USERNAME = "guest";

    private static final String DEFAULT_PASSWORD = "guest";

    /** Default number of processing threads: twice the available processors. */
    private static final int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;

    /** Default prefetch size. */
    private static final int PREFETCH_SIZE = 1;

    private final String serverHost;

    private final int port;

    private final String username;

    private final String password;

    private final String virtualHost;

    /** Timeout for establishing the connection to RabbitMQ; 0 when not set. */
    private final int connectionTimeout;

    /** Number of threads processing event messages; defaults to CPU cores * 2. */
    private final int eventMsgProcessNum;

    /** Prefetch value used when consuming messages. */
    private final int prefetchSize;

    /**
     * Creates a configuration for {@code serverHost} using the default port,
     * the default credentials, no virtual host, no connection timeout, the default
     * thread count and the default prefetch size.
     *
     * @param serverHost host name or address of the RabbitMQ server
     */
    public EventControlConfig(String serverHost) {
        // The prefetch argument used to be DEFAULT_PROCESS_THREAD_NUM (copy/paste slip);
        // the dedicated PREFETCH_SIZE default is what is meant here.
        this(serverHost, DEFAULT_PORT, DEFAULT_USERNAME, DEFAULT_PASSWORD, null, 0,
                DEFAULT_PROCESS_THREAD_NUM, PREFETCH_SIZE, new HessionCodecFactory());
    }

    /**
     * Creates a fully specified configuration. Non-positive numeric arguments fall
     * back to their defaults (timeout falls back to 0).
     *
     * @param serverHost          host name or address of the RabbitMQ server
     * @param port                server port; non-positive means the default port
     * @param username            user name
     * @param password            password
     * @param virtualHost         virtual host, may be {@code null}
     * @param connectionTimeout   connection timeout; non-positive means 0
     * @param eventMsgProcessNum  processing thread count; non-positive means the default
     * @param prefetchSize        prefetch size; non-positive means the default
     * @param defaultCodecFactory currently not stored or used by this class
     */
    public EventControlConfig(String serverHost, int port, String username,
            String password, String virtualHost, int connectionTimeout,
            int eventMsgProcessNum, int prefetchSize, CodecFactory defaultCodecFactory) {
        this.serverHost = serverHost;
        this.port = port > 0 ? port : DEFAULT_PORT;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
        this.connectionTimeout = connectionTimeout > 0 ? connectionTimeout : 0;
        this.eventMsgProcessNum = eventMsgProcessNum > 0 ? eventMsgProcessNum : DEFAULT_PROCESS_THREAD_NUM;
        this.prefetchSize = prefetchSize > 0 ? prefetchSize : PREFETCH_SIZE;
    }

    public String getServerHost() {
        return serverHost;
    }

    public int getPort() {
        return port;
    }

    public String getUsername() {
        return username;
    }

    public String getPassword() {
        return password;
    }

    public String getVirtualHost() {
        return virtualHost;
    }

    public int getConnectionTimeout() {
        return connectionTimeout;
    }

    public int getEventMsgProcessNum() {
        return eventMsgProcessNum;
    }

    public int getPrefetchSize() {
        return prefetchSize;
    }
}
具体的发送、接收程序已经写好了,接下来也是最重要的一步,就是管理和控制与rabbitmq的通信
/**
 * Controls the communication with RabbitMQ: start-up, access to the sending
 * template, and registration of consumers.
 */
public interface EventController {

    /**
     * Starts the controller.
     */
    void start();

    /**
     * Returns the template used to send events.
     */
    EventTemplate getEopEventTemplate();

    /**
     * Binds a consumer to the given exchange and queue.
     *
     * @return an {@code EventController}, allowing calls to be chained
     */
    EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);

    /**
     * Binds one consumer to several queue/exchange pairs.
     *
     * @param bindings       map whose key is the queue name and whose value is the exchange name
     * @param eventProcesser the consumer to bind
     * @return an {@code EventController}, allowing calls to be chained
     */
    EventController add(Map<String, String> bindings, EventProcesser eventProcesser);
}
它的实现类如下:
/** * 和rabbitmq通信的控制器,主要负责: * <p>1、和rabbitmq建立连接</p> * <p>2、声明exChange和queue以及它们的绑定关系</p> * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p> * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p> * @author yangyong * */ public class DefaultEventController implements EventController { private CachingConnectionFactory rabbitConnectionFactory; private EventControlConfig config; private RabbitAdmin rabbitAdmin; private CodecFactory defaultCodecFactory = new HessionCodecFactory(); private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler(); private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定 //queue cache, key is exchangeName private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>(); //queue cache, key is queueName private Map<String, Queue> queues = new HashMap<String, Queue>(); //bind relation of queue to exchange cache, value is exchangeName | queueName private Set<String> binded = new HashSet<String>(); private EventTemplate eventTemplate; // 给App使用的Event发送客户端 private AtomicBoolean isStarted = new AtomicBoolean(false); private static DefaultEventController defaultEventController; public synchronized static DefaultEventController getInstance(EventControlConfig config){ if(defaultEventController==null){ defaultEventController = new DefaultEventController(config); } return defaultEventController; } private DefaultEventController(EventControlConfig config){ if (config == null) { throw new IllegalArgumentException("Config can not be null."); } this.config = config; initRabbitConnectionFactory(); // 初始化AmqpAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory); // 初始化RabbitTemplate RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(serializerMessageConverter); eventTemplate = new 
DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this); } /** * 初始化rabbitmq连接 */ private void initRabbitConnectionFactory() { rabbitConnectionFactory = new CachingConnectionFactory(); rabbitConnectionFactory.setHost(config.getServerHost()); rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum()); rabbitConnectionFactory.setPort(config.getPort()); rabbitConnectionFactory.setUsername(config.getUsername()); rabbitConnectionFactory.setPassword(config.getPassword()); if (!StringUtils.isEmpty(config.getVirtualHost())) { rabbitConnectionFactory.setVirtualHost(config.getVirtualHost()); } } /** * 注销程序 */ public synchronized void destroy() throws Exception { if (!isStarted.get()) { return; } msgListenerContainer.stop(); eventTemplate = null; rabbitAdmin = null; rabbitConnectionFactory.destroy(); } @Override public void start() { if (isStarted.get()) { return; } Set<String> mapping = msgAdapterHandler.getAllBinding(); for (String relation : mapping) { String[] relaArr = relation.split("\\|"); declareBinding(relaArr[1], relaArr[0]); } initMsgListenerAdapter(); isStarted.set(true); } /** * 初始化消息监听器容器 */ private void initMsgListenerAdapter(){ MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter); msgListenerContainer = new SimpleMessageListenerContainer(); msgListenerContainer.setConnectionFactory(rabbitConnectionFactory); msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); msgListenerContainer.setMessageListener(listener); msgListenerContainer.setErrorHandler(new MessageErrorHandler()); msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值 msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum()); msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数 msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()])); msgListenerContainer.start(); } @Override public EventTemplate getEopEventTemplate() { return 
eventTemplate; } @Override public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) { return add(queueName, exchangeName, eventProcesser, defaultCodecFactory); } public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) { msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory); if(isStarted.get()){ initMsgListenerAdapter(); } return this; } @Override public EventController add(Map<String, String> bindings, EventProcesser eventProcesser) { return add(bindings, eventProcesser,defaultCodecFactory); } public EventController add(Map<String, String> bindings, EventProcesser eventProcesser, CodecFactory codecFactory) { for(Map.Entry<String, String> item: bindings.entrySet()) msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory); return this; } /** * exchange和queue是否已经绑定 */ protected boolean beBinded(String exchangeName, String queueName) { return binded.contains(exchangeName+"|"+queueName); } /** * 声明exchange和queue已经它们的绑定关系 */ protected synchronized void declareBinding(String exchangeName, String queueName) { String bindRelation = exchangeName+"|"+queueName; if (binded.contains(bindRelation)) return; boolean needBinding = false; DirectExchange directExchange = exchanges.get(exchangeName); if(directExchange == null) { directExchange = new DirectExchange(exchangeName, true, false, null); exchanges.put(exchangeName, directExchange); rabbitAdmin.declareExchange(directExchange);//声明exchange needBinding = true; } Queue queue = queues.get(queueName); if(queue == null) { queue = new Queue(queueName, true, false, false); queues.put(queueName, queue); rabbitAdmin.declareQueue(queue); //声明queue needBinding = true; } if(needBinding) { Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange rabbitAdmin.declareBinding(binding);//声明绑定关系 binded.add(bindRelation); } } }
搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO
/**
 * Simple serializable test entity. Spouse and friends can reference each other,
 * so instances may form a cyclic object graph.
 */
@SuppressWarnings("serial")
public class People implements Serializable {

    private int id;

    private String name;

    private boolean male;

    private People spouse;

    private List<People> friends;

    public int getId() {
        return this.id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isMale() {
        return this.male;
    }

    public void setMale(boolean male) {
        this.male = male;
    }

    public People getSpouse() {
        return this.spouse;
    }

    public void setSpouse(People spouse) {
        this.spouse = spouse;
    }

    public List<People> getFriends() {
        return this.friends;
    }

    public void setFriends(List<People> friends) {
        this.friends = friends;
    }

    /**
     * Renders only id, name and gender; spouse and friends are left out.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("People[id=");
        text.append(id);
        text.append(",name=").append(name);
        text.append(",male=").append(male);
        text.append("]");
        return text.toString();
    }
}
建立单元测试
public class RabbitMqTest{ private String defaultHost = "127.0.0.1"; private String defaultExchange = "EXCHANGE_DIRECT_TEST"; private String defaultQueue = "QUEUE_TEST"; private DefaultEventController controller; private EventTemplate eventTemplate; @Before public void init() throws IOException{ EventControlConfig config = new EventControlConfig(defaultHost); controller = DefaultEventController.getInstance(config); eventTemplate = controller.getEopEventTemplate(); controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor()); controller.start(); } @Test public void sendString() throws SendRefuseException{ eventTemplate.send(defaultQueue, defaultExchange, "hello world"); } @Test public void sendObject() throws SendRefuseException{ eventTemplate.send(defaultQueue, defaultExchange, mockObj()); } @Test public void sendTemp() throws SendRefuseException, InterruptedException{ String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue eventTemplate.send(tempQueue, tempExchange, mockObj()); //发送成功后此时不会接受到消息,还需要绑定对应的消费程序 controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor()); } @After public void end() throws InterruptedException{ Thread.sleep(2000); } private People mockObj(){ People jack = new People(); jack.setId(1); jack.setName("JACK"); jack.setMale(true); List<People> friends = new ArrayList<>(); friends.add(jack); People hanMeiMei = new People(); hanMeiMei.setId(1); hanMeiMei.setName("韩梅梅"); hanMeiMei.setMale(false); hanMeiMei.setFriends(friends); People liLei = new People(); liLei.setId(2); liLei.setName("李雷"); liLei.setMale(true); liLei.setFriends(friends); liLei.setSpouse(hanMeiMei); hanMeiMei.setSpouse(liLei); return hanMeiMei; } class ApiProcessEventProcessor implements EventProcesser{ @Override public void process(Object e) {//消费程序这里只是打印信息 Assert.assertNotNull(e); System.out.println(e); if(e instanceof People){ People people = (People)e; 
System.out.println(people.getSpouse()); System.out.println(people.getFriends()); } } } }
源码地址请点击这里
总结
以上就是本文关于java实现rabbitmq消息的发送接受的全部内容,希望对大家有所帮助。
感谢大家对本站的支持。
加载全部内容