实时数仓|基于Flink1.11的SQL构建实时数仓探索实践
实时数仓主要是为了解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析、实时的数据看板、业务指标实时监控等场景。虽然关于实时数仓的架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。本文会分享基于Flink SQL从0到1搭建一个实时数仓的demo,涉及数据采集、存储、计算、可视化整个处理流程。通过本文你可以了解到:
- 实时数仓的基本架构
- 实时数仓的数据处理流程
- Flink1.11的SQL新特性
- Flink1.11存在的bug
- 完整的操作案例
古人学问无遗力,少壮工夫老始成。
纸上得来终觉浅,绝知此事要躬行。
案例简介
本文会以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及太复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成的。
架构设计
具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行JOIN,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。
业务数据准备
- 订单表(order_info)
CREATE TABLE `order_info` ( |
- 订单详情表(order_detail)
CREATE TABLE `order_detail` ( |
- 商品表(sku_info)
CREATE TABLE `sku_info` ( |
- 商品一级类目表(base_category1)
CREATE TABLE `base_category1` ( |
- 商品二级类目表(base_category2)
CREATE TABLE `base_category2` ( |
- 商品三级类目表(base_category3)
CREATE TABLE `base_category3` ( |
- 省份表(base_province)
CREATE TABLE `base_province` ( |
- 区域表(base_region)
CREATE TABLE `base_region` ( |
注意:以上的建表语句是在MySQL中完成的,完整的建表及模拟数据生成脚本见:
链接:https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA 提取码:zuqw
数据处理流程
ODS层数据同步
关于ODS层的数据同步参见我的另一篇文章基于Canal与Flink实现数据实时增量同步(一)。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:
DIM层维表数据准备
本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表和商品维表。处理过程如下:
- 区域维表
首先将mydw.base_province
和mydw.base_region
这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:
-- ------------------------- |
经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province
作为维表:
-- --------------------------------- |
这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:
-- ------------------------- |
经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info
,用作后续使用的维表。
-- --------------------------------- |
至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。
DWD层数据处理
经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:
-- ------------------------- |
ADS层数据
经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。
- ads_province_index
首先在MySQL中创建对应的ADS目标表:ads_province_index
CREATE TABLE ads.ads_province_index( |
向MySQL的ADS层目标装载数据:
-- Flink SQL Cli操作 |
当提交任务之后:观察Flink WEB UI:
查看ADS层的ads_province_index表数据:
- ads_sku_index
首先在MySQL中创建对应的ADS目标表:ads_sku_index
CREATE TABLE ads_sku_index |
向MySQL的ADS层目标装载数据:
-- --------------------------------- |
当提交任务之后:观察Flink WEB UI:
查看ADS层的ads_sku_index表数据:
FineBI结果展示
其他注意点
Flink1.11.0存在的bug
当在代码中使用Flink1.11.0版本时,如果将一个change-log的数据源insert到一个upsert sink时,会报如下异常:
[ERROR] Could not execute SQL statement. Reason: |
该bug目前已被修复,修复可以在Flink1.11.1中使用。
总结
本文主要分享了构建一个实时数仓的demo案例,通过本文可以了解实时数仓的数据处理流程,在此基础之上,对Flink SQL的CDC会有更加深刻的认识。另外,本文给出了非常详细的使用案例,你可以直接上手进行操作,在实践中探索实时数仓的构建流程。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
- 本文链接:https://jiamaoxiang.top/2020/08/12/实时数仓-基于Flink1-11的SQL构建实时数仓探索实践/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享