Flink1.11中的CDC Connectors操作实践
Flink1.11引入了CDC的connector,通过这种方式可以很方便地捕获变化的数据,大大简化了数据处理的流程。Flink1.11的CDC connector主要包括:MySQL CDC
和Postgres CDC
,同时对Kafka的Connector支持canal-json
和debezium-json
以及changelog-json
的format。本文主要分享以下内容:
- CDC简介
- Flink提供的 table format
- 使用过程中的注意点
- mysql-cdc的操作实践
- canal-json的操作实践
- changelog-json的操作实践
简介
Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。
特点
支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。
使用场景
- 数据库之间的增量数据同步
- 审计日志
- 数据库之上的实时物化视图
- 基于CDC的维表join
- …
Flink提供的 table format
Flink提供了一系列可以用于table connector的table format,具体如下:
Formats | Supported Connectors |
---|---|
CSV | Apache Kafka, Filesystem |
JSON | Apache Kafka, Filesystem, Elasticsearch |
Apache Avro | Apache Kafka, Filesystem |
Debezium CDC | Apache Kafka |
Canal CDC | Apache Kafka |
Apache Parquet | Filesystem |
Apache ORC | Filesystem |
使用过程中的注意点
使用MySQL CDC的注意点
如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖:
<dependency> |
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。
使用canal-json的注意点
如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖:
<!-- universal --> |
如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。由于Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动添加依赖包,否则会报如下错误:
[ERROR] Could not execute SQL statement. Reason: |
使用changelog-json的注意点
如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖:
<dependency> |
如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。
mysql-cdc的操作实践
创建MySQL数据源表
在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:
-- MySQL |
Flink SQL Cli创建CDC数据源
启动 Flink 集群,再启动 SQL CLI,执行下面命令:
-- 创建订单信息表 |
在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert
在SQL CLI中创建订单详情表:
CREATE TABLE order_detail( |
查询结果如下:
执行JOIN操作:
SELECT |
canal-json的操作实践
关于cannal的使用方式,可以参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我已经将下面的表通过canal同步到了kafka,具体格式为:
{ |
在SQL CLI中创建该canal-json格式的表:
CREATE TABLE region ( |
查询结果如下:
changelog-json的操作实践
创建MySQL数据源
参见上面的order_info
Flink SQL Cli创建changelog-json表
CREATE TABLE order_gmv2kafka ( |
查询表看一下结果:
再查一下kafka的数据:
{"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"} |
当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293
再观察数据:
再看kafka中的数据:
总结
本文基于Flink1.11的SQL,对新添加的CDC connector的使用方式进行了阐述。主要包括MySQL CDC connector、canal-json及changelog-json的format,并指出了使用过程中的注意点。另外本文给出了完整的使用示例,如果你有现成的环境,那么可以直接进行测试使用。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
相关推荐 ☟
- 本文链接:https://jiamaoxiang.top/2020/08/12/Flink1-11中的CDC-Connectors操作实践/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享