Flink的数据类型
Flink使用type information来代表数据类型,Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息(type information),从而获得序列化程序和反序列化程序。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息((type information)),从而提高其性能。本文主要讨论包括:(1)Flink支持的数据类型,(2)如何为数据类型创建type information,(3)如果无法自动推断函数的返回类型,如何使用提示(hints)来帮助Flink的类型系统识别类型信息。
支持的数据类型
Flink支持Java和Scala中所有常见的数据类型,使用比较广泛的类型主要包括以下五种:
- 原始类型
- Java和Scala的tuple类型
- Scala样例类
- POJO类型
- 一些特殊的类型
NOTE:不能被处理的类型将会被视为普通的数据类型,通过Kyro序列化框架进行序列化。
原始类型
Flink支持所有Java和Scala的原始类型,比如Int(Java中的Integer),String、Double等。下面的例子是处理一个Long类型的数据流,处理每个元素+1
val numbers: DataStream[Long] = env.fromElements(1L, 2L,3L, 4L) |
Java和Scala的tuple类型
基于Scala的DataStream API使用的Scala的tuple。下面的例子是过滤一个具有两个字段的tuple类型的数据流.
// DataStream of Tuple2[String, Integer] for Person(name,age) |
Flink提供了有效的Java tuple实现,Flink的Java tuple最多包括25个字段,分别为tuple1,tuple2,直到tuple25,tuple类型是强类型的。使用Java DataStream API重写上面的例子:
// DataStream of Tuple2<String, Integer> for Person(name,age) |
Tuple字段可以通过使用f0,f1,f2的形式访问,也可以通过getField(int pos)方法访问,参数的索引起始值为0,比如:
Tuple2<String, Integer> personTuple = Tuple2.of("Alex","42"); |
与Scala相比,Flink的Java tuple是可变的,所以tuple的元素值是可以被重新复制的。Function可以重用Java tuple,从而减小垃圾回收的压力。下面的例子展示了如何更新一个tuple字段值
personTuple.f1 = 42; // set the 2nd field to 42 |
Scala的样例类
Flink支持Scala的样例类,可以通过字段名称来访问样例类的字段,下面的例子定义了一个Person
样例类,该样例类有两个字段:name
和age
,按age
过滤DataStream,如下所示
case class Person(name: String, age: Int) |
POJO
Flink接受的POJO类型需满足以下条件:
- public 类
- 无参的共有构造方法
- 所有字段都是public的,可以通过getter和setter方法访问
- 所有字段类型必须是Flink能够支持的
下面的例子定义一个Person
POJO
public class Person { |
一些特殊的类型
Flink支持一些有特殊作用的数据类型,比如Array,Java中的ArrayList、HashMap和Enum等,也支持Hadoop的Writable类型。
为数据类型创建类型信息(type information)
显示地指定类型信息(type information)
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
- 本文链接:https://jiamaoxiang.top/2019/08/27/Flink的数据类型/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享