Flink使用type information来代表数据类型,Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息(type information),从而获得序列化程序和反序列化程序。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息((type information)),从而提高其性能。本文主要讨论包括:(1)Flink支持的数据类型,(2)如何为数据类型创建type information,(3)如果无法自动推断函数的返回类型,如何使用提示(hints)来帮助Flink的类型系统识别类型信息。

支持的数据类型

Flink支持Java和Scala中所有常见的数据类型,使用比较广泛的类型主要包括以下五种:

  • 原始类型
  • Java和Scala的tuple类型
  • Scala样例类
  • POJO类型
  • 一些特殊的类型

NOTE:不能被处理的类型将会被视为普通的数据类型,通过Kyro序列化框架进行序列化。

原始类型

Flink支持所有Java和Scala的原始类型,比如Int(Java中的Integer),String、Double等。下面的例子是处理一个Long类型的数据流,处理每个元素+1

val numbers: DataStream[Long] = env.fromElements(1L, 2L,3L, 4L)  
numbers.map( n => n + 1)

Java和Scala的tuple类型

基于Scala的DataStream API使用的Scala的tuple。下面的例子是过滤一个具有两个字段的tuple类型的数据流.

// DataStream of Tuple2[String, Integer] for Person(name,age)  
val persons: DataStream[(String, Integer)] = env.fromElements(("Adam", 17),("Sarah", 23))
// filter for persons of age > 18
persons.filter(p => p._2 > 18)

Flink提供了有效的Java tuple实现,Flink的Java tuple最多包括25个字段,分别为tuple1,tuple2,直到tuple25,tuple类型是强类型的。使用Java DataStream API重写上面的例子:

// DataStream of Tuple2<String, Integer> for Person(name,age)  
DataStream<Tuple2<String, Integer>> persons =env.fromElements(Tuple2.of("Adam", 17),Tuple2.of("Sarah",23));
// filter for persons of age > 18
persons.filter(p -> p.f1 > 18);

Tuple字段可以通过使用f0,f1,f2的形式访问,也可以通过getField(int pos)方法访问,参数的索引起始值为0,比如:

Tuple2<String, Integer> personTuple = Tuple2.of("Alex","42");  
Integer age = personTuple.getField(1); // age = 42

与Scala相比,Flink的Java tuple是可变的,所以tuple的元素值是可以被重新复制的。Function可以重用Java tuple,从而减小垃圾回收的压力。下面的例子展示了如何更新一个tuple字段值

personTuple.f1 = 42; // set the 2nd field to 42     
personTuple.setField(43, 1); // set the 2nd field to 43

Scala的样例类

Flink支持Scala的样例类,可以通过字段名称来访问样例类的字段,下面的例子定义了一个Person样例类,该样例类有两个字段:nameage,按age过滤DataStream,如下所示

case class Person(name: String, age: Int)  
val persons: DataStream[Person] = env.fromElements(Person("Adam", 17),Person("Sarah", 23))
// filter for persons with age > 18
persons.filter(p => p.age > 18)

POJO

Flink接受的POJO类型需满足以下条件:

  • public 类
  • 无参的共有构造方法
  • 所有字段都是public的,可以通过getter和setter方法访问
  • 所有字段类型必须是Flink能够支持的
    下面的例子定义一个PersonPOJO
public class Person {  
// both fields are public
public String name;
public int age;
// default constructor is present
public Person() {}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
}
DataStream<Person> persons = env.fromElements(
new Person("Alex", 42),
new Person("Wendy", 23));

一些特殊的类型

Flink支持一些有特殊作用的数据类型,比如Array,Java中的ArrayList、HashMap和Enum等,也支持Hadoop的Writable类型。

为数据类型创建类型信息(type information)

显示地指定类型信息(type information)

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包