Druid 0.17 Getting Started (3): Data Ingestion Guide
By 独孤风
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085651906-571819466.jpg)
In the quick start we demonstrated ingesting the local sample data, but Druid supports a much richer set of ingestion methods, covering both batch and real-time streaming data. This article introduces them:
- **File ingestion**: load batch data from files
- **Kafka streaming ingestion**: load streaming data from Kafka
- **Hadoop ingestion**: load batch data from Hadoop
- **Writing your own ingestion spec**: define a custom ingestion spec

This article focuses on the first two, which are the most commonly used.
## 1. Loading a File
Druid offers several ways to load data:
- via the web-based data loader
- via the console
- via the command line
- via a curl call

### 1.1 The Data Loader
Druid ships with a sample data file containing Wikipedia edit events from September 12, 2015.
The sample data is located at `quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz`.
A sample record looks like this:
```
{
"timestamp":"2015-09-12T20:03:45.018Z",
"channel":"#en.wikipedia",
"namespace":"Main",
"page":"Spider-Man's powers and equipment",
"user":"foobar",
"comment":"/* Artificial web-shooters */",
"cityName":"New York",
"regionName":"New York",
"regionIsoCode":"NY",
"countryName":"United States",
"countryIsoCode":"US",
"isAnonymous":false,
"isNew":false,
"isMinor":false,
"isRobot":false,
"isUnpatrolled":false,
"added":99,
"delta":99,
"deleted":0,
}
```
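If you want to inspect the raw file yourself (a quick shell check, assuming you run it from the Druid installation directory), you can decompress a single record to stdout:
```
gunzip -c quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz | head -n 1
```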
Druid can load data from several kinds of sources:
- files
- Kafka
- Hadoop
- custom sources

Let's walk through loading the sample file.
##### 1.1.1 Open `localhost:8888` and click `Load data`
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085652292-1477787964.jpg)
##### 1.1.2 Select `Local disk`
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085652704-1762187705.jpg)
##### 1.1.3 Click `Connect data`
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085653061-634765944.jpg)
##### 1.1.4 Preview the data
Enter `quickstart/tutorial/` as the Base directory and `wikiticker-2015-09-12-sampled.json.gz` as the File filter.
Then click `Apply` to preview the data. Click `Next: Parse data` to continue.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085653652-590886946.jpg)
##### 1.1.5 Parse the data
You can see that the JSON data has been parsed. Next, parse the timestamp.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654068-1185738456.jpg)
##### 1.1.6 Parse the timestamp
Once the timestamp is parsed, the next two steps are Transform and Filter; we skip them here and click Next.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654460-1302068690.jpg)
##### 1.1.7 Confirm the schema
This step lets us confirm the schema and make adjustments if needed.
Since the dataset is small, we turn off Rollup and go straight to the next step.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085654856-423400439.jpg)
##### 1.1.8 Configure segmentation
Here you can configure how the data is partitioned into segments. We choose `hour` and click Next.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655207-1910689495.jpg)
##### 1.1.9 Confirm and publish
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655533-1019324676.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085655856-446442045.jpg)
##### 1.1.10 Published successfully; ingestion begins
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656178-822464076.jpg)
Wait for the task to succeed.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656509-1753060725.jpg)
##### 1.1.11 View the data
Select `Datasources` to see the data we just loaded.
You can see the datasource name, its availability (`Fully` means fully available), its size, and other details.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085656848-1762524416.jpg)
##### 1.1.12 Query the data
Click the `Query` button.
We can now query the data with SQL, and even download the results. An example follows the screenshot below.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085657184-897146130.jpg)
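For example (a minimal sketch, assuming the router at `localhost:8888` and a datasource named `wikipedia`, as in the spec used later in this tutorial), the same SQL can also be sent through Druid's HTTP SQL API:
```
curl -XPOST -H'Content-Type: application/json' \
  http://localhost:8888/druid/v2/sql \
  -d '{"query":"SELECT channel, COUNT(*) AS edits FROM wikipedia GROUP BY channel ORDER BY edits DESC LIMIT 10"}'
```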
### 1.2 The Console
In the task view, click `Submit JSON task`.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085657825-842617736.jpg)
This opens the spec submission dialog. Paste in the following spec:
```
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec": {
"column": "time",
"format": "iso"
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial/",
"filter" : "wikiticker-2015-09-12-sampled.json.gz"
},
"inputFormat" : {
"type": "json"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 5000000,
"maxRowsInMemory" : 25000
}
}
}
```
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085658126-438187606.jpg)
Submit it, then watch the ingestion task in the task view.
### 1.3 The Command Line
For convenience, Druid ships with a script for submitting ingestion tasks:
```
bin/post-index-task
```
We can run:
```
bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081
```
You should see output like the following:
```
Beginning indexing data for wikipedia
Task started: index_wikipedia_2018-07-27T06:37:44.323Z
Task log: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/log
Task status: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/status
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto the cluster...
wikipedia loading complete! You may now query your data
```
Then watch the ingestion task in the task view.
### 1.4 curl
We can also submit the ingestion task directly with curl:
```
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task
```
On success, the task ID is returned:
```
{"task":"index_wikipedia_2018-06-09T21:30:32.802Z"}
```
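As a quick follow-up (a sketch, assuming the Overlord at `localhost:8081` and the task ID returned above), you can poll the task status through the same API:
```
curl http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-06-09T21:30:32.802Z/status
```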
## 2. Loading Streaming Data from Apache Kafka
Apache Kafka is a high-performance messaging system written in Scala, developed as an open-source project by the Apache Software Foundation.
Kafka was originally developed at LinkedIn and open-sourced in early 2011; it graduated from the Apache Incubator in October 2012. The project's goal is to provide a unified, high-throughput, low-latency platform for handling real-time data.
For more on Kafka, see [Kafka Getting Started Guide (detailed, with screenshots)](https://mp.weixin.qq.com/s/oFEv5c5zO7NAMA3YYB3CrQ).
### 2.1 Install Kafka
Download and unpack Kafka 2.1.0:
```
curl -O https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0
```
Start Kafka (this assumes a ZooKeeper instance is already running on `localhost:2181`, such as the one started by Druid's quickstart):
```
./bin/kafka-server-start.sh config/server.properties
```
Create a topic named `wikipedia`:
```
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
```
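To verify the topic exists (a quick check with the same Kafka CLI, assuming ZooKeeper at `localhost:2181`), list the topics:
```
./bin/kafka-topics.sh --list --zookeeper localhost:2181
```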
### 2.2 Write Data to Kafka
Now write the sample events into the `wikipedia` topic. First, in the Druid directory, decompress the sample file:
```
cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
```
Then, in the Kafka directory, run the following, replacing {PATH_TO_DRUID} with your Druid directory:
```
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
```
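To confirm the events arrived (a sketch, assuming the broker at `localhost:9092`; press Ctrl-C to stop), you can read the topic back with the console consumer:
```
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wikipedia --from-beginning
```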
### 2.3 Load Kafka Data into Druid
Druid likewise offers several ways to load data from Kafka:
- the data loader
- the console
- curl

#### 2.3.1 The Data Loader
##### 2.3.1.1 Open `localhost:8888` and click `Load data`
Select `Apache Kafka` and click `Connect data`.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085658508-13515230.jpg)
##### 2.3.1.2 Connect to Kafka
Enter `localhost:9092` as the bootstrap server and `wikipedia` as the topic. You can preview the data, then move on to the next step.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659086-1877246991.jpg)
##### 2.3.1.3 Parse the data
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659516-1969246996.jpg)
##### 2.3.1.4 Parse the timestamp, then set up transforms and filters
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085659953-1741556967.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085700451-371940198.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085700822-141714061.jpg)
##### 2.3.1.5 This step is important: it determines the scope of the statistics
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085701138-1897062580.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085701530-1741799766.jpg)
##### 2.3.1.6 Publish
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702014-626016517.jpg)
##### 2.3.1.7 Wait for the task to complete
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702366-1043860547.jpg)
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085702760-1551772401.jpg)
##### 2.3.1.8 Check the results on the query page
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085703163-391760406.jpg)
#### 2.3.2 The Console
In the task view, click `Submit JSON supervisor` to open the dialog.
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085703477-940123858.jpg)
Paste in the following supervisor spec:
```
{
"type": "kafka",
"spec" : {
"dataSchema": {
"dataSource": "wikipedia",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"metricsSpec" : [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": false
},
"ioConfig": {
"topic": "wikipedia",
"inputFormat": {
"type": "json"
},
"replicas": 2,
"taskDuration": "PT10M",
"completionTimeout": "PT20M",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
}
}
}
```
#### 2.3.3 curl
We can also submit the Kafka supervisor spec directly with curl:
```
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor
```
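Once the supervisor is submitted (a sketch, assuming the Overlord at `localhost:8081` and the `wikipedia` datasource from the spec above), you can check its status through the supervisor API:
```
curl http://localhost:8081/druid/indexer/v1/supervisor/wikipedia/status
```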
**Stay focused and keep improving yourself; that is never a mistake. For more posts on real-time computing, follow 实时流式计算.**
![file](https://img2020.cnblogs.com/other/1089984/202003/1089984-20200317085704153-227283886.jpg)