亲宝软件园·资讯

展开

Docker实战之Kafka集群

当我遇上你csy 人气:0
# 1. 概述 Apache Kafka 是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。其具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。 笔者之前在物联网公司工作,其中 Kafka 作为物联网 MQ 选型的事实标准,这里优先给大家搭建 Kafka 集群环境。由于 Kafka 的安装需要依赖 Zookeeper,对 Zookeeper 还不了解的小伙伴可以在 [这里](https://mp.weixin.qq.com/s/aNpn59gHD_WOhtZkceMwug) 先认识下 Zookeeper。 Kafka 能解决什么问题呢?先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦、异步、削峰。 # 2. Kafka 基本概念 Kafka 部分名词解释如下: - Broker:消息中间件处理结点,一个 Kafka 节点就是一个 broker,多个 broker 可以组成一个 Kafka 集群。 - Topic:一类消息,例如 page view 日志、click 日志等都可以以 topic 的形式存在,Kafka 集群能够同时负责多个 topic 的分发。 - Partition:topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。 - Segment:partition 物理上由多个 segment 组成,下面有详细说明。 - offset:每个 partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息.每个 partition 中的消息都由 offset=0 开始记录消息。 # 3. Docker 环境搭建 配合上一节的 Zookeeper 环境,计划搭建一个 3 节点的集群。宿主机 IP 为 `192.168.124.5`。 **docker-compose-kafka-cluster.yml** ```yaml version: '3.7' networks: docker_net: external: true services: kafka1: image: wurstmeister/kafka restart: unless-stopped container_name: kafka1 ports: - "9093:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主机映射port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093 ## 绑定发布订阅的端口。修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka1https://img.qb5200.com/download-x/docker.sock:/var/runhttps://img.qb5200.com/download-x/docker.sock" - "./kafka/kafka1https://img.qb5200.com/download-x/data/:/kafka" networks: - docker_net kafka2: image: wurstmeister/kafka restart: unless-stopped container_name: kafka2 ports: - "9094:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 2 KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主机映射port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094 ## 修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka2https://img.qb5200.com/download-x/docker.sock:/var/runhttps://img.qb5200.com/download-x/docker.sock" - "./kafka/kafka2https://img.qb5200.com/download-x/data/:/kafka" networks: - docker_net kafka3: image: wurstmeister/kafka restart: unless-stopped container_name: kafka3 ports: - "9095:9092" external_links: - zoo1 - zoo2 - zoo3 environment: KAFKA_BROKER_ID: 3 KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主机映射port KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095 ## 修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181" volumes: - "./kafka/kafka3https://img.qb5200.com/download-x/docker.sock:/var/runhttps://img.qb5200.com/download-x/docker.sock" - "./kafka/kafka3https://img.qb5200.com/download-x/data/:/kafka" networks: - docker_net kafka-manager: image: sheepkiller/kafka-manager:latest restart: unless-stopped container_name: kafka-manager hostname: kafka-manager ports: - "9000:9000" links: # 连接本compose文件创建的container - kafka1 - kafka2 - kafka3 external_links: # 连接本compose文件以外的container - zoo1 - zoo2 - zoo3 environment: ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主机IP TZ: CST-8 networks: - docker_net ``` 执行以下命令启动 ```bash docker-compose -f docker-compose-kafka-cluster.yml up -d ``` 可以看到 kafka 集群已经启动成功。 # 4. Kafka 初认识 ## 4.1 可视化管理 细心的小伙伴发现上边的配置除了 kafka 外还有一个 kafka-manager 模块。它是 kafka 的可视化管理模块。因为 kafka 的元数据、配置信息由 Zookeeper 管理,这里我们在 UI 页面做下相关配置。 _1._ 访问 [http:localhost:9000](http:localhost:9000),按图示添加相关配置 ![](https://gitee.com/idea360/oss/raw/master/images/kafka-manage-config-cluster.png) _2._ 配置后我们可以看到默认有一个 topic(\_\_consumer_offsets),3 个 brokers。该 topic 分 50 个 partition,用于记录 kafka 的消费偏移量。 ![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-default-topic.png) ## 4.2 Zookeeper 在 kafka 环境中做了什么 _1._ 首先观察下根目录 kafka 基于 zookeeper,kafka 启动会将元数据保存在 zookeeper 中。查看 zookeeper 节点目录,会发现多了很多和 kafka 相关的目录。结果如下: ```docker ➜ docker zkCli -server 127.0.0.1:2183 Connecting to 127.0.0.1:2183 Welcome to ZooKeeper! JLine support is enabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: 127.0.0.1:2183(CONNECTED) 0] ls / [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config] ``` _2._ 查看我们映射的 kafka 目录,新版本的 kafka 偏移量不再存储在 zk 中,而是在 kafka 自己的环境中。 我们节选了部分目录(包含 2 个 partition) ```text ├── kafka1 │   ├── data │   │   └── kafka-logs-c4e2e9edc235 │   │   ├── __consumer_offsets-1 │   │   │   ├── 00000000000000000000.index // segment索引文件 │   │   │   ├── 00000000000000000000.log // 数据文件 │   │   │   ├── 00000000000000000000.timeindex // 消息时间戳索引文件 │   │   │   └── leader-epoch-checkpoint ... │   │   ├── __consumer_offsets-7 │   │   │   ├── 00000000000000000000.index │   │   │   ├── 00000000000000000000.log │   │   │   ├── 00000000000000000000.timeindex │   │   │   └── leader-epoch-checkpoint │   │   ├── cleaner-offset-checkpoint │   │   ├── log-start-offset-checkpoint │   │   ├── meta.properties │   │   ├── recovery-point-offset-checkpoint │   │   └── replication-offset-checkpoint │   └── docker.sock ``` 结果与 Kafka-Manage 显示结果一致。图示的文件是一个 Segment,00000000000000000000.log 表示 offset 从 0 开始,随着数据不断的增加,会有多个 Segment 文件。 # 5. 生产与消费 ## 5.1 创建主题 ```bash ➜ docker docker exec -it kafka1 /bin/bash # 进入容器 bash-4.4# cd /opt/kafka/ # 进入安装目录 bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 查看主题列表 __consumer_offsets bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test # 新建主题 Created topic test. ``` > 说明: > --replication-factor 副本数; > --partitions 分区数; > replication<=broker(一定); > 有效消费者数<=partitions 分区数(一定); 新建主题后, 再次查看映射目录, 由图可见,partition 在 3 个 broker 上均匀分布。 ![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-topic-test-partition-show.png) ## 5.2 生产消息 ```bash bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test >msg1 >msg2 >msg3 >msg4 >msg5 >msg6 ``` ## 5.3 消费消息 ```bash bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning msg1 msg3 msg2 msg4 msg6 msg5 ``` > --from-beginning 代表从头开始消费 ## 5.4 消费详情 _查看消费者组_ ```bash bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list KafkaManagerOffsetCache console-consumer-86137 ``` _消费组偏移量_ ```bash bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache ``` _查看 topic 详情_ ```bash bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3 ``` _查看.log 数据文件_ ```bash bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log --print-data-log Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log Starting offset: 0 baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true | offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2 baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true | offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4 baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true | offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6 ``` > 这里需要看下自己的文件路径是什么,别直接 copy 我的哦 _查看.index 索引文件_ ```bash bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index offset: 0 position: 0 ``` _查看.timeindex 索引文件_ ```bash bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex --verify-index-only Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex Index timestamp: 0, log timestamp: 1583317546421 ``` # 6. SpringBoot 集成 笔者 SpringBoot 版本是 `2.2.2.RELEASE` pom.xml 添加依赖 ```xml ``` 生产者配置 ```java @Configuration public class KafkaProducerConfig { /** * producer配置 * @return */ public Map

加载全部内容

相关教程
猜你喜欢
用户评论