Docker实战之Kafka集群
当我遇上你csy 人气:0
# 1. 概述
Apache Kafka 是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。其具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
笔者之前在物联网公司工作,其中 Kafka 作为物联网 MQ 选型的事实标准,这里优先给大家搭建 Kafka 集群环境。由于 Kafka 的安装需要依赖 Zookeeper,对 Zookeeper 还不了解的小伙伴可以在 [这里](https://mp.weixin.qq.com/s/aNpn59gHD_WOhtZkceMwug) 先认识下 Zookeeper。
Kafka 能解决什么问题呢?先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦、异步、削峰。
# 2. Kafka 基本概念
Kafka 部分名词解释如下:
- Broker:消息中间件处理结点,一个 Kafka 节点就是一个 broker,多个 broker 可以组成一个 Kafka 集群。
- Topic:一类消息,例如 page view 日志、click 日志等都可以以 topic 的形式存在,Kafka 集群能够同时负责多个 topic 的分发。
- Partition:topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
- Segment:partition 物理上由多个 segment 组成,下面有详细说明。
- offset:每个 partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息.每个 partition 中的消息都由 offset=0 开始记录消息。
# 3. Docker 环境搭建
配合上一节的 Zookeeper 环境,计划搭建一个 3 节点的集群。宿主机 IP 为 `192.168.124.5`。
**docker-compose-kafka-cluster.yml**
```yaml
version: '3.7'
networks:
docker_net:
external: true
services:
kafka1:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka1
ports:
- "9093:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093 ## 绑定发布订阅的端口。修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka1https://img.qb5200.com/download-x/docker.sock:/var/runhttps://img.qb5200.com/download-x/docker.sock"
- "./kafka/kafka1https://img.qb5200.com/download-x/data/:/kafka"
networks:
- docker_net
kafka2:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka2
ports:
- "9094:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka2https://img.qb5200.com/download-x/docker.sock:/var/runhttps://img.qb5200.com/download-x/docker.sock"
- "./kafka/kafka2https://img.qb5200.com/download-x/data/:/kafka"
networks:
- docker_net
kafka3:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka3
ports:
- "9095:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka3https://img.qb5200.com/download-x/docker.sock:/var/runhttps://img.qb5200.com/download-x/docker.sock"
- "./kafka/kafka3https://img.qb5200.com/download-x/data/:/kafka"
networks:
- docker_net
kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: unless-stopped
container_name: kafka-manager
hostname: kafka-manager
ports:
- "9000:9000"
links: # 连接本compose文件创建的container
- kafka1
- kafka2
- kafka3
external_links: # 连接本compose文件以外的container
- zoo1
- zoo2
- zoo3
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主机IP
TZ: CST-8
networks:
- docker_net
```
执行以下命令启动
```bash
docker-compose -f docker-compose-kafka-cluster.yml up -d
```
可以看到 kafka 集群已经启动成功。
# 4. Kafka 初认识
## 4.1 可视化管理
细心的小伙伴发现上边的配置除了 kafka 外还有一个 kafka-manager 模块。它是 kafka 的可视化管理模块。因为 kafka 的元数据、配置信息由 Zookeeper 管理,这里我们在 UI 页面做下相关配置。
_1._ 访问 [http:localhost:9000](http:localhost:9000),按图示添加相关配置
![](https://gitee.com/idea360/oss/raw/master/images/kafka-manage-config-cluster.png)
_2._ 配置后我们可以看到默认有一个 topic(\_\_consumer_offsets),3 个 brokers。该 topic 分 50 个 partition,用于记录 kafka 的消费偏移量。
![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-default-topic.png)
## 4.2 Zookeeper 在 kafka 环境中做了什么
_1._ 首先观察下根目录
kafka 基于 zookeeper,kafka 启动会将元数据保存在 zookeeper 中。查看 zookeeper 节点目录,会发现多了很多和 kafka 相关的目录。结果如下:
```docker
➜ docker zkCli -server 127.0.0.1:2183
Connecting to 127.0.0.1:2183
Welcome to ZooKeeper!
JLine support is enabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2183(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config]
```
_2._ 查看我们映射的 kafka 目录,新版本的 kafka 偏移量不再存储在 zk 中,而是在 kafka 自己的环境中。
我们节选了部分目录(包含 2 个 partition)
```text
├── kafka1
│ ├── data
│ │ └── kafka-logs-c4e2e9edc235
│ │ ├── __consumer_offsets-1
│ │ │ ├── 00000000000000000000.index // segment索引文件
│ │ │ ├── 00000000000000000000.log // 数据文件
│ │ │ ├── 00000000000000000000.timeindex // 消息时间戳索引文件
│ │ │ └── leader-epoch-checkpoint
...
│ │ ├── __consumer_offsets-7
│ │ │ ├── 00000000000000000000.index
│ │ │ ├── 00000000000000000000.log
│ │ │ ├── 00000000000000000000.timeindex
│ │ │ └── leader-epoch-checkpoint
│ │ ├── cleaner-offset-checkpoint
│ │ ├── log-start-offset-checkpoint
│ │ ├── meta.properties
│ │ ├── recovery-point-offset-checkpoint
│ │ └── replication-offset-checkpoint
│ └── docker.sock
```
结果与 Kafka-Manage 显示结果一致。图示的文件是一个 Segment,00000000000000000000.log 表示 offset 从 0 开始,随着数据不断的增加,会有多个 Segment 文件。
# 5. 生产与消费
## 5.1 创建主题
```bash
➜ docker docker exec -it kafka1 /bin/bash # 进入容器
bash-4.4# cd /opt/kafka/ # 进入安装目录
bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 查看主题列表
__consumer_offsets
bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test # 新建主题
Created topic test.
```
> 说明:
> --replication-factor 副本数;
> --partitions 分区数;
> replication<=broker(一定);
> 有效消费者数<=partitions 分区数(一定);
新建主题后, 再次查看映射目录, 由图可见,partition 在 3 个 broker 上均匀分布。
![](https://gitee.com/idea360/oss/raw/master/images/kafka-cluster-topic-test-partition-show.png)
## 5.2 生产消息
```bash
bash-4.4# ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test
>msg1
>msg2
>msg3
>msg4
>msg5
>msg6
```
## 5.3 消费消息
```bash
bash-4.4# ./bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning
msg1
msg3
msg2
msg4
msg6
msg5
```
> --from-beginning 代表从头开始消费
## 5.4 消费详情
_查看消费者组_
```bash
bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --list
KafkaManagerOffsetCache
console-consumer-86137
```
_消费组偏移量_
```bash
bash-4.4# ./bin/kafka-consumer-groups.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --describe --group KafkaManagerOffsetCache
```
_查看 topic 详情_
```bash
bash-4.4# ./bin/kafka-topics.sh --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --describe --topic test
Topic: test PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
```
_查看.log 数据文件_
```bash
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log --print-data-log
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1583317546421 size: 72 magic: 2 compresscodec: NONE crc: 1454276831 isvalid: true
| offset: 0 CreateTime: 1583317546421 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg2
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 72 CreateTime: 1583317550369 size: 72 magic: 2 compresscodec: NONE crc: 3578672322 isvalid: true
| offset: 1 CreateTime: 1583317550369 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg4
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 144 CreateTime: 1583317554831 size: 72 magic: 2 compresscodec: NONE crc: 2727139808 isvalid: true
| offset: 2 CreateTime: 1583317554831 keysize: -1 valuesize: 4 sequence: -1 headerKeys: [] payload: msg6
```
> 这里需要看下自己的文件路径是什么,别直接 copy 我的哦
_查看.index 索引文件_
```bash
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.index
offset: 0 position: 0
```
_查看.timeindex 索引文件_
```bash
bash-4.4# ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex --verify-index-only
Dumping /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex
Found timestamp mismatch in :/kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1583317546421
```
# 6. SpringBoot 集成
笔者 SpringBoot 版本是 `2.2.2.RELEASE`
pom.xml 添加依赖
```xml
```
生产者配置
```java
@Configuration
public class KafkaProducerConfig {
/**
* producer配置
* @return
*/
public Map
加载全部内容