spring-cloud-stream整合kafka spring-cloud-stream结合kafka使用详解
KyleYaoKeepGoing 人气:10想了解spring-cloud-stream结合kafka使用详解的相关内容吗,KyleYaoKeepGoing在本文为您仔细讲解spring-cloud-stream整合kafka的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:spring-cloud-stream整合kafka,spring-cloud-stream,下面大家一起来学习吧。
1.pom文件导入依赖
<!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency>
2.application.yml文件配置
spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址 bindings: xxx_output: // 通道名称 destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json) xxx_input: destination: xxx // 消息发往的目的地,对应topic group: xxx // 对应kafka的group
3.创建消息发送者
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道 @Service public class MqService { @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) private MessageChannel oesWorkbenchChannel; /** * 发送一条kafka消息 */ public boolean sendLifeData(Object object) { return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT); } } // 发布通道 public interface Source { @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT) MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel }
4.创建消息监听者
@Slf4j @EnableBinding(Sink.class) public class WorkbenchStreamListener { @Resource private FileService fileService; @StreamListener(KafkaConstants.xxx_input) // 监听接受通道 public void receiveData(MoveMessage moveMessage) { } } // 接受通道 public interface Sink { @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT) SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel }
接下来就可以愉快的发送监听消息了
加载全部内容