在上一篇分享Flink集成Hive之快速入门–以Flink1.12为例中,介绍了Flink集成Hive的进本步骤。本文分享,将继续介绍Flink集成Hive的另外两个概念:Hive Catalog与Hive Dialect。本文包括以下内容,希望对你有所帮助。

  • 什么是Hive Catalog
  • 如何使用Hive Catalog
  • 什么是Hive Dialect
  • 如何使用Hive Dialect

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包

什么是Hive Catalog

我们知道,Hive使用Hive Metastore(HMS)存储元数据信息,使用关系型数据库来持久化存储这些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元数据,这就是Hive Catalog的功能。

Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元数据。Hive Catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。

如何使用Hive Catalog

HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就可以使用HiveCatalog。比如,我们通过FlinkSQL 的DDL语句创建一张kafka的数据源表,立刻就能查看该表的元数据信息。

HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。

普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。

对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false

尖叫提示:

由于依赖Hive Metastore,所以必须开启Hive MetaStore服务

代码中使用Hive Catalog

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// 使用注册的catalog
tableEnv.useCatalog("myhive");

在FlinkSQL Cli中使用Hive Catalog很简单,只需要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:

catalogs:
- name: myhive
type: hive
default-database: default
hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf

在FlinkSQL Cli中创建一张kafka表,该表默认为普通表,即is_generic=true

CREATE TABLE user_behavior ( 
`user_id` BIGINT, -- 用户id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品类id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT, -- 用户行为发生的时间戳
`proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
`eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 数据源格式为json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false'
);

我们可以在Hive客户端中查看该表的元数据信息

hive (default)> desc formatted  user_behavior;
Table Parameters:
...
is_generic true
...

从上面的元数据信息可以看出,is_generic=true,说明该表是一张普通表,如果在Hive中去查看该表,则会报错。

上面创建的表是普通表,该表不能使用Hive去查询。那么,该如何创建一张Hive兼容表呢?我们只需要在建表的属性中显示指定is_generic=false即可,具体如下:

CREATE TABLE hive_compatible_tbl ( 
`user_id` BIGINT, -- 用户id
`item_id` BIGINT, -- 商品id
`cat_id` BIGINT, -- 品类id
`action` STRING, -- 用户行为
`province` INT, -- 用户所在的省份
`ts` BIGINT -- 用户行为发生的时间戳
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka主题
'scan.startup.mode' = 'earliest-offset', -- 偏移量
'properties.group.id' = 'group1', -- 消费者组
'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',
'format' = 'json', -- 数据源格式为json
'json.fail-on-missing-field' = 'true',
'json.ignore-parse-errors' = 'false',
'is_generic' = 'false'
);

当我们在Hive中查看该表的元数据信息时,可以看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl;
Table Parameters:
...
is_generic false
...

我们可以使用FlinkSQL Cli或者HiveCli向该表中写入数据,然后分别通过FlinkSQL Cli和Hive Cli去查看该表数据的变化

hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;
hive (default)> select * from hive_compatible_tbl;

再在FlinkSQL Cli中查看该表,

Flink SQL> select user_id,item_id,action from hive_compatible_tbl;
user_id item_id action
2020 1221 buy

同样,我们可以在FlinkSQL Cli中去向该表中写入数据:

Flink SQL>  insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;
Flink SQL> select user_id,item_id,action from hive_compatible_tbl;

user_id item_id action
2020 1221 buy
2020 1222 fav

尖叫提示:

对于Hive兼容的表,需要注意数据类型,具体的数据类型对应关系以及注意点如下

Flink 数据类型 Hive 数据类型
CHAR(p) CHAR(p)
VARCHAR(p) VARCHAR(p)
STRING STRING
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT LONG
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(p, s) DECIMAL(p, s)
DATE DATE
TIMESTAMP(9) TIMESTAMP
BYTES BINARY
ARRAY LIST
MAP<K, V> MAP<K, V>
ROW STRUCT

注意

  • Hive CHAR(p) 类型的最大长度为255
  • Hive VARCHAR(p)类型的最大长度为65535
  • Hive MAP类型的key仅支持基本类型,而Flink’s MAP 类型的key执行任意类型
  • Hive不支持联合数据类型,比如STRUCT
  • Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能处理 precision <= 9的 TIMESTAMP
  • Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
  • FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

上面介绍了普通表和Hive兼容表,那么我们该如何使用Hive的语法进行建表呢?这个时候就需要使用Hive Dialect

什么是Hive Dialect

从Flink1.11.0开始,只要开启了Hive dialect配置,用户就可以使用HiveQL语法,这样我们就可以在Flink中使用Hive的语法使用一些DDL和DML操作。

Flink目前支持两种SQL方言(SQL dialects),分别为:default和hive。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive

如何使用Hive Dialect

在SQL Cli中使用Hive dialect

使用hive dialect只需要配置一个参数即可,该参数名称为:table.sql-dialect。我们就可以在sql-client-defaults.yaml配置文件中进行配置,也可以在具体的会话窗口中进行设定,对于SQL dialect的切换,不需要进行重启session。

execution:
planner: blink
type: batch
result-mode: table

configuration:
table.sql-dialect: hive

如果我们需要在SQL Cli中进行切换hive dialect,可以使用如下命令:

Flink SQL> set table.sql-dialect=hive; -- 使用hive dialect
Flink SQL> set table.sql-dialect=default; -- 使用default dialect

尖叫提示:

一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错

在Table API中配合dialect

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 使用hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 使用 default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

操作示例

Flink SQL> set table.sql-dialect=hive;
-- 使用Hive语法创建一张表
CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` (
`id` int COMMENT 'id',
`name` string COMMENT '名称',
`age` int COMMENT '年龄'
)
COMMENT 'hive dialect表测试'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

进入Hive客户端去查看该表的元数据信息

desc formatted hive_dialect_tbl;
col_name data_type comment
# col_name data_type comment

id int
name string
age int

# Detailed Table Information
Database: default
Owner: null
CreateTime: Mon Dec 21 17:23:48 CST 2020
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl
Table Type: MANAGED_TABLE
Table Parameters:
comment hive dialect表测试
is_generic false
transient_lastDdlTime 1608542628

# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: org.apache.hadoop.mapred.TextInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
field.delim ,
serialization.format ,

很明显,该表是一张Hive兼容表,即is_generic=false

使用FlinkSQLCli向该表中写入一条数据:

Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;

我们也可以在Hive的Cli中去操作该表

hive (default)> select * from hive_dialect_tbl;
hive (default)> insert into hive_dialect_tbl select 2,'jack',22;

以下是使用Hive方言的一些注意事项。

  • Hive dialect只能用于操作Hive表,不能用于普通表。Hive方言应与HiveCatalog一起使用。
  • 虽然所有Hive版本都支持相同的语法,但是是否有特定功能仍然取决于使用的Hive版本。例如,仅在Hive-2.4.0或更高版本中支持更新数据库位置。
  • Hive和Calcite具有不同的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在使用Hive dialect时,必须使用反引号(`)引用此类关键字,才能将其用作标识符。
  • 在Hive中不能查询在Flink中创建的视图。

当然,一旦开启了Hive dialect,我们就可以按照Hive的操作方式在Flink中去处理Hive的数据了,具体的操作与Hive一致,本文不再赘述。

总结

本文主要介绍了Hive Catalog和Hive Dialect。其中Hive Catalog的作用是持久化Flink的元数据信息,Hive Dialect是支持Hive语法的一个配置参数,这两个概念是Flink集成Hive的关键。下一篇分享将介绍如何使用Flink读写Hive。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包