你真的了解Flink Kafka source吗?
Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 消费组的偏移量,而是在内部跟踪和检查偏移量。
引言
当我们在使用Spark Streaming、Flink等计算框架进行数据实时处理时,使用Kafka作为一款发布与订阅的消息系统成为了标配。Spark Streaming与Flink都提供了相对应的Kafka Consumer,使用起来非常的方便,只需要设置一下Kafka的参数,然后添加kafka的source就万事大吉了。如果你真的觉得事情就是如此的so easy,感觉妈妈再也不用担心你的学习了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source为讨论对象,首先从基本的使用入手,然后深入源码逐一剖析,一并为你拨开Flink Kafka connector的神秘面纱。值得注意的是,本文假定读者具备了Kafka的相关知识,关于Kafka的相关细节问题,不在本文的讨论范围之内。
Flink Kafka Consumer介绍
Flink Kafka Connector有很多个版本,可以根据你的kafka和Flink的版本选择相应的包(maven artifact id)和类名。本文所涉及的Flink版本为1.10,Kafka的版本为2.3.4。Flink所提供的Maven依赖于类名如下表所示:
Maven 依赖 | 自从哪个版本 开始支持 | 类名 | Kafka 版本 | 注意 |
---|---|---|---|---|
flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.x | 这个连接器在内部使用 Kafka 的 SimpleConsumer API。偏移量由 Flink 提交给 ZK。 |
flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | 这个连接器使用新的 Kafka Consumer API |
flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | 这个连接器支持 带有时间戳的 Kafka 消息,用于生产和消费。 |
flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 | >= 0.11.x | Kafka 从 0.11.x 版本开始不支持 Scala 2.10。此连接器支持了 Kafka 事务性的消息传递来为生产者提供 Exactly once 语义。 |
flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer FlinkKafkaProducer | >= 1.0.0 | 这个通用的 Kafka 连接器尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。从 Flink 1.9 版本开始,它使用 Kafka 2.2.0 client。当前 Kafka 客户端向后兼容 0.10.0 或更高版本的 Kafka broker。 但是对于 Kafka 0.11.x 和 0.10.x 版本,我们建议你分别使用专用的 flink-connector-kafka-0.11_2.11 和 flink-connector-kafka-0.10_2.11 连接器。 |
Demo示例
添加Maven依赖
<!--本文使用的是通用型的connector--> |
简单代码案例
public class KafkaConnector { |
参数配置解读
在Demo示例中,给出了详细的配置信息,下面将对上面的参数配置进行逐一分析。
kakfa的properties参数配置
bootstrap.servers:kafka broker地址
zookeeper.connect:仅kafka0.8版本需要配置
group.id:消费者组
enable.auto.commit:
自动偏移量提交,该值的配置不是最终的偏移量提交模式,需要考虑用户是否开启了checkpoint,
在下面的源码分析中会进行解读
auto.commit.interval.ms:偏移量提交的时间间隔,毫秒
key.deserializer:
kafka 消息的key序列化器,如果不指定会使用ByteArrayDeserializer序列化器
value.deserializer:
kafka 消息的value序列化器,如果不指定会使用ByteArrayDeserializer序列化器
auto.offset.reset:
指定kafka的消费者从哪里开始消费数据,共有三种方式,
- 第一种:earliest
当各分区下有已提交的offset时,从提交的offset开始消费; 无提交的offset时,从头开始消费 - 第二种:latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 - 第三种:none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
注意:上面的指定消费模式并不是最终的消费模式,取决于用户在Flink程序中配置的消费模式
- 第一种:earliest
Flink程序用户配置的参数
- consumer.setCommitOffsetsOnCheckpoints(true)
解释:设置checkpoint后在提交offset,即oncheckpoint模式,该值默认为true,该参数会影响偏移量的提交方式,下面的源码中会进行分析
consumer.setStartFromEarliest()
解释: 最早的数据开始消费 ,该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。该方法为继承父类FlinkKafkaConsumerBase的方法。
consumer.setStartFromGroupOffsets()
解释:消费者组最近一次提交的偏移量,默认。 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置,该方法为继承父类FlinkKafkaConsumerBase的方法。
consumer.setStartFromLatest()
解释:最新的数据开始消费,该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。该方法为继承父类FlinkKafkaConsumerBase的方法。
consumer.setStartFromTimestamp(1585047859000L)
解释:指定具体的偏移量时间戳,毫秒。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。 如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。
consumer.setStartFromSpecificOffsets(specificStartOffsets)
解释:为每个分区指定偏移量,该方法为继承父类FlinkKafkaConsumerBase的方法。
请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,这些起始位置配置方法不会影响消费的起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 checkpoint 中的 offset 确定。
Flink Kafka Consumer源码解读
继承关系
Flink Kafka Consumer继承了FlinkKafkaConsumerBase抽象类,而FlinkKafkaConsumerBase抽象类又继承了RichParallelSourceFunction,所以要实现一个自定义的source时,有两种实现方式:一种是通过实现SourceFunction接口来自定义并行度为1的数据源;另一种是通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来自定义具有并行度的数据源。FlinkKafkaConsumer的继承关系如下图所示。
源码解读
FlinkKafkaConsumer源码
先看一下FlinkKafkaConsumer的源码,为了方面阅读,本文将尽量给出本比较完整的源代码片段,具体如下所示:代码较长,在这里可以先有有一个总体的印象,下面会对重要的代码片段详细进行分析。
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { |
分析
上面的代码已经给出了非常详细的注释,下面将对比较关键的部分进行分析。
构造方法分析
FlinkKakfaConsumer提供了7种构造方法,如上图所示。不同的构造方法分别具有不同的功能,通过传递的参数也可以大致分析出每种构造方法特有的功能,为了方便理解,本文将对其进行分组讨论,具体如下:
单topic
/** |
上面两种构造方法只支持单个topic,区别在于反序列化的方式不一样。第一种使用的是DeserializationSchema,第二种使用的是KafkaDeserializationSchema,其中使用带有KafkaDeserializationSchema参数的构造方法可以获取更多的附属信息,比如在某些场景下需要获取key/value对,offsets(偏移量),topic(主题名称)等信息,可以选择使用此方式的构造方法。以上两种方法都调用了私有的构造方法,私有构造方法的分析见下面。
多topic
/** |
上面的两种多topic的构造方法,可以使用一个list集合接收多个topic进行消费,区别在于反序列化的方式不一样。第一种使用的是DeserializationSchema,第二种使用的是KafkaDeserializationSchema,其中使用带有KafkaDeserializationSchema参数的构造方法可以获取更多的附属信息,比如在某些场景下需要获取key/value对,offsets(偏移量),topic(主题名称)等信息,可以选择使用此方式的构造方法。以上两种方法都调用了私有的构造方法,私有构造方法的分析见下面。
正则匹配topic
/** |
实际的生产环境中可能有这样一些需求,比如有一个flink作业需要将多种不同的数据聚合到一起,而这些数据对应着不同的kafka topic,随着业务增长,新增一类数据,同时新增了一个kafka topic,如何在不重启作业的情况下作业自动感知新的topic。首先需要在构建FlinkKafkaConsumer时的properties中设置flink.partition-discovery.interval-millis参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时FLinkKafkaConsumer内部会启动一个单独的线程定期去kafka获取最新的meta信息。具体的调用执行信息,参见下面的私有构造方法
私有构造方法
|
- 其他方法分析
KafkaFetcher对象创建
// 父类(FlinkKafkaConsumerBase)方法重写,该方法的作用是返回一个fetcher实例, |
该方法的作用是返回一个fetcher实例,fetcher的作用是连接kafka的broker,拉去数据并进行反序列化,然后将数据输出为数据流(data stream),在这里对自动偏移量提交模式进行了强制调整,即确保当偏移量的提交模式为ON_CHECKPOINTS(条件1:开启checkpoint,条件2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用自动提交。这将覆盖用户在properties中配置的任何设置,简单可以理解为:如果开启了checkpoint,并且设置了consumer.setCommitOffsetsOnCheckpoints(true),默认为true,就会将kafka properties的enable.auto.commit强制置为false。关于offset的提交模式,见下文的偏移量提交模式分析。
判断是否设置了自动提交
|
判断是否在kafka的参数开启了自动提交,即enable.auto.commit=true,并且auto.commit.interval.ms>0, 注意:如果没有没有设置enable.auto.commit的参数,则默认为true, 如果没有设置auto.commit.interval.ms的参数,则默认为5000毫秒。该方法会在FlinkKafkaConsumerBase的open方法进行初始化的时候调用。
反序列化
private static void setDeserializer(Properties props) { |
确保配置了kafka消息的key与value的反序列化方式,如果没有配置,则使用ByteArrayDeserializer序列化器,
ByteArrayDeserializer类的deserialize方法是直接将数据进行return,未做任何处理。
FlinkKafkaConsumerBase源码
|
上述代码是FlinkKafkaConsumerBase的部分代码片段,基本上对其做了详细注释,里面的有些方法是FlinkKafkaConsumer继承的,有些是重写的。之所以在这里给出,可以对照FlinkKafkaConsumer的源码,从而方便理解。
偏移量提交模式分析
Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker(或 0.8 版本的 Zookeeper)的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。
配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。在这里先给出提交模式的具体结论,下面会对两种方式进行具体的分析。基本的结论为:
开启checkpoint
情况1:用户通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(true) 方法来启用 offset 的提交(默认情况下为 true )
那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。
这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。
注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。
此情况使用的是ON_CHECKPOINTS情况2:用户通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(“false”) 方法来禁用 offset 的提交,则使用DISABLED模式提交offset
未开启checkpoint
Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能,因此,要禁用或启用 offset 的提交情况1:配置了Kafka properties的参数配置了”enable.auto.commit” = “true”或者 Kafka 0.8 的 auto.commit.enable=true,使用KAFKA_PERIODIC模式提交offset,即自动提交offset
- 情况2:没有配置enable.auto.commit参数,使用DISABLED模式提交offset,这意味着kafka不知道当前的消费者组的消费者每次消费的偏移量。
提交模式源码分析
- offset的提交模式
public enum OffsetCommitMode { |
- 提交模式的调用
public class OffsetCommitModes { |
小结
本文主要介绍了Flink Kafka Consumer,首先对FlinkKafkaConsumer的不同版本进行了对比,然后给出了一个完整的Demo案例,并对案例的配置参数进行了详细解释,接着分析了FlinkKafkaConsumer的继承关系,并分别对FlinkKafkaConsumer以及其父类FlinkKafkaConsumerBase的源码进行了解读,最后从源码层面分析了Flink Kafka Consumer的偏移量提交模式,并对每一种提交模式进行了梳理。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
- 本文链接:https://jiamaoxiang.top/2020/04/02/你真的了解Flink-Kafka-connector吗?/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享