Flink DataStream API编程指南
Flink DataStream API主要分为三个部分,分别为Source、Transformation以及Sink,其中Source是数据源,Flink内置了很多数据源,比如最常用的Kafka。Transformation是具体的转换操作,主要是用户定义的处理数据的逻辑,比如Map,FlatMap等。Sink(数据汇)是数据的输出,可以把处理之后的数据输出到存储设备上,Flink内置了许多的Sink,比如Kafka,HDFS等。另外除了Flink内置的Source和Sink外,用户可以实现自定义的Source与Sink。考虑到内置的Source与Sink使用起来比较简单且方便,所以,关于内置的Source与Sink的使用方式不在本文的讨论范围之内,本文会先从自定义Source开始说起,然后详细描述一些常见算子的使用方式,最后会实现一个自定义的Sink。
数据源
Flink内部实现了比较常用的数据源,比如基于文件的,基于Socket的,基于集合的等等,如果这些都不能满足需求,用户可以自定义数据源,下面将会以MySQL为例,实现一个自定义的数据源。本文的所有操作将使用该数据源,具体代码如下:
/** |
首先继承RichParallelSourceFunction,实现继承的方法,主要包括open()方法、run()方法及close方法。上述的
RichParallelSourceFunction是支持设置多并行度的,关于RichParallelSourceFunction与RichSourceFunction的区别,前者支持用户设置多并行度,后者不支持通过setParallelism()方法设置并行度,默认的并行度为1,否则会报如下错误:
另外,RichParallelSourceFunction提供了额外的open()方法与close()方法,如果定义Source时需要获取链接,那么可以在open()方法中进行初始化,然后在close()方法中关闭资源链接,关于Rich***Function与普通Function的区别,下文会详细解释,在这里先有个印象。上述的代码中的配置信息是通过配置文件传递的,由于篇幅限制,我会把本文的代码放置在github,见文末github地址。
## 基本转换
Flink提供了大量的算子操作供用户使用,常见的算子主要包括以下几种,注意:本文不讨论关于基于时间与窗口的算子,这些内容会在《Flink基于时间与窗口的算子》中进行详细介绍。
**说明**:本文的操作是基于上文自定义的MySQL Source,对应的数据解释如下:
```java
userId; // 用户ID
itemId; // 商品ID
catId; // 商品类目ID
merchantId; // 卖家ID
brandId; // 品牌ID
action; // 用户行为, 包括("pv", "buy", "cart", "fav")
gender; // 性别
timestamp; // 行为发生的时间戳,单位秒
Map
解释
DataStream → DataStream 的转换,输入一个元素,返回一个元素,如下操作:
SingleOutputStreamOperator<String> userBehaviorMap = userBehavior.map(new RichMapFunction<UserBehavior, String>() { |
示意图
将雨滴形状转换成相对应的圆形形状的map操作
flatMap
解释
DataStream → DataStream,输入一个元素,返回零个、一个或多个元素。事实上,flatMap算子可以看做是filter与map的泛化,即它能够实现这两种操作。flatMap算子对应的FlatMapFunction定义了flatMap方法,可以通过向collector对象传递数据的方式返回0个,1个或者多个事件作为结果。如下操作:
SingleOutputStreamOperator<UserBehavior> userBehaviorflatMap = userBehavior.flatMap(new RichFlatMapFunction<UserBehavior, UserBehavior>() { |
示意图
将黄色的雨滴过滤掉,将蓝色雨滴转为圆形,保留绿色雨滴
Filter
解释
DataStream → DataStream,过滤算子,对数据进行判断,符合条件即返回true的数据会被保留,否则被过滤。如下:
SingleOutputStreamOperator<UserBehavior> userBehaviorFilter = userBehavior.filter(new RichFilterFunction<UserBehavior>() { |
示意图
将红色与绿色雨滴过滤掉,保留蓝色雨滴。
keyBy
解释
DataStream→KeyedStream,从逻辑上将流划分为不相交的分区。具有相同键的所有记录都分配给同一分区。在内部,keyBy()是通过哈希分区实现的。
定义键值有3中方式:
(1)使用字段位置,如keyBy(1),此方式是针对元组数据类型,比如tuple,使用元组相应元素的位置来定义键值;
(2)字段表达式,用于元组、POJO以及样例类;
(3)键值选择器,即keySelector,可以从输入事件中提取键值
SingleOutputStreamOperator<Tuple2<String, Integer>> userBehaviorkeyBy = userBehavior.map(new RichMapFunction<UserBehavior, Tuple2<String, Integer>>() { |
示意图
基于形状对事件进行分区的keyBy操作
Reduce
解释
KeyedStream → DataStream,对数据进行滚动聚合操作,结合当前元素和上一次Reduce返回的值进行聚合,然后返回一个新的值.将一个ReduceFunction应用在一个keyedStream上,每到来一个事件都会与当前reduce的结果进行聚合,
产生一个新的DataStream,该算子不会改变数据类型,因此输入流与输出流的类型永远保持一致。
SingleOutputStreamOperator<Tuple2<String, Integer>> userBehaviorReduce = userBehavior.map(new RichMapFunction<UserBehavior, Tuple2<String, Integer>>() { |
示意图
Aggregations(滚动聚合)
KeyedStream → DataStream,Aggregations(滚动聚合),滚动聚合转换作用于KeyedStream流上,生成一个包含聚合结果(比如sum求和,min最小值)的DataStream,滚动聚合的转换会为每个流过该算子的key值保存一个聚合结果,
当有新的元素流过该算子时,会根据之前的结果值和当前的元素值,更新相应的结果值
sum():滚动聚合流过该算子的指定字段的和;
min():滚动计算流过该算子的指定字段的最小值
max():滚动计算流过该算子的指定字段的最大值
minBy():滚动计算当目前为止流过该算子的最小值,返回该值对应的事件;
maxBy():滚动计算当目前为止流过该算子的最大值,返回该值对应的事件;
union
解释
DataStream* → DataStream,将多条流合并,新的的流会包括所有流的数据,值得注意的是,两个流的数据类型必须一致,另外,来自两条流的事件会以FIFO(先进先出)的方式合并,所以并不能保证两条流的顺序,此外,union算子不会对数据去重,每个输入事件都会被发送到下游算子。
userBehaviorkeyBy.union(userBehaviorReduce).print();//将两条流union在一起,可以支持多条流(大于2)的union |
示意图
connect
解释
DataStream,DataStream → ConnectedStreams,将两个流的事件进行组合,返回一个ConnectedStreams对象,两个流的数据类型可以不一致,ConnectedStreams对象提供了类似于map(),flatMap()功能的算子,如CoMapFunction与CoFlatMapFunction分别表示map()与flatMap算子,这两个算子会分别作用于两条流,注意:CoMapFunction 或CoFlatMapFunction被调用的时候并不能控制事件的顺序只要有事件流过该算子,该算子就会被调用。
ConnectedStreams<UserBehavior, Tuple2<String, Integer>> behaviorConnectedStreams = userBehaviorFilter.connect(userBehaviorkeyBy); |
split
解释
DataStream → SplitStream,将流分割成两条或多条流,与union相反。分割之后的流与输入流的数据类型一致,
对于每个到来的事件可以被路由到0个、1个或多个输出流中。可以实现过滤与复制事件的功能,DataStream.split()接收一个OutputSelector函数,用来定义分流的规则,即将满足不同条件的流分配到用户命名的一个输出。
SplitStream<UserBehavior> userBehaviorSplitStream = userBehavior.split(new OutputSelector<UserBehavior>() { |
示意图
Sink
Flink提供了许多内置的Sink,比如writeASText,print,HDFS,Kaka等等,下面将基于MySQL实现一个自定义的Sink,可以与自定义的MysqlSource进行对比,具体如下:
/** |
关于RichFunction
细心的读者可以发现,在前文的算子操作案例中,使用的都是RichFunction,因为在很多时候需要在函数处理数据之前先进行一些初始化操作,或者获取函数的上下文信息,DataStream API提供了一类RichFunction,与普通的函数相比,该函数提供了许多额外的功能。
使用RichFunction的时候,可以实现两个额外的方法:
- open(),是初始化方法,会在每个人物首次调用转换方法(比如map)前调用一次。通常用于进行一次的设置工作,注意Configuration参数只在DataSet API中使用,而并没有在DataStream API中使用,因此在使用DataStream API时,可以将其忽略。
- close(),函数的终止方法 ,会在每个任务最后一次调用转换方法后调用一次,通常用于资源释放等操作。
此外用户还可以通过getRuntimeContext()方法访问函数的上下文信息(RuntimeContext),例如函数的并行度,函数所在subtask的编号以及执行函数的任务名称,同时也可以访问分区状态。
总结
本文首先实现了自定义MySQL Source,然后基于MySql 的Source进行了一系列的算子操作,并对常见的算子操作进行详细剖析,最后实现了一个自定义MySQL Sink,并对RichFunction进行了解释。
代码地址:https://github.com/jiamx/study-flink
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
相关推荐 ☟
- 本文链接:https://jiamaoxiang.top/2020/04/12/Flink-DataStream-API编程指南/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享