[源码分析] 从FlatMap用法到Flink的内部实现
罗西的思考 人气:0
# [源码分析] 从FlatMap用法到Flink的内部实现
## 0x00 摘要
本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap。希望能让大家对这个概念有更深入的理解。
## 0x01 Map vs FlatMap
首先我们先从概念入手。
自从*响应式编程*慢慢壮大以来,这两个单词现在越来越被大家熟悉了。前端能见到它们的身影,后台也能见到;安卓里面有,iOS也有。很多兄弟刚遇到它们时候是懵圈的,搞不清楚之间的区别。下面我就给大家简单讲解下。
### map
它把`数组流`中的每一个值,使用所提供的函数执行一遍,一一对应。得到与元素个数相同的`数组流`。然后返回这个新数据流。
### flatMap
flat是扁平的意思。所以这个操作是:先映射(map),再拍扁(join)。
flatMap输入可能是多个`子数组流`。所以flatMap先针对 每个`子数组流`的每个元素进行映射操作。然后进行扁平化处理,最后汇集所有进行扁平化处理的结果集形成一个新的列表(扁平化简而言之就是去除所有的修饰)。
flatMap与map另外一个不一样的地方就是传入的函数在处理完后返回值必须是List。
### 实例
比如拿到一个文本文件之后,我们是按行读取,按行处理。如果要对每一行的单词数进行计数,那么应该选择Map方法,如果是统计词频,就应该选择flatMap方法。
如果还不清楚,可以看看下面这个例子:
```scala
梁山新进一批好马,准备给每个马军头领配置一批。于是得到函数以及头领名单如下:
函数 = ( 头领 => 头领 + 好马 )
五虎将 = List(关胜、林冲、秦明、呼延灼、董平 )
八骠骑 = List(花荣、徐宁、杨志、索超、张清、朱仝、史进、穆弘 )
// Map函数的例子
利用map函数,我们可以得到 五虎将马军
五虎将马军 = 五虎将.map( 头领 => 头领 + 好马 )
结果是 List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马 )
// flatMap函数的例子
但是为了得到统一的马军,则可以用flatMap:
马军头领 = List(五虎将,八骠骑)
马军 = 马军头领.flatMap( 头领 => 头领 + 好马 )
结果就是:List( 关胜 + 马、林冲 + 马、秦明 + 马、呼延灼 + 马、董平 + 马,花荣 + 马、徐宁 + 马、杨志 + 马、索超 + 马、张清 + 马、朱仝 + 马、史进 + 马、穆弘 + 马 )
```
现在大家应该清楚了吧。接下来看看几个FlatMap的实例。
### Scala语言的实现
Scala本身对于List类型就有map和flatMap操作。举例如下:
```scala
val names = List("Alice","James","Apple")
val strings = names.map(x => x.toUpperCase)
println(strings)
// 输出 List(ALICE, JAMES, APPLE)
val chars = names.flatMap(x=> x.toUpperCase())
println(chars)
// 输出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
```
### Flink的例子
以上是scala语言层面的实现。下面我们看看Flink框架是如何使用FlatMap的。
网上常见的一个Flink应用的例子:
```scala
//加载数据源
val source = env.fromElements("china is the best country","beijing is the capital of china")
//转化处理数据
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
```
### Flink源码中的例子
```scala
case class WordWithCount(word: String, count: Long)
val text = env.socketTextStream(host, port, '\n')
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
windowCounts.print()
```
## 0x02 自定义算子(in Flink)
上面提到的都是简单的使用,如果有复杂需求,在Flink中,我们可以通过继承FlatMapFunction和RichFlatMapFunction来自定义算子。
### 函数类`FlatMapFunction`
对于不涉及到状态的使用,可以直接继承 FlatMapFunction,其定义如下:
```java
@Public
@FunctionalInterface
public interface FlatMapFunction
加载全部内容