Druid 0.17 入门(3)—— 数据接入指南
独孤风 人气:0

在快速开始中,我们演示了接入本地示例数据方式,但Druid其实支持非常丰富的数据接入方式。比如批处理数据的接入和实时流数据的接入。本文我们将介绍这几种数据接入方式。
- **文件数据接入**:从文件中加载批处理数据
- **从Kafka中接入流数据**:从Kafka中加载流数据
- **Hadoop数据接入**:从Hadoop中加载批处理数据
- **编写自己的数据接入规范**:自定义新的接入规范
本文主要介绍前两种最常用的数据接入方式。
## 1、Loading a file——加载文件
Druid提供以下几种方式加载数据:
- 通过页面数据加载器
- 通过控制台
- 通过命令行
- 通过Curl命令调用
### 1.1、数据加载器
Druid提供了一个示例数据文件,其中包含2015年9月12日发生的Wiki的示例数据。
此样本数据位于`quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz`
示例数据大概是这样:
```
{
"timestamp":"2015-09-12T20:03:45.018Z",
"channel":"#en.wikipedia",
"namespace":"Main",
"page":"Spider-Man's powers and equipment",
"user":"foobar",
"comment":"/* Artificial web-shooters */",
"cityName":"New York",
"regionName":"New York",
"regionIsoCode":"NY",
"countryName":"United States",
"countryIsoCode":"US",
"isAnonymous":false,
"isNew":false,
"isMinor":false,
"isRobot":false,
"isUnpatrolled":false,
"added":99,
"delta":99,
"deleted":0,
}
```
Druid加载数据分为以下几种:
- 加载文件
- 从kafka中加载数据
- 从hadoop中加载数据
- 自定义加载方式
我们这样演示一下加载示例文件数据
##### 1.1.1、进入localhost:8888 点击load data

##### 1.1.2、选择local disk

##### 1.1.3、选择Connect data

##### 1.1.4、预览数据
Base directory输入quickstart/tutorial/
File filter输入 wikiticker-2015-09-12-sampled.json.gz
然后点击apply预览 就可以看见数据了 点击Next:parse data解析数据

##### 1.1.5、解析数据
可以看到json数据已经被解析了 继续解析时间

##### 1.1.6、解析时间
解析时间成功 之后两步是transform和filter 这里不做演示了 直接next

##### 1.1.7、确认Schema
这一步会让我们确认Schema 可以做一些修改
由于数据量较小 我们直接关掉Rollup 直接下一步

##### 1.1.8、设置分段
这里可以设置数据分段 我们选择hour next

##### 1.1.9、确认发布


##### 1.1.10、发布成功 开始解析数据

等待任务成功

##### 1.1.11、查看数据
选择datasources 可以看到我们加载的数据
可以看到数据源名称 Fully是完全可用 还有大小等各种信息

##### 1.1.12、查询数据
点击query按钮
我们可以写sql查询数据了 还可以将数据下载

### 1.2 控制台
在任务视图中,单击Submit JSON task

这将打开规格提交对话框,粘贴规范
```
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec": {
"column": "time",
"format": "iso"
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial/",
"filter" : "wikiticker-2015-09-12-sampled.json.gz"
},
"inputFormat" : {
"type": "json"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 5000000,
"maxRowsInMemory" : 25000
}
}
}
```

查看加载任务即可。
### 1.3 命令行
为了方便起见,Druid提供了一个加载数据的脚本
```
bin/post-index-task
```
我们可以运行命令
```
bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081
```
看到如下输出:
```
Beginning indexing data for wikipedia
Task started: index_wikipedia_2018-07-27T06:37:44.323Z
Task log: http://localhost:8081https://img.qb5200.com/download-x/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/log
Task status: http://localhost:8081https://img.qb5200.com/download-x/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/status
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task index_wikipedia_2018-07-27T06:37:44.323Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto the cluster...
wikipedia loading complete! You may now query your data
```
查看加载任务即可。
### 1.4 CURL
我们可以通过直接调用CURL来加载数据
```
curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081https://img.qb5200.com/download-x/druid/indexer/v1/task
```
提交成功
```
{"task":"index_wikipedia_2018-06-09T21:30:32.802Z"}
```
## 2、Load from Apache Kafka——从Apache Kafka加载流数据
Apache Kafka是一个高性能的消息系统,由Scala 写成。是由Apache 软件基金会开发的一个开源消息系统项目。
Kafka 最初是由LinkedIn 开发,并于2011 年初开源。2012 年10 月从Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待(低延时)的平台。
更多kafka相关请查看[Kafka入门宝典(详细截图版)](https://mp.weixin.qq.com/s/oFEv5c5zO7NAMA3YYB3CrQ)
### 2.1 安装kafka
我们安装一个最新的kafka
```
curl -O https://archive.apache.orghttps://img.qb5200.com/download-x/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
tar -xzf kafka_2.12-2.1.0.tgz
cd kafka_2.12-2.1.0
```
启动kafka
```
./bin/kafka-server-start.sh config/server.properties
```
创建一个topic
```
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia
```
### 2.2 将数据写入Kafka
向kafka的topic为wikipedia写入数据
```
cd quickstart/tutorial
gunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json
```
在kafka目录中运行命令 {PATH_TO_DRUID}替换为druid目录
```
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json
```
### 2.3 加载kafka数据到Druid
druid加载kafka的数据也有多种方式
- 数据加载器
- 控制台
- CURL
#### 2.3.1 数据加载器
##### 2.3.1.1 进入localhost:8888 点击load data
选择`Apache Kafka`并单击`Connect data`

##### 2.3.1.2 输入kafka服务器`localhost:9092`
##### 输入topic wikipedia 可以预览数据 然后下一步

##### 2.3.1.3 解析数据

2.3.1.4 解析时间戳 设置转换 设置过滤



##### 2.3.1.4 这步比较重要 确定统计的范围


##### 2.3.1.5 发布

##### 2.3.1.6 等待任务完成


##### 2.3.1.7 去查询页面查看

#### 2.3.2 控制台
在任务视图中,单击`Submit JSON supervisor`以打开对话框。

粘贴进去如下指令
```
{
"type": "kafka",
"spec" : {
"dataSchema": {
"dataSource": "wikipedia",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"metricsSpec" : [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": false
},
"ioConfig": {
"topic": "wikipedia",
"inputFormat": {
"type": "json"
},
"replicas": 2,
"taskDuration": "PT10M",
"completionTimeout": "PT20M",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
}
}
}
```
#### 2.3.3 CURL
我们也可以通过直接调用CURL来加载kafka数据
```
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081https://img.qb5200.com/download-x/druid/indexer/v1/supervisor
```
**静下心来,努力的提升自己,永远都没有错。更多实时计算相关博文,欢迎关注实时流式计算**

加载全部内容