spring 整合kafka spring 整合kafka监听消费的配置过程
蜗牛学编程 人气:0想了解spring 整合kafka监听消费的配置过程的相关内容吗,蜗牛学编程在本文为您仔细讲解spring 整合kafka的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:spring,整合kafka,spring,整合kafka监听消费,下面大家一起来学习吧。
前言
最近项目里有个需求,要消费kafka里的数据。之前也手动写过代码去消费kafka数据。但是转念一想。既然spring提供了消费kafka的方法。就没必要再去重复造轮子。于是尝试使用spring的API。
项目技术背景,使用springMVC,XML配置和注解相互使用。kafka的配置都是使用XML方式。
整合过程
1. 引入spring-kafka的依赖包
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>
2. 在spring的xml文件里增加配置项,也可以单独创建一个spring-context-XX.xml文件。
<!-- consumer configuration 该配置项可以根据自己业务的实际需求做增加或删除--> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" /> <entry key="group.id" value="group" /> <entry key="enable.auto.commit" value="true" /> <entry key="auto.commit.interval.ms" value="3000" /> <entry key="session.timeout.ms" value="10000" /> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" /> </map> </constructor-arg> </bean> <!-- create factory 该类是spring jar包里提供,就这么配置--> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties" /> </constructor-arg> </bean> <!-- 自定义的消费类,需要实现spring的接口 --> <bean id="payPalConsumer" class="com.chao.service.consumer.PayPalConsumer" /> <!-- 该类也是jar包里提供的,注入的监听类是自己定义的,topic名称是配置文件引入的--> <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties"> <constructor-arg name="topics" value="${kafka.paypal.topic.name}"/> <property name="messageListener" ref="payPalConsumer" /> </bean> <!-- 改类也是jar里提供的,把这个containerProperties和consumerfactory 注入 --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory" /> <constructor-arg ref="containerProperties" /> </bean>
2. 自定义消费者类,消费者类依然可以使用注解。
/** * get msg from kafka */ @Component public class PayPalConsumer implements MessageListener<String, String> { private static Logger logger = LoggerFactory.getLogger(PayPalConsumer.class); @Autowired private XXService XXService; @Override public void onMessage(ConsumerRecord<String, String> authorizeRecord) { String value = authorizeRecord.value(); if (StringUtils.isEmpty(value)){ logger.warn("receive message from kafka is null"); return; } logger.info("receive message from kafka is {}",value); } }
使用这个步骤配置,一次性过。非常顺利。
加载全部内容