Flink内核原理核心抽象 解析Flink内核原理与实现核心抽象
毛凯民 人气:1Flink中设计了用户自定义函数体系(User Defined Function,UDF),开发人员实现业务逻辑就是开发UDF。
一、环境对象
StreamExecutionEnvironment是Flink应用开发时的概念,表示流计算作业的执行环境,是作业开发的入口、数据源接口、生成和转换DataStream的接口、数据Sink的接口、作业配置接口、作业启动执行的入口。
Environment是运行时作业级别的概念,从StreamExecutionEnvironment中的配置信息衍生而来。进入到Flink作业执行的时刻,作业需要的是相关的配置信息,如作业的名称、并行度、作业编号JobID、监控的Metric、容错的配置信息、IO等,用StreamExecutionRuntime对象就不适合了,很多API是不需要的,所以在Flink中抽象出了Environment作为运行时刻的上下文信息。
RuntimeContext是运行时Task实例级别的概念。Environment本身仍然是比较粗粒度作业级别的配置,对于每一个Task而言,其本身有更细节的配置信息,所以Flink又抽象了RuntimeContext,每一个Task实例有自己的RuntimeContext。
环境对象关系如下:
1.1 执行环境
StreamExecutionEnvironment
Flink流计算应用的执行环境,是Flink作业开发和启动执行的入口
开发者对StreamExecutionEnvironment的实现是无感知的。
LocalStreamEnvironment
本地执行环境,在单个JVM中使用多线程模拟Flink集群。
其基本的工作流程如下:
1) 执行Flink作业的Main函数生成Streamgraph,转化为JobGraph。
2) 设置任务运行的配置信息。
3) 根据配置信息启动对应的LocalFlinkMiniCluster。
4) 根据配置信息和miniCluster生成对应的MiniClusterClient。
5) 通过MiniClusterClient提交JobGraph 到MiniCluster。
RemoteStreamEnvironment
在大规模数据中心中部署的Flink生产集群的执行环境。
当将作业发布到Flink集群的时候,使用RemoteStreamEnvironment。
其基本的工作流程如下:
1) 执行Flink作业的Main函数生成Streamgraph,转化为JobGraph。
2) 设置任务运行的配置信息。
3) 提交JobGraph到远程的Flink集群。
StreamContextEnvironment
在Cli命令行或者单元测试时候会被使用,执行步骤同上。
StreamPlanEnvironment
在Flink Web UI管理界面中可视化展现Job的时候,专门用来生成执行计划(实际上就是StreamGraph)
ScalaShellStreamEnvironment
这是Scala Shell执行环境,可以在命令行中交互式开发Flink作业。
其基本工作流程如下:
- 校验部署模式,目前Scala Shell仅支持attached模式。
- 上传每个作业需要的Jar文件。
其余步骤与RemoteStreamEnvironment类似。
1.2 运行时环境
RuntimeEnvironment
在Task开始执行时进行初始化,把Task运行相关的信息都封装到该对象中,其中不光包含了配置信息,运行时的各种服务也会被包装到其中。
SavepointEnvironment
SavepointEnvironment是Environment的最小化实现,在状态处理器的API中使用。
1.3 运行时上下文
RuntimeContext是Function运行时的上下文,封装了Function运行时可能需要的所有信息,让Function在运行时能够获取到作业级别的信息,如并行度相关信息、Task名称、执行配置信息(ExecutionConfig)、State等。
Function的每个实例都有一个RuntimeContext对象,在RichFunction中通过getRunctionContext()可以访问该对象。
RuntimeContext的类体系图如下:
StreamingRuntimeContext:
在流计算UDF中使用的上下文,用来访问作业信息、状态等。
DistributedRuntimeUDFContext:
由运行时UDF所在的批处理算子创建,在DataSet批处理中使用。
RuntimeUDFContext:
在批处理应用的UDF中使用。
SavepointRuntimeContext:
支持对检查点和保存点进行操作,包括读取、变更、写入等。
CepRuntimeContext:
CEP复杂事件处理中使用的上下文。
二、数据流元素
数据流元素在Flink中叫做StreamElement
- 有数据记录StreamRecord,
- 延迟标记LatencyMarker、Watermark、
- 流状态标记StreamStatus这四种。
在执行层面,4种数据流元素都被序列化成二进制数据,形成混合的数据流,在算子中将混合数据流中的数据流元素反序列化出来。
StreamRecord
StreamRecord表示数据流中的一条记录(或者叫做一个事件),也叫数据记录。
包含以下内容:
1)数据的值本身
2)时间戳(可选)
LatencyMarker
用来近似评估延迟,LatencyMarker在Source中创建,并向下游发送,绕过业务处理逻辑,在Sink节点中使用LatencyMarker估计数据在整个DAG图中的流转花费的时间。
LatencyMarker包含信息如下:
1)周期性的在数据源算子中创造出来的时间戳。
2)算子编号
3)数据源算子所在的Task编号
Watermark
是一个时间戳,用来告诉算子所有时间早于等于Watermark的事件或记录都已经达到,不会再有比Watermark更早的记录。
StreamStatus
用来通知Task是否会继续接收到上游的记录或者Watermark。在数据源算子中生成,向下游沿着DataFlow传递。
有两种表示状态:
1)空闲状态(IDLE)
2)活动状态(ACTIVE)
三、数据转换
数据转换在Flink中叫做Transformation,是衔接DataStream Api和Flink内核的逻辑结构。
Transformation有两大类:
1)物理Transformation: 会转换成算子,继承了PhysicalTransformation。
2)虚拟Transformation: 不会转换成具体算子。
Tranformation包含了Flink的运行时关键参数:
1)name:转换器名称,主要用于可视化。
2)uid:用户指定的uid,该uid的主要目的是在job重启时再次分配跟之前相同的uid,可以持久保存状态。
3)bufferTimeout:buffer超时时间。
4)parallelism:并行度。
5)id:跟属性uid无关,生成方式是基于一个静态累加器。
6)outputType:输出类型,用来进行序列化数据。
7)slotSharingGroup:给当前的Transformation设置Slot共享组。
3.1 物理Transformation SourceTransformation
从数据源读取数据的Transformation,是Flink作业的起点。
只有下游Transformation,没有上游输入。
SinkTransformation
将数据写到外部存储的Transformation,是Flink作业的终点。
OneInputTransformation
单流输入的Transformation(只接收一个输入流),跟上面的SinkTransformation构造器类似,同样需要input和operator参数。
TwoInputTransformation
双输入的Transformation(接收两种流作为输入),分别叫做第一输入和第二输入。
3.2 虚拟Transformation SideOutputTransformation
在旁路输出中转换而来,表示上游Transformation的一个分流。
每个sideoutput通过OutputTag标识。
SplitTransformation
用来按条件切分数据流,该转换用于将一个流拆分成多个流。
SelectTransformation
与SplitTransformation配合使用,用来在下游选择SplitTransformation切分的数据流。
PartitionTransformation
该转换器用于改变输入元素的分区,其名称为Partition。工作时除了提供一个StreamTransformation作为输入外,还需要提供一个StreamPartitionor的实例来进行分区。
UnionTransformation
合并转换器,该转换器用于将多个输入StreamTransformation进行合并,因此该转换器接收StreamTransformation的集合。Union要求上游输入的数据的结构必须是完全相同的。
FeedbackTransformation
表示FlinkDAG中的一个反馈点。简单来说,就是把符合条件的数据发回上游Transformation处理,一个反馈点可以连接一个或多个上游的Transformation,这些连接关系叫反馈边。符合反馈条件并交给上游的Transformation的数据流叫做反馈流。
FeedbackTransformation的固定名称为Feedback
有两个重要参数:
- input:上游输入StreamTransformation
- waitTime:默认为0,即永远等待,如果设置了等待时间,一旦超过该等待时间,则计算结束并且不再接收数据。
实例化FeedbackTransformation时,会自动创建一个用于存储反馈边的集合feedbackEdges。
FeedbackTransformation通过定义一个实力方法addFeedbackEdge来收集,
在加入的StreamTransformation的实例有一个要求,
当前FeedbackTransformation跟待加入的StreamTransformation并行度一致。
CoFeedbackTransformation
与FeedbackTransformation类似,也是FlinkDAG中的一个反馈点。
- 不同之处在于,CoFeedbackTransformation反馈给上游的数据流与上游Transformation的输入类型不同
- 所以要求上游的Transformation必须是TwoInputTransformation。
四、算子行为
4.1 生命周期管理
1)setup:初始化环境、时间服务、注册监控等。
2)open:该行为由各个具体的算子负责实现,包含了算子的初始化逻辑。
3)close:所有的数据处理完毕之后关闭算子,此时需要去报将所有的缓存数据向下游发送。
4)dispose:该方法在算子生命周期的最后执行阶段,此时算子已经关闭,停止处理数据,进行资源的释放。
StreamTask作为算子的容器,负责管理算子的生命周期。
4.2 异步算子
异步算子的目的是解决与外部系统交互时网络延迟所导致的系统瓶颈问题。
异步算子的两种输出模式
1)顺序输出
先收到的数据先输出,后续数据元素的异步函数调用无论是否先完成,都需要等待,顺序模式可以保证消息不乱序,但是可能增加延迟,降低算子的吞吐量。
2)无序输出
先处理完的数据元素先输出,不保证消息顺序,相比于顺序模式,无序输出模式算子延迟低、吞吐量高。无序输出模式并不是完全无序的,仍然要保持Watermark不能超越其前面数据元素的原则。等待完成队列将按照Watermakr切分成组,组内可以无序输出,组之间必须严格保证顺序。
五、处理函数
5.1 双流Join 即时Join
逻辑如下:
1) 创建一个State对象
2)接收到输入流 1事件后更新Sate。
3)接收到输出流 2的事件后遍历State,根据Join条件进行匹配,将匹配结果发送到下游。
5.2延迟双流Join
在流式数据里,数据可能是乱序的,数据会延迟到达,并且为了提供处理效率,使用小批量模式计算,而不是每个事件触发一次Join计算。
逻辑如下:
1)创建2个state对象,分别缓存输入流1和输入流2的事件。
2)创建一个定时器,等待数据的到达,定时延迟触发Join计算。
3)接收到输入流1事件后更新State。
4)接收到输入流2事件后更新State。
5)定时器遍历State1和State2,根据Join条件进行匹配,将匹配结果发送到下游。
六、数据分区
数据分区在Flink中叫做Partition。
本质上说,分布式计算就是把一个作业切分成子任务Task,将不同的数据交给不同的Task计算。
StreamParitioner
是Flink中的数据流分区抽象接口,决定了在实际运行中的数据流分发模式。
自定义分区
使用用户自定义分区函数,为每一个元组选择目标分区。
ForwardParitioner
用于在同一个OperatorChain中上下游算子之间的数据转发, 实际上数据是直接传递给下游的。
ShufflePartitioner
随机将元素进行分区,可以确保下游的Task能够均匀的获取数据。
ReblancePartitioner
以Round-robin的方式为每个元素分配分区,确保下游的Task可以均匀的获取数据,以免数据倾斜。
RescalingPartitioner
根据上下游Task的数据进行分区。
使用Round-robin选择下游的一个Task进行数据分区,
如上游有2个Source,下游有6个Map,那么每个Source会分配3个固定下游的map,
不会向未分配给自己的分区写入数据。
BroadcastPartitioner
将该记录广播给所有分区,即有N个分区,就把数据复制N份,每个分区1分
KeyGroupStreamPartitioner
keyedStream根据KeyGroup索引编号进行分区,该分区器不是提供给用户来用。
KeyedStream在构造Transformation的时候默认使用KeyedGroup分区形式,从而在底层上支持作业Rescale功能。
七、分布式ID
加载全部内容