篇五|ClickHouse数据导入(Flink、Spark、Kafka、MySQL、Hive)
本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink、Spark、Kafka、MySQL、Hive将数据导入ClickHouse,具体内容包括:
- 使用Flink导入数据
- 使用Spark导入数据
- 从Kafka中导入数据
- 从MySQL中导入数据
- 从Hive中导入数据
使用Flink导入数据
本文介绍使用 flink-jdbc将数据导入ClickHouse,Maven依赖为:
<dependency> |
示例
本示例使用Kafka connector,通过Flink将Kafka数据实时导入到ClickHouse
public class FlinkSinkClickHouse { |
Note:
- 由于 ClickHouse 单次插入的延迟比较高,我们需要设置
BatchSize
来批量插入数据,提高性能。- 在 JDBCAppendTableSink 的实现中,若最后一批数据的数目不足
BatchSize
,则不会插入剩余数据。
使用Spark导入数据
本文主要介绍如何通过Spark程序写入数据到Clickhouse中。
<dependency> |
示例
object Spark2ClickHouseExample { |
从Kafka中导入数据
主要是使用ClickHouse的表引擎。
使用方式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] |
kafka_broker_list
:逗号分隔的brokers地址 (localhost:9092).kafka_topic_list
:Kafka 主题列表,多个主题用逗号分隔.kafka_group_name
:消费者组.kafka_format
– Message format. 比如JSONEachRow
、JSON、CSV等等
使用示例
在kafka中创建user_behavior主题,并向该主题写入数据,数据示例为:
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919} |
在ClickHouse中创建表,选择表引擎为Kafka(),如下:
CREATE TABLE kafka_user_behavior ( |
通过物化视图将kafka数据导入ClickHouse
当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。
- 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
- 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
- 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中
-- 创建Kafka引擎表 |
Note:
Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。
从MySQL中导入数据
同kafka中导入数据类似,ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。
数据类型对应关系
MySQL中数据类型与ClickHouse类型映射关系如下表。
MySQL | ClickHouse |
---|---|
UNSIGNED TINYINT | UInt8 |
TINYINT | Int8 |
UNSIGNED SMALLINT | UInt16 |
SMALLINT | Int16 |
UNSIGNED INT, UNSIGNED MEDIUMINT | UInt32 |
INT, MEDIUMINT | Int32 |
UNSIGNED BIGINT | UInt64 |
BIGINT | Int64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DATE | Date |
DATETIME, TIMESTAMP | DateTime |
BINARY | FixedString |
使用方式
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] |
使用示例
-- 连接MySQL中clickhouse数据库的test表 |
注意:对于MySQL表引擎,不支持UPDATE和DELETE操作,比如执行下面命令时,会报错:
-- 执行更新 |
从Hive中导入数据
本文使用Waterdrop进行数据导入,Waterdrop是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中。
我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。配置文件包括四个部分,分别是Spark、Input、filter和Output。
关于Waterdrop的安装,十分简单,只需要下载ZIP文件,解压即可。使用Waterdrop需要安装Spark。
在Waterdrop安装目录的config/文件夹下创建配置文件:hive_table_batch.conf,内容如下。主要包括四部分:Spark、Input、filter和Output。
Spark部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
Input部分是定义数据源,其中
pre_sql
是从Hive中读取数据SQL,table_name
是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。filter部分配置一系列的转化,比如过滤字段
Output部分是将处理好的结构化数据写入ClickHouse,ClickHouse的连接配置。
需要注意的是,必须保证hive的metastore是在服务状态。
spark { |
- 执行任务
[kms@kms-1 waterdrop-1.5.1]$ bin/start-waterdrop.sh --config config/hive_table_batch.conf --master yarn --deploy-mode cluster |
这样就会启动一个Spark作业执行数据的抽取,等执行完成之后,查看ClickHouse的数据。
总结
本文主要介绍了如何通过Flink、Spark、Kafka、MySQL以及Hive,将数据导入到ClickHouse,对每一种方式都出了详细的示例,希望对你有所帮助。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包
相关推荐 ☟
- 本文链接:https://jiamaoxiang.top/2020/11/17/第五篇-ClickHouse数据导入-Flink、Spark、Kafka、MySQL/
- 版权声明:本文为博主原创文章,遵循CC BY-SA 4.0版权协议,转载请附上原文出处链接和本声明
分享