第十篇|SparkStreaming手动维护Kafka Offset的几种方式
Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器,而是创建输入流直接从Kafka 集群节点拉取消息。输入流保证每个消息从Kafka 集群拉取以后只完全转换一次,保证语义一致性。但是当作业发生故障或重启时,要保障从当前的消费位点去处理数据(即Exactly Once语义),单纯的依靠SparkStreaming本身的机制是不太理想的,生产环境中通常借助手动管理offset的方式来维护kafka的消费位点。本文分享将介绍如何手动管理Kafka的Offset,希望对你有所帮助。本文主要包括以下内容:
- 如何使用MySQL管理Kafka的Offset
- 如何使用Redis管理Kafka的OffSet
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
如何使用MySQL管理Kafka的Offset
我们可以从Spark Streaming 应用程序中编写代码来手动管理Kafka偏移量,偏移量可以从每一批流处理中生成的RDDS偏移量来获取,获取方式为:
KafkaUtils.createDirectStream(...).foreachRDD { rdd => |
当获取到偏移量之后,可以将将其保存到外部存储设备中(MySQL、Redis、Zookeeper、HBase等)。
使用案例代码
- MySQL中用于保存偏移量的表
CREATE TABLE `topic_par_group_offset` ( |
- 常量配置类:ConfigConstants
object ConfigConstants { |
- JDBC连接工具类:JDBCConnPool
object JDBCConnPool { |
- Kafka生产者:KafkaProducerTest
object KafkaProducerTest { |
- 读取和保存Offset:
该对象的作用是从外部设备中读取和写入Offset,包括MySQL和Redis
object OffsetReadAndSave { |
- 业务处理类
该对象是业务处理逻辑,主要是消费Kafka数据,再处理之后进行手动将偏移量保存到MySQL中。在启动程序时,会判断外部存储设备中是否存在偏移量,如果是首次启动则从最初的消费位点消费,如果存在Offset,则从当前的Offset去消费。
观察现象:当首次启动时会从头消费数据,手动停止程序,然后再次启动,会发现会从当前提交的偏移量消费数据。
object ManualCommitOffset { |
如何使用Redis管理Kafka的OffSet
- Redis连接类
object JedisConnPool { |
- 业务逻辑处理
该对象与上面的基本类似,只不过使用的是Redis来进行存储Offset,存储到Redis的数据类型是Hash,基本格式为:[key field value] -> [ topic_groupid partition offset],即 key为topic_groupid,field为partition,value为offset。
object ManualCommitOffsetToRedis { |
总结
本文介绍了如何使用外部存储设备来保存Kafka的消费位点,通过详细的代码示例说明了使用MySQL和Redis管理消费位点的方式。当然,外部存储设备很多,用户也可以使用其他的存储设备进行管理Offset,比如Zookeeper和HBase等,其基本处理思路都十分相似。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
相关推荐 ☟
- 本文链接:https://jiamaoxiang.top/2020/11/11/第十篇-SparkStreaming手动维护Kafka-Offset的几种方式/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享